The initial requirement was deceptively simple: add collaborative text editing to an existing internal tool. Our first pass was a naive implementation using WebSocket broadcasts with a last-write-wins strategy. It fell apart within minutes of internal testing. Race conditions, overwritten text, and complete document desynchronization were rampant. The subsequent exploration into Operational Transformation (OT) revealed a level of complexity and server-side state management that felt too brittle for our small team to maintain. This led us down the path of Conflict-free Replicated Data Types (CRDTs), which promised eventual consistency without a complex central coordinator.
Our technology stack was unconventional. The backend was already Elixir, a choice we made for its proven real-time capabilities via Phoenix. For the client, we had a strategic goal to unify our web and future mobile development, making Dart the logical candidate. For UI velocity, Tailwind CSS was the non-negotiable standard across our projects. The real challenge wasn’t just implementing a CRDT, but integrating these three disparate ecosystems into a resilient, production-ready system.
We decided on a log-structured sequence CRDT, similar in spirit to Logoot. This approach avoids the large state payloads of state-based CRDTs by exchanging small, immutable operations. Each character in the document is given a unique, fractional position identifier that allows for concurrent insertions without conflict.
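To make the ordering idea concrete, here is a minimal sketch of how such identifiers might be compared. The `Pos` class, the `comparePos` function, and the path values are hypothetical illustrations, not the project's code; the point is only that a path of integers with a site ID tie-breaker gives every replica the same total order, whatever order the operations arrive in.

```dart
// Hypothetical, simplified identifier: a path of integers plus a site ID.
class Pos {
  final List<int> path;
  final String site;
  Pos(this.path, this.site);
}

// Lexicographic order on the path; a proper prefix sorts first;
// the site ID breaks ties between identical paths.
int comparePos(Pos a, Pos b) {
  final shared = a.path.length < b.path.length ? a.path.length : b.path.length;
  for (var i = 0; i < shared; i++) {
    if (a.path[i] != b.path[i]) return a.path[i].compareTo(b.path[i]);
  }
  if (a.path.length != b.path.length) {
    return a.path.length.compareTo(b.path.length);
  }
  return a.site.compareTo(b.site);
}

void main() {
  // Document start and end markers.
  final start = Pos([0], 'SYSTEM');
  final end = Pos([1], 'SYSTEM');
  // Two clients insert concurrently between the same neighbours.
  // No integer fits between 0 and 1, so both paths go one level deeper.
  final fromA = Pos([0, 400], 'site-a');
  final fromB = Pos([0, 700], 'site-b');

  // Each replica receives the operations in a different order...
  final replica1 = [start, fromA, fromB, end]..sort(comparePos);
  final replica2 = [end, fromB, start, fromA]..sort(comparePos);

  // ...but both end up with the same sequence.
  print(replica1.map((p) => p.path).toList()); // [[0], [0, 400], [0, 700], [1]]
  print(replica2.map((p) => p.path).toList()); // [[0], [0, 400], [0, 700], [1]]
}
```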
The Elixir Backbone: An Isolated Process Per Document
The actor model in Elixir/Erlang is a perfect fit for this problem. A document is a self-contained unit of state. Mapping each active collaborative document to its own GenServer process provides natural state isolation and fault tolerance. If one document’s process crashes, it has zero impact on others. This architecture scales horizontally with remarkable simplicity.
The core of the backend is the DocumentServer. It’s a GenServer responsible for holding the document’s state (our sequence CRDT) and broadcasting operations to all connected clients.
# lib/collaborative_editor/documents/document_server.ex
defmodule CollaborativeEditor.Documents.DocumentServer do
use GenServer
require Logger
# Opaque document state. In a real app, this would be persisted.
@typep document_state :: list(%{id: map(), char: String.t()})
# GenServer API
def start_link(opts) do
doc_id = Keyword.fetch!(opts, :doc_id)
GenServer.start_link(__MODULE__, doc_id, name: via_tuple(doc_id))
end
def subscribe(doc_id) do
GenServer.call(via_tuple(doc_id), :subscribe)
end
def apply_op(doc_id, op) do
# Use cast to avoid blocking the client's channel process.
# The broadcast will confirm the operation asynchronously.
GenServer.cast(via_tuple(doc_id), {:apply_op, op})
end
# Helper to resolve the process name
defp via_tuple(doc_id) do
{:via, Registry, {CollaborativeEditor.DocumentRegistry, doc_id}}
end
# GenServer Callbacks
@impl true
def init(doc_id) do
Logger.info("DocumentServer started for doc_id: #{doc_id}")
# In a production system, you'd load the document from a database here.
# For this example, we start with an empty document.
initial_state = [%{id: %{pos: [0]}, char: "<s>"}, %{id: %{pos: [1]}, char: "</s>"}] # Start/End markers
{:ok, %{doc_id: doc_id, state: initial_state, subscribers: MapSet.new()}}
end
@impl true
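def handle_call(:subscribe, {from_pid, _ref}, state) do
# Monitor the subscriber so the :DOWN handler below can drop it when it exits.
Process.monitor(from_pid)
new_subscribers = MapSet.put(state.subscribers, from_pid)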
# Give the new subscriber the full current state to initialize.
{:reply, {:ok, state.state}, %{state | subscribers: new_subscribers}}
end
@impl true
def handle_cast({:apply_op, op}, state) do
case apply_operation(state.state, op) do
{:ok, new_state} ->
broadcast(state.subscribers, {:new_op, op})
# Persist new_state to the database asynchronously here.
{:noreply, %{state | state: new_state}}
{:error, reason} ->
Logger.error("Failed to apply op for doc #{state.doc_id}: #{inspect(reason)}")
# We don't change state if the op is invalid.
{:noreply, state}
end
end
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
Logger.info("Subscriber #{inspect(pid)} for doc #{state.doc_id} went down.")
new_subscribers = MapSet.delete(state.subscribers, pid)
{:noreply, %{state | subscribers: new_subscribers}}
end
# --- Private CRDT Logic ---
# This is a simplified insertion logic. A real Logoot implementation is more complex.
# It finds the position between prev_pos and next_pos and inserts the new character.
defp apply_operation(doc_state, %{"type" => "ins", "char" => char, "pos" => new_pos, "prev_pos" => prev_pos, "next_pos" => next_pos}) do
# In a real system, you'd perform rigorous validation of the operation here.
# Is the position valid? Is the prev/next correct?
prev_index = find_index_by_pos(doc_state, prev_pos)
next_index = find_index_by_pos(doc_state, next_pos)
# Basic check for consistency. A production implementation needs more robust validation.
cond do
# Either neighbour is unknown (find_index_by_pos returned nil), so reject the op.
is_nil(prev_index) or is_nil(next_index) ->
{:error, :neighbour_not_found}
# The neighbours are adjacent: insert the new character between them.
prev_index + 1 == next_index ->
new_char_data = %{id: %{pos: new_pos}, char: char}
{:ok, List.insert_at(doc_state, next_index, new_char_data)}
true ->
{:error, :position_conflict}
end
end
# Handle deletions. A robust CRDT would replace the character with a tombstone
# so that position identifiers referenced by concurrent operations stay valid.
defp apply_operation(doc_state, %{"type" => "del", "pos" => pos_to_delete}) do
case find_index_by_pos(doc_state, pos_to_delete) do
nil -> {:error, :char_not_found}
index ->
# For simplicity, this example removes the entry outright instead of
# keeping a tombstone marker.
new_state = List.delete_at(doc_state, index)
{:ok, new_state}
end
end
# Reject anything that matches neither shape instead of crashing the process.
defp apply_operation(_doc_state, _op), do: {:error, :invalid_operation}
defp find_index_by_pos(doc_state, pos) do
Enum.find_index(doc_state, fn elem -> elem.id.pos == pos end)
end
defp broadcast(subscribers, message) do
for pid <- subscribers do
send(pid, message)
end
end
end
The corresponding Phoenix Channel is the bridge between the WebSocket connection and the DocumentServer. Its primary job is to authenticate the user, join them to the correct document’s process, and forward messages between the two.
# lib/collaborative_editor_web/channels/document_channel.ex
defmodule CollaborativeEditorWeb.DocumentChannel do
use Phoenix.Channel
require Logger
alias CollaborativeEditor.Documents.DocumentServer
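def join("doc:" <> doc_id, _payload, socket) do
# In a real-world project, you'd check if the user has permission to access this document.
# This is a critical security step.
# Subscribing registers this channel process as a subscriber of the DocumentServer.
# This assumes a DocumentServer for doc_id is already running; otherwise the call exits.
{:ok, initial_state} = DocumentServer.subscribe(doc_id)
send(self(), {:after_join, initial_state})
# Store doc_id in the socket assigns so handle_in/3 and terminate/2 can read it.
{:ok, assign(socket, :doc_id, doc_id)}
end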
# Handle incoming ops from the client
def handle_in("new_op", payload, socket) do
# The client sends an operation, we pass it to the GenServer.
# The GenServer will handle validation and broadcasting.
DocumentServer.apply_op(socket.assigns.doc_id, payload)
{:noreply, socket}
end
# Handle the initial state push after joining
def handle_info({:after_join, initial_state}, socket) do
push(socket, "initial_state", %{state: initial_state})
{:noreply, socket}
end
# Handle broadcasts from the DocumentServer
def handle_info({:new_op, op}, socket) do
# Only push the op to the client if they are not the original sender.
# This is an optimization; the client should ideally handle its own ops idempotently.
# if op["siteId"] != socket.assigns.site_id do
# push(socket, "remote_op", op)
# end
# For simplicity, we broadcast to all, including the sender,
# which also serves as an acknowledgement.
push(socket, "remote_op", op)
{:noreply, socket}
end
def terminate(reason, socket) do
Logger.warning("Channel for doc #{socket.assigns.doc_id} terminated: #{inspect(reason)}")
:ok
end
end
This architecture is robust. The channel process is lightweight, acting only as a messenger. The stateful, heavy work is done in the DocumentServer, which is supervised and isolated. The use of a Registry provides a clean way to map a public doc_id to a running process PID.
The Dart Client: Managing State and Real-time Communication
On the client side, Dart is responsible for three things:
- Maintaining the local copy of the CRDT data structure.
- Communicating with the Elixir backend via the Phoenix Channel WebSocket protocol.
- Rendering the document and handling user input.
A key pitfall in collaborative systems is mixing view state with model state. Our Dart CRDT model is a pure data structure, completely unaware of the DOM.
// lib/crdt/logoot.dart
import 'dart:math';
// A unique identifier for a character in the document
class PositionIdentifier {
final List<int> path;
final String siteId; // Unique ID for each client
PositionIdentifier(this.path, this.siteId);
// Implement comparison logic for sorting
int compareTo(PositionIdentifier other) {
for (var i = 0; i < min(path.length, other.path.length); i++) {
if (path[i] != other.path[i]) {
return path[i].compareTo(other.path[i]);
}
}
if (path.length != other.path.length) {
return path.length.compareTo(other.path.length);
}
return siteId.compareTo(other.siteId);
}
Map<String, dynamic> toJson() => {'pos': path};
}
// Represents a character and its position
class CRDTChar {
final PositionIdentifier id;
final String value;
bool isVisible = true; // For tombstones
CRDTChar(this.id, this.value);
}
// The main CRDT document model
class LogootDocument {
final String siteId;
List<CRDTChar> _chars = [];
// Start and end markers
LogootDocument(this.siteId) {
_chars.add(CRDTChar(PositionIdentifier([0], "SYSTEM"), "<s>"));
_chars.add(CRDTChar(PositionIdentifier([1], "SYSTEM"), "</s>"));
}
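// Visible text only: tombstoned characters and the start/end markers are skipped.
String get text => _chars.where((c) => c.isVisible && c.id.siteId != "SYSTEM").map((c) => c.value).join('');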
// Apply a remote operation received from the server
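void applyRemoteOperation(Map<String, dynamic> op) {
// A production implementation requires robust validation and error handling
final path = (op['pos'] as List).cast<int>();
if (op['type'] == 'ins') {
// The server echoes every op back to its sender as an acknowledgement,
// so an insert must be idempotent: skip identifiers we already hold.
final alreadyApplied = _chars.any((c) => c.id.siteId == op['siteId'] && _samePath(c.id.path, path));
if (alreadyApplied) return;
final newChar = CRDTChar(PositionIdentifier(path, op['siteId'] as String), op['char'] as String);
// Find insertion point and insert
_chars.insert(_findInsertIndex(newChar), newChar);
} else if (op['type'] == 'del') {
// Find the char by position and mark it invisible (a tombstone) rather than removing it.
final index = _chars.indexWhere((c) => _samePath(c.id.path, path));
if (index != -1) _chars[index].isVisible = false;
}
}
// Element-wise comparison; == on Dart lists only checks identity.
bool _samePath(List<int> a, List<int> b) {
if (a.length != b.length) return false;
for (var i = 0; i < a.length; i++) {
if (a[i] != b[i]) return false;
}
return true;
}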
// Generate an insert operation from local user input
Map<String, dynamic> generateInsertOp(int index, String char) {
// This is the core of the Logoot algorithm: generating a new
// fractional position between two existing characters.
final prev = _chars[index - 1];
final next = _chars[index];
final newPos = _generatePositionBetween(prev.id, next.id);
final op = {
"type": "ins",
"char": char,
"pos": newPos.path,
"prev_pos": prev.id.path,
"next_pos": next.id.path,
"siteId": siteId
};
// Apply locally immediately for responsive UI
applyRemoteOperation(op);
return op;
}
int _findInsertIndex(CRDTChar char) {
// Binary search would be more efficient here.
for (var i = 0; i < _chars.length; i++) {
if (char.id.compareTo(_chars[i].id) < 0) {
return i;
}
}
return _chars.length;
}
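PositionIdentifier _generatePositionBetween(PositionIdentifier pos1, PositionIdentifier pos2) {
// This is a highly simplified position generation algorithm. A robust one needs
// a smarter allocation strategy to avoid excessively long position paths, and must
// handle identical paths that differ only by site ID, which this one does not.
const base = 1 << 16; // Exclusive upper bound for a digit once pos2 no longer constrains it.
final random = Random();
final newPath = <int>[];
var boundedByPos2 = true;
var depth = 0;
while (true) {
final low = depth < pos1.path.length ? pos1.path[depth] : 0;
final high = boundedByPos2 && depth < pos2.path.length ? pos2.path[depth] : base;
if (high - low > 1) {
// There is room at this depth: pick a digit strictly between the neighbours.
newPath.add(low + 1 + random.nextInt(high - low - 1));
return PositionIdentifier(newPath, siteId);
}
// No room: copy pos1's digit and descend one level.
newPath.add(low);
if (low < high) boundedByPos2 = false; // Already below pos2, so only pos1 constrains us.
depth++;
}
}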
}
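The comment in _findInsertIndex notes that a binary search would be more efficient than the linear scan. A minimal sketch of that idea follows, written as a standalone generic function so it can be run on its own; `insertIndex` and its parameters are hypothetical names, and it assumes the list is already sorted by the same comparator. It returns the same index as the linear scan: the first position whose element compares greater than the new item.

```dart
// Returns the first index whose element compares greater than `item`,
// i.e. the index at which `item` can be inserted to keep `sorted` ordered.
int insertIndex<T>(List<T> sorted, T item, int Function(T, T) compare) {
  var lo = 0;
  var hi = sorted.length;
  while (lo < hi) {
    final mid = lo + ((hi - lo) >> 1);
    if (compare(item, sorted[mid]) < 0) {
      hi = mid; // item belongs before sorted[mid]
    } else {
      lo = mid + 1; // item belongs after sorted[mid]
    }
  }
  return lo;
}

void main() {
  final ids = [10, 20, 30, 40];
  int byValue(int a, int b) => a.compareTo(b);
  print(insertIndex<int>(ids, 25, byValue)); // 2
  print(insertIndex<int>(ids, 5, byValue)); // 0
  print(insertIndex<int>(ids, 50, byValue)); // 4
}
```

In the document model, the comparator would presumably be the identifiers' compareTo, which cuts each lookup from a linear scan to a logarithmic number of comparisons; the list insertion itself still shifts elements.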
Connecting to the Phoenix Channel from Dart is straightforward with a standard WebSocket library. The complexity is in handling the Phoenix Channel protocol (joins, heartbeats, replies) and robustly managing the connection state.
// lib/services/channel_connector.dart
import 'dart:convert';
import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';
class PhoenixChannel {
final String _endpoint;
WebSocketChannel? _channel;
Timer? _heartbeatTimer;
int _ref = 1;
final StreamController<Map<String, dynamic>> _messages = StreamController.broadcast();
Stream<Map<String, dynamic>> get messages => _messages.stream;
PhoenixChannel(this._endpoint);
String? _topic; // Topic joined by connect(); used for every subsequent push.
void connect(String topic) {
_topic = topic;
_channel = WebSocketChannel.connect(Uri.parse(_endpoint));
_channel!.stream.listen(_onMessage, onError: _onError, onDone: _onDone);
// Join the channel
_joinTopic(topic);
}
void push(String event, Map<String, dynamic> payload) {
if (_channel == null || _topic == null) return;
final message = jsonEncode({
"topic": _topic,
"event": event,
"payload": payload,
"ref": (_ref++).toString()
});
_channel!.sink.add(message);
}
void _joinTopic(String topic) {
final joinMsg = jsonEncode({
"topic": topic,
"event": "phx_join",
"payload": {},
"ref": (_ref++).toString()
});
_channel!.sink.add(joinMsg);
}
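void _startHeartbeat() {
// Cancel any previous timer so repeated replies never stack up heartbeats.
_heartbeatTimer?.cancel();
_heartbeatTimer = Timer.periodic(Duration(seconds: 30), (_) {
// Phoenix expects heartbeats on the reserved "phoenix" topic, not the document topic.
_channel?.sink.add(jsonEncode({
"topic": "phoenix",
"event": "heartbeat",
"payload": {},
"ref": (_ref++).toString()
}));
});
}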
void _onMessage(dynamic message) {
final decoded = jsonDecode(message);
if (decoded['event'] == 'phx_reply' && decoded['payload']['status'] == 'ok') {
// Successful reply (e.g. to the join): make sure the heartbeat is running.
// Every acknowledged push also produces a reply, so never start a second timer.
if (!(_heartbeatTimer?.isActive ?? false)) {
_startHeartbeat();
}
}
// Add any message with a payload to the stream for the app to consume
if(decoded['payload'] != null){
_messages.add(decoded as Map<String, dynamic>);
}
}
void _onError(error) {
print("WebSocket error: $error");
_reconnect();
}
void _onDone() {
print("WebSocket connection closed.");
_heartbeatTimer?.cancel();
_reconnect();
}
void _reconnect() {
// Implement a robust reconnection strategy with exponential backoff.
// This is critical for production clients.
print("Attempting to reconnect...");
Future.delayed(Duration(seconds: 5), () => connect("doc:123"));
}
void dispose() {
_heartbeatTimer?.cancel();
_channel?.sink.close();
_messages.close();
}
}
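The fixed five-second delay in _reconnect is a placeholder for the exponential backoff its comment asks for. One way the delay calculation might look is sketched below; `backoffDelay`, its parameter names, and the default values are assumptions chosen for illustration, not part of the connector above.

```dart
import 'dart:math';

// Delay before reconnect attempt number `attempt` (0-based). The delay doubles
// on each attempt starting from `base`, is capped at `cap`, and is then scaled
// by a random factor in [0.5, 1.0) so that many clients dropped at the same
// moment do not all reconnect in lockstep.
Duration backoffDelay(
  int attempt, {
  Duration base = const Duration(seconds: 1),
  Duration cap = const Duration(seconds: 30),
  Random? random,
}) {
  final rng = random ?? Random();
  // Limit the exponent so the shift below cannot overflow.
  final exponent = min(attempt, 20);
  final uncapped = base.inMilliseconds * (1 << exponent);
  final capped = min(uncapped, cap.inMilliseconds);
  final jitter = 0.5 + rng.nextDouble() / 2;
  return Duration(milliseconds: (capped * jitter).round());
}

void main() {
  for (var attempt = 0; attempt < 6; attempt++) {
    final delay = backoffDelay(attempt);
    print('attempt $attempt: wait ${delay.inMilliseconds} ms');
  }
}
```

A connector using this would keep an attempt counter, pass it to the function before each Future.delayed, and reset it to zero once a join succeeds.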
UI Integration: The Tailwind CSS Build Process Hurdle
This was where we hit our first significant tooling problem. Tailwind’s Just-In-Time (JIT) compiler works by scanning source files for class names. Dart web projects compile to JavaScript and don’t typically have raw HTML files in the source tree that Tailwind can easily scan. We had to create a custom build pipeline.
Our solution was to configure tailwind.config.js to scan the Dart files themselves for string literals containing Tailwind classes.
// tailwind.config.js
module.exports = {
content: [
'./web/**/*.html',
'./lib/**/*.dart', // Scan our Dart files for class names
],
theme: {
extend: {},
},
plugins: [],
}
We then used npm scripts to run the Tailwind CLI in watch mode in parallel with the Dart web development server.
// package.json
{
"scripts": {
"serve": "webdev serve",
"watch:css": "tailwindcss -i ./web/styles/input.css -o ./web/styles/output.css --watch",
"dev": "npm-run-all --parallel serve watch:css"
},
"devDependencies": {
"npm-run-all": "^4.1.5",
"tailwindcss": "^3.0.0"
}
}
This setup, while functional, is fragile. A developer must remember to run npm run dev instead of just the standard webdev serve. It’s a pragmatic solution, but in a larger team, this would need to be baked into a more robust, unified build tool.
The UI itself was then straightforward to build. We used a simple contenteditable div and attached event listeners to it. The key was to ensure that any change to the LogootDocument model triggered a re-render of the view.
// web/main.dart
import 'dart:html';
import 'package:collaborative_editor/crdt/logoot.dart';
import 'package:collaborative_editor/services/channel_connector.dart';
void main() {
final editorElement = querySelector('#editor') as DivElement;
final document = LogootDocument('site-${DateTime.now().millisecondsSinceEpoch}'); // Generate a unique site ID
final channel = PhoenixChannel('ws://localhost:4000/socket/websocket');
// Listen for remote operations
channel.messages.listen((message) {
if (message['event'] == 'remote_op') {
document.applyRemoteOperation(message['payload']);
render(editorElement, document);
} else if (message['event'] == 'initial_state') {
// Here you would parse the full initial state and build the document
}
});
channel.connect('doc:123');
// Listen for local user input
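editorElement.onInput.listen((event) {
// This is a naive diffing logic. A production system would use a more
// sophisticated library to calculate the precise changes.
// For this example, we assume a single character was appended at the end of the text.
final text = editorElement.text ?? '';
if (text.isEmpty) return;
// The element already contains the new character. When the view mirrors the model's
// visible text, text.length is the model index of the end marker, where the insert goes.
final op = document.generateInsertOp(text.length, text[text.length - 1]);
channel.push('new_op', op);
render(editorElement, document);
});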
}
void render(DivElement element, LogootDocument doc) {
// A huge performance pitfall here: re-rendering the entire text on every change.
// A real application needs a virtual DOM or a more granular update mechanism.
element.text = doc.text;
}
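The input handler above only copes with a character appended at the end. A slightly less naive approach, short of a full diffing library, is to strip the common prefix and suffix of the old and new text and treat what remains as the edit. The sketch below assumes hypothetical names (`TextChange`, `diffTexts`) and works on UTF-16 code units, so it does not handle surrogate pairs or several separate edits in one event.

```dart
// Result of comparing the previous and the current editor text.
class TextChange {
  final int start; // Index where the two texts first differ.
  final String removed; // Text present only in the old version.
  final String inserted; // Text present only in the new version.
  TextChange(this.start, this.removed, this.inserted);
}

// Strips the longest common prefix and suffix; what remains is the edit.
TextChange diffTexts(String oldText, String newText) {
  final shortest =
      oldText.length < newText.length ? oldText.length : newText.length;
  var prefix = 0;
  while (prefix < shortest &&
      oldText.codeUnitAt(prefix) == newText.codeUnitAt(prefix)) {
    prefix++;
  }
  var suffix = 0;
  // The suffix may not overlap the prefix in either string.
  while (suffix < shortest - prefix &&
      oldText.codeUnitAt(oldText.length - 1 - suffix) ==
          newText.codeUnitAt(newText.length - 1 - suffix)) {
    suffix++;
  }
  return TextChange(
    prefix,
    oldText.substring(prefix, oldText.length - suffix),
    newText.substring(prefix, newText.length - suffix),
  );
}

void main() {
  final change = diffTexts('helo world', 'hello world');
  print('${change.start} "${change.removed}" "${change.inserted}"'); // 3 "" "l"
}
```

Each inserted character at text index i would then map to a model insert at index i + 1 (the start marker occupies index 0), assuming the model holds no tombstones; removed characters would need a matching delete operation, which the model above does not yet generate.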
The flow of data is now complete. A keystroke in one client generates a CRDT operation, which is applied locally for immediate feedback, sent to the Elixir server, broadcast to all other clients, and finally applied to their local CRDT models, triggering a UI update.
sequenceDiagram
    participant ClientA as Dart Client A
    participant Elixir as Phoenix Backend
    participant ClientB as Dart Client B
    ClientA->>+Elixir: Joins "doc:123" channel
    Elixir-->>-ClientA: Join OK, sends initial state
    ClientB->>+Elixir: Joins "doc:123" channel
    Elixir-->>-ClientB: Join OK, sends initial state
    Note over ClientA: User types 'A'
    ClientA->>ClientA: Generate CRDT op (insert 'A')
    ClientA->>ClientA: Apply op to local model & update UI
    ClientA->>Elixir: Pushes op via WebSocket
    Elixir->>Elixir: DocumentServer receives op
    Elixir->>Elixir: Applies op to master CRDT state
    Elixir-->>ClientA: Broadcasts op (acts as ack)
    Elixir-->>ClientB: Broadcasts op to other subscriber
    Note over ClientB: Receives op
    ClientB->>ClientB: Applies op to local CRDT model
    ClientB->>ClientB: Renders updated document in UI
This architecture works. It provides a highly available, fault-tolerant backend and a responsive client experience. However, this implementation is far from complete. The CRDT logic for generating positions is naive and could lead to performance degradation over time as position identifiers grow. Deletion is incomplete: the server removes characters outright instead of keeping tombstones, so a concurrent operation that refers to a removed neighbour fails. A tombstone-based design would fix that, but the document state could then only grow unless old tombstones are garbage-collected, a significant issue in a long-running system. The client-side rendering is inefficient and would not scale to large documents. Furthermore, synchronizing user cursors and text selections is an entirely separate and complex problem that has not been addressed. The current solution forms a solid foundation, but these lingering issues are the difference between a functional proof-of-concept and a truly production-grade collaborative tool.
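To make the tombstone point concrete, here is a minimal sketch under stated assumptions: the `Entry` class and the helper names are hypothetical and are not part of the code above. A delete only flips a flag, so the entry and its identifier stay in the list, and the index the editor reports has to be mapped past the hidden entries.

```dart
// Hypothetical entry type: a position path, a character, and a tombstone flag.
class Entry {
  final List<int> path;
  final String value;
  bool deleted = false;
  Entry(this.path, this.value);
}

// The text shown to the user skips tombstoned entries.
String visibleText(List<Entry> entries) =>
    entries.where((e) => !e.deleted).map((e) => e.value).join();

// Marks the n-th visible character as deleted and returns its entry, or null
// when the index is out of range. The entry itself stays in the list.
Entry? deleteVisible(List<Entry> entries, int visibleIndex) {
  var seen = 0;
  for (final entry in entries) {
    if (entry.deleted) continue;
    if (seen == visibleIndex) {
      entry.deleted = true;
      return entry;
    }
    seen++;
  }
  return null;
}

void main() {
  final entries = [
    Entry([0, 10], 'c'),
    Entry([0, 20], 'a'),
    Entry([0, 30], 't'),
  ];
  deleteVisible(entries, 1); // removes 'a' from the visible text
  print(visibleText(entries)); // ct
  print(entries.length); // 3
  deleteVisible(entries, 1); // visible index 1 is now 't'
  print(visibleText(entries)); // c
}
```

The list never shrinks here, which is exactly the growth problem described above; reclaiming tombstones safely requires knowing that no replica can still refer to them.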