The project’s central failure point was state divergence. We were building a collaborative planning tool where multiple users could manipulate a shared dataset in real-time. Our initial V1, built on a conventional REST API with client-side polling, was a predictable disaster in any scenario involving intermittent network connectivity. State on the client would drift from the server’s source of truth, leading to inconsistent views, lost updates, and data corruption that required manual intervention. A subsequent attempt with WebSockets improved latency but introduced its own state management complexity and proved fragile. The core issue remained: we were synchronizing mutable state, a fundamentally hard problem.
This led to a hard reset on our architectural approach. The pain point wasn’t the transport layer; it was the model of state itself. The new hypothesis was to treat the entire application as an immutable log of events. The server’s only job would be to validate and append events to this log, and the client’s only job would be to render a projection of this log. This event sourcing model promised perfect auditability and state consistency. The challenge was building the robust, resilient pipeline to deliver this event stream from a server to a Progressive Web App (PWA) that needed to function offline.
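Concretely, "rendering a projection" is nothing more than a pure fold over the log: any client that replays the same events arrives at the same state. A minimal JavaScript sketch of the idea (the event shapes here anticipate the ones defined on the backend in Part 1):
// A projection is a pure left fold over the event log.
const initialState = { items: {} };

function applyEvent(state, event) {
  switch (event.tag) {
    case 'ItemAdded':
      return { ...state, items: { ...state.items, [event.itemId]: { title: event.title, completed: false } } };
    case 'ItemCompleted':
      return { ...state, items: { ...state.items, [event.itemId]: { ...state.items[event.itemId], completed: true } } };
    default:
      return state; // unknown event types are ignored, which helps forward compatibility
  }
}

// Replaying the same log always produces the same state, wherever and whenever it runs.
const eventLog = []; // in practice, the events streamed from the server
const currentState = eventLog.reduce(applyEvent, initialState);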
Our technology selection was driven entirely by this architectural principle. For the backend, we needed absolute correctness in our event model. Haskell, with its powerful type system, was the logical choice. It allows us to encode business invariants directly into our event types, making invalid state transitions a compile-time error. For pushing the event stream to the client, Server-Sent Events (SSE) offered a simpler, more resilient alternative to WebSockets. It’s a one-way channel, which perfectly mirrors the flow of an event log, and it has built-in support for reconnection, automatically handling transient network drops. On the frontend, Redux was a natural fit, as its core philosophy of state being derived from a series of actions aligns perfectly with event sourcing. Finally, a PWA architecture with a dedicated Service Worker was non-negotiable for handling the offline requirement, providing the mechanism to buffer user-generated events and reconcile state upon reconnection.
The following is a chronicle of building this system, focusing on the core implementation details of the Haskell backend, the Redux middleware, and the PWA’s reconciliation logic.
Part 1: The Haskell Backend - An Immutable Log and an SSE Stream
The foundation of the entire system is the backend’s ability to maintain a tamper-proof, ordered log of events and stream them reliably.
First, we define our event types. Using Haskell’s algebraic data types (ADTs) and Aeson
for JSON serialization gives us compile-time guarantees about the structure of our data.
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Event where
import Data.Aeson
import Data.Text (Text)
import Data.Time (UTCTime)
import GHC.Generics (Generic)
-- A unique identifier for each event
type EventId = Int
-- A unique identifier for a user-facing entity, e.g., a task in a planner
type ItemId = Text
data AppEvent =
ItemAdded
{ eventId :: EventId
, timestamp :: UTCTime
, itemId :: ItemId
, title :: Text
}
| ItemCompleted
{ eventId :: EventId
, timestamp :: UTCTime
, itemId :: ItemId
}
| ItemTitleUpdated
{ eventId :: EventId
, timestamp :: UTCTime
, itemId :: ItemId
, newTitle :: Text
}
deriving (Show, Eq, Generic)
-- Boilerplate Aeson instances for JSON conversion
instance ToJSON AppEvent
instance FromJSON AppEvent
-- Helper to extract the ID from any event type
getEventId :: AppEvent -> EventId
getEventId (ItemAdded eid _ _ _) = eid
getEventId (ItemCompleted eid _ _) = eid
getEventId (ItemTitleUpdated eid _ _ _) = eid
Next, we need an event store. For production, this would be a specialized database like EventStoreDB or a durable message queue like Kafka. For this implementation, we’ll use an in-memory store protected by a TVar
(Transactional Variable) from the stm
library for safe concurrent access. This demonstrates the core logic without introducing database dependencies.
module EventStore where
import Control.Concurrent.STM
import Data.Time (UTCTime)
import Data.Time.Clock.POSIX (getPOSIXTime, posixSecondsToUTCTime)
import Event
-- The EventStore holds the sequence of events and a broadcast channel for new events.
data EventStore = EventStore
{ eventLog :: TVar [AppEvent]
, eventCounter :: TVar EventId
, eventChannel :: TChan AppEvent
}
-- Creates a new, empty event store
newEventStore :: IO EventStore
newEventStore = atomically $ do
log' <- newTVar []
counter' <- newTVar 0
-- Use a broadcast channel so multiple connected clients get updates
chan' <- newBroadcastTChan
return $ EventStore log' counter' chan'
-- Atomically appends a new event to the store
-- This is the single "write" point in our system.
-- The `(EventId -> UTCTime -> AppEvent)` argument is a builder that takes the
-- generated event ID and timestamp and constructs the final event.
addEvent :: EventStore -> (EventId -> UTCTime -> AppEvent) -> IO AppEvent
addEvent store eventBuilder = do
now <- getPOSIXTime
let utcNow = posixSecondsToUTCTime now
newEvent <- atomically $ do
-- Increment event ID
currentId <- readTVar (eventCounter store)
let nextId = currentId + 1
writeTVar (eventCounter store) nextId
-- Build and store the event
let finalEvent = eventBuilder nextId utcNow
modifyTVar' (eventLog store) (finalEvent :) -- Prepending is faster for lists
-- Write to the broadcast channel for live listeners
writeTChan (eventChannel store) finalEvent
return finalEvent
-- Rudimentary logging
putStrLn $ "Event persisted: " ++ show newEvent
return newEvent
-- Retrieves events that occurred after a given ID.
-- This is crucial for client reconnection and state reconciliation.
getEventsSince :: EventStore -> Maybe EventId -> IO [AppEvent]
getEventsSince store lastKnownId = atomically $ do
log' <- readTVar (eventLog store)
let sortedLog = reverse log' -- Events are stored reversed, so fix order
case lastKnownId of
Nothing -> return sortedLog -- No ID, send the whole log
Just lastId -> return $ filter (\e -> getEventId e > lastId) sortedLog
With the store in place, we build the SSE endpoint using the wai
and wai-extra (which provides Network.Wai.EventSource)
libraries. This endpoint is responsible for streaming events to the client.
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (TChan, atomically, dupTChan, readTChan)
import Control.Monad (forever)
import Data.Aeson (encode)
import Data.Binary.Builder (fromByteString, fromLazyByteString, putStringUtf8)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)
import Network.Wai (Application, requestHeaders)
import Network.Wai.EventSource (ServerEvent (..), eventSourceAppIO)
import Network.Wai.Handler.Warp (run)
import qualified Network.Wai.Middleware.RequestLogger as Logger
import Text.Read (readMaybe)
import Event
import EventStore
-- The core of the SSE endpoint logic
sseApp :: EventStore -> Application
sseApp store req sendResponse = do
  -- SSE clients provide a 'Last-Event-ID' header to resume a stream
  let lastEventIdHeader = lookup "Last-Event-ID" (requestHeaders req)
      lastKnownId = lastEventIdHeader >>= (readMaybe . T.unpack . decodeUtf8)
  -- Subscribe to live events first, then read the backlog, so nothing written in
  -- between is lost; any overlap can be filtered client-side by event ID.
  liveChannel <- atomically $ dupTChan (eventChannel store)
  missedEvents <- getEventsSince store lastKnownId
  -- Events the client missed while disconnected, still waiting to be sent
  pending <- newIORef missedEvents
  -- eventSourceAppIO repeatedly runs an IO action to produce the next event
  eventSourceAppIO (nextEvent pending liveChannel) req sendResponse
  where
    -- Drain the backlog first, then block on the broadcast channel for live events
    nextEvent :: IORef [AppEvent] -> TChan AppEvent -> IO ServerEvent
    nextEvent pending chan = do
      backlog <- readIORef pending
      case backlog of
        (e : rest) -> do
          writeIORef pending rest
          -- A small delay can prevent overwhelming the client on initial connection
          threadDelay 50000 -- 50ms
          return (toSseEvent e)
        [] -> toSseEvent <$> atomically (readTChan chan)
    -- Format an event as an SSE message: name, id, and JSON payload
    toSseEvent :: AppEvent -> ServerEvent
    toSseEvent event = ServerEvent
      (Just (fromByteString "message"))                -- generic event name
      (Just (putStringUtf8 (show (getEventId event)))) -- becomes Last-Event-ID on the client
      [fromLazyByteString (encode event)]              -- JSON-encoded event data
-- Dummy application to add events for demonstration
-- In a real app, these would be POST requests to command endpoints
main :: IO ()
main = do
putStrLn "Starting event-sourced server on port 8080"
store <- newEventStore
-- A background thread to simulate events being created
_ <- forkIO $ forever $ do
threadDelay 5000000 -- 5 seconds
_ <- addEvent store (\eid ts -> ItemAdded eid ts "item-1" "New background task")
return ()
let app = Logger.logStdoutDev $ sseApp store
run 8080 app
This Haskell backend is now a source of truth. It validates and stores events atomically and provides a resilient streaming endpoint that clients can connect and reconnect to, ensuring they never miss an event. The use of Last-Event-ID
is critical for production-grade resilience.
Part 2: The Redux Client and SSE Middleware
On the client side, our goal is to pipe the SSE event stream directly into our Redux store. A custom middleware is the perfect place to encapsulate this logic, keeping our components clean and unaware of the data source.
First, let’s define our Redux state, actions, and reducer. This looks standard, but conceptually, the actions are direct counterparts to the events defined in Haskell.
// src/store/reducer.js
const initialState = {
items: {},
isConnected: false,
lastEventId: null,
};
// These action types correspond to events from the backend
const ActionTypes = {
SET_CONNECTION_STATUS: 'SET_CONNECTION_STATUS',
PROCESS_SERVER_EVENT: 'PROCESS_SERVER_EVENT',
SET_LAST_EVENT_ID: 'SET_LAST_EVENT_ID',
};
// Reducer logic to apply events to the state
export const rootReducer = (state = initialState, action) => {
switch (action.type) {
case ActionTypes.SET_CONNECTION_STATUS:
return { ...state, isConnected: action.payload.isConnected };
    case ActionTypes.PROCESS_SERVER_EVENT: {
      const event = action.payload.event;
      // In a real app, you'd check for event duplication using eventId
      // and ensure events are processed in order.
      const newItems = { ...state.items };
      if (event.tag === 'ItemAdded') {
        newItems[event.itemId] = { id: event.itemId, title: event.title, completed: false };
      } else if (event.tag === 'ItemCompleted') {
        if (newItems[event.itemId]) {
          // Copy the item rather than mutating the object still referenced by the old state
          newItems[event.itemId] = { ...newItems[event.itemId], completed: true };
        }
      } else if (event.tag === 'ItemTitleUpdated') {
        if (newItems[event.itemId]) {
          newItems[event.itemId] = { ...newItems[event.itemId], title: event.newTitle };
        }
      }
      return { ...state, items: newItems, lastEventId: event.eventId };
    }
case ActionTypes.SET_LAST_EVENT_ID:
return { ...state, lastEventId: action.payload.id };
default:
return state;
}
};
export const setConnectionStatus = (isConnected) => ({
type: ActionTypes.SET_CONNECTION_STATUS,
payload: { isConnected },
});
export const processServerEvent = (event) => ({
type: ActionTypes.PROCESS_SERVER_EVENT,
payload: { event }
});
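One caveat before wiring up the transport: the reducer above applies whatever arrives. Because reconnection and the server-side catch-up can occasionally deliver an event twice, a production reducer should guard on the monotonically increasing eventId before applying anything. A small helper along these lines (the file name and function are illustrative, not part of the original reducer):
// src/store/dedupe.js (illustrative helper)
// Event IDs are strictly increasing, so anything at or below the last applied ID
// is a duplicate left over from a reconnect and can safely be ignored.
export function isNewEvent(lastEventId, event) {
  return lastEventId === null || event.eventId > lastEventId;
}

// Inside the PROCESS_SERVER_EVENT case:
//   if (!isNewEvent(state.lastEventId, action.payload.event)) return state;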
Now for the core client-side logic: the SSE middleware. This middleware will initialize the EventSource
, handle its lifecycle, and dispatch actions to the store.
// src/store/sseMiddleware.js
let eventSource = null;
const sseMiddleware = (store) => (next) => (action) => {
// We can add actions to explicitly connect or disconnect
if (action.type === 'SSE_CONNECT') {
// Avoid multiple connections
if (eventSource) {
eventSource.close();
}
    const { lastEventId } = store.getState();
    // The browser resends Last-Event-ID automatically on its own reconnection attempts,
    // but the native EventSource constructor does not accept custom headers. For a fresh
    // connection (e.g. after a page reload) we pass the last known ID as a query
    // parameter the server can treat as a fallback for the header (alternatively, a
    // polyfill such as event-source-polyfill can set the header itself).
    const url = lastEventId
      ? `/events?lastEventId=${encodeURIComponent(lastEventId)}`
      : '/events';
    console.log(`Connecting to SSE, resuming after event: ${lastEventId}`);
    eventSource = new EventSource(url);
eventSource.onopen = () => {
console.log('SSE connection established.');
store.dispatch({ type: 'SET_CONNECTION_STATUS', payload: { isConnected: true } });
};
eventSource.onerror = (err) => {
console.error('EventSource failed:', err);
store.dispatch({ type: 'SET_CONNECTION_STATUS', payload: { isConnected: false } });
// EventSource will automatically try to reconnect. We just update the UI state.
};
eventSource.onmessage = (event) => {
try {
const parsedEvent = JSON.parse(event.data);
console.log('Received server event:', parsedEvent);
// Dispatch the event to be processed by the reducer
store.dispatch({ type: 'PROCESS_SERVER_EVENT', payload: { event: parsedEvent } });
} catch (e) {
console.error('Failed to parse server event:', e);
}
};
}
return next(action);
};
export default sseMiddleware;
This middleware cleanly separates the concern of real-time communication. Components only need to dispatch a single SSE_CONNECT
action on startup. From then on, the Redux state will automatically stay in sync with the server’s event stream.
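For completeness, wiring the middleware into the store and opening the stream at startup might look like the following (the file layout is assumed; classic createStore is used to match the hand-written middleware above):
// src/store/index.js (assumed wiring)
import { createStore, applyMiddleware } from 'redux';
import { rootReducer } from './reducer';
import sseMiddleware from './sseMiddleware';

export const store = createStore(rootReducer, applyMiddleware(sseMiddleware));

// Open the stream once at startup; the middleware and the browser handle
// reconnection and resumption from lastEventId after that.
store.dispatch({ type: 'SSE_CONNECT' });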
Part 3: The PWA Service Worker for Offline Resilience
The system is now real-time and resilient to transient connection drops, but it does not handle being truly offline. If the user performs an action while offline, that action is lost. This is where the Service Worker becomes critical.
The strategy is:
- The main application attempts to send commands (user actions) to the server via fetch.
- The Service Worker intercepts these fetch requests.
- If the application is online, the request is passed through.
- If the application is offline, the Service Worker serializes the request and stores it in an IndexedDB queue.
- When the application comes back online, a “sync” event is triggered to process the queue.
Here is the Service Worker implementation.
// public/sw.js
// Note: the ES import below assumes the worker is either bundled or registered
// as a module worker (navigator.serviceWorker.register('/sw.js', { type: 'module' })).
import { openDB } from 'idb';
const DB_NAME = 'pwa-event-sourcing-db';
const STORE_NAME = 'request-queue';
const dbPromise = openDB(DB_NAME, 1, {
upgrade(db) {
db.createObjectStore(STORE_NAME, { autoIncrement: true });
},
});
// Intercept fetch requests
self.addEventListener('fetch', (event) => {
// We only intercept command POST requests, not GETs or our SSE stream.
const { method, url } = event.request;
if (method !== 'POST' || !url.includes('/api/command')) {
return;
}
// Try the network first; if that fails (offline), queue the request and
// answer with a synthetic 202 so the UI stays responsive.
event.respondWith(
(async () => {
try {
const response = await fetch(event.request.clone());
// If online and successful, we are done
return response;
} catch (error) {
// If offline, queue the request
console.log('Fetch failed, client likely offline. Queuing request.');
await queueRequest(event.request);
// Return a synthetic "Accepted" response to the client
return new Response(JSON.stringify({ status: 'queued' }), {
status: 202,
headers: { 'Content-Type': 'application/json' },
});
}
})()
);
});
async function queueRequest(request) {
  // Serialize the request *before* opening the transaction: an IndexedDB transaction
  // auto-closes as soon as we await anything that is not an IndexedDB operation.
  const serializedRequest = {
    url: request.url,
    method: request.method,
    headers: Object.fromEntries(request.headers.entries()),
    body: await request.text(), // Assuming JSON body
  };
  const db = await dbPromise;
  const tx = db.transaction(STORE_NAME, 'readwrite');
  await tx.objectStore(STORE_NAME).add(serializedRequest);
  await tx.done;
  // Ask the browser to fire a 'sync' event once connectivity is restored
  if ('sync' in self.registration) {
    await self.registration.sync.register('process-request-queue');
  }
}
// Listen for the sync event
self.addEventListener('sync', (event) => {
if (event.tag === 'process-request-queue') {
event.waitUntil(processQueue());
}
});
async function processQueue() {
  const db = await dbPromise;
  // Read the queued keys up front. We cannot hold a cursor open across fetch calls,
  // because an IndexedDB transaction closes as soon as a non-IndexedDB await occurs.
  const keys = await db.getAllKeys(STORE_NAME);
  for (const key of keys) {
    const req = await db.get(STORE_NAME, key);
    if (!req) continue; // already processed
    try {
      console.log('Processing queued request:', req);
      await fetch(new Request(req.url, {
        method: req.method,
        headers: req.headers,
        body: req.body,
      }));
      // If successful, remove the request from the queue in a fresh transaction
      await db.delete(STORE_NAME, key);
    } catch (error) {
      console.error('Failed to process queued request, will retry on next sync.', error);
      // Stop here; everything still in the queue will be retried on the next sync event.
      // A more robust solution might have a retry limit.
      return;
    }
  }
}
This Service Worker, combined with the Background Sync API, makes the application robust. The user can continue working offline, and their actions are queued. When they reconnect, the Service Worker replays these actions against the server. Because our SSE middleware is already handling incoming events and reconciliation, the UI will seamlessly update once the queued commands are processed by the server and broadcast back as events.
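Two small pieces are assumed rather than shown above: registering the worker from the main application, and the helper that actually POSTs commands to /api/command (the endpoint the worker intercepts). A sketch of both, under those assumptions:
// src/commands.js (sketch)
export async function registerServiceWorker() {
  if ('serviceWorker' in navigator) {
    // sw.js uses ES imports, so it needs to be bundled or registered as a module worker
    await navigator.serviceWorker.register('/sw.js', { type: 'module' });
  }
}

// Commands are plain POSTs; when offline, the Service Worker answers with 202 and queues them.
export async function sendCommand(command) {
  const response = await fetch('/api/command', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
  });
  return response.json(); // either the server's reply or { status: 'queued' }
}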
Part 4: Visualizing the State Reconciliation Flow
The full reconciliation process upon reconnection is the most complex interaction in this architecture. A Mermaid diagram helps clarify the sequence.
sequenceDiagram
    participant ClientApp as Client (Redux)
    participant ServiceWorker as Service Worker
    participant Server as Haskell Backend
    Note over ClientApp, Server: Client is offline. User performs Action A.
    ClientApp->>ServiceWorker: POST /api/command (Action A)
    ServiceWorker-->>ClientApp: 202 Accepted (Queued)
    ServiceWorker->>IndexedDB: Store Action A
    Note over ClientApp, Server: User performs Action B.
    ClientApp->>ServiceWorker: POST /api/command (Action B)
    ServiceWorker-->>ClientApp: 202 Accepted (Queued)
    ServiceWorker->>IndexedDB: Store Action B
    Note over ClientApp, Server: Network connection is restored.
    ClientApp->>Server: SSE Connect (Last-Event-ID: 100)
    Server-->>ClientApp: Streams events 101, 102, 103...
    ClientApp->>ClientApp: Redux store updates with new events
    ServiceWorker->>ServiceWorker: Background Sync event triggered
    ServiceWorker->>IndexedDB: Read Action A from queue
    ServiceWorker->>Server: POST /api/command (Action A)
    Server->>Server: Persists Event 104
    Server-->>ClientApp: SSE stream sends Event 104
    ClientApp->>ClientApp: Redux store updates with Event 104
    ServiceWorker->>IndexedDB: Delete Action A
    ServiceWorker->>IndexedDB: Read Action B from queue
    ServiceWorker->>Server: POST /api/command (Action B)
    Server->>Server: Persists Event 105
    Server-->>ClientApp: SSE stream sends Event 105
    ClientApp->>ClientApp: Redux store updates with Event 105
    ServiceWorker->>IndexedDB: Delete Action B
In this flow, the client first catches up on all events that happened globally while it was offline and only then replays its own queued actions. This "fetch-then-replay" sequence minimizes the chance of optimistic UI updates being invalidated; note, however, that the browser decides when the sync event fires, so the ordering is a design goal rather than a hard guarantee, and the server must tolerate commands arriving from a client that has not yet fully caught up.
The architecture is not without its own complexities and trade-offs. The reliance on an in-memory event store in this example is a significant simplification; a production system requires a durable, scalable event log like Kafka or a dedicated database. Event schema versioning becomes a critical concern over the application’s lifecycle; careful planning is needed to ensure that changes to event structures are backward-compatible.
Furthermore, for long-running systems, replaying the entire event history for every client is inefficient. This naturally leads to the introduction of snapshotting, where the server periodically calculates and stores a snapshot of the current state, allowing clients to load a snapshot first and then apply only the events that have occurred since. Finally, this design does not explicitly handle command conflicts. If a user’s offline action is invalid by the time it’s replayed (e.g., trying to modify a deleted item), the server must have robust validation and a mechanism to report the rejection back to the user. This is a significant challenge in its own right, often pushing architectures towards a full CQRS pattern for more explicit command handling.
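As a sketch of what snapshotting could look like from the client's side, assuming a hypothetical /api/snapshot endpoint that returns the current projection together with its position in the log:
// Hypothetical bootstrap: load a snapshot, then resume the stream just after it.
export async function bootstrap(store) {
  const res = await fetch('/api/snapshot');
  const { state, lastEventId } = await res.json();
  // LOAD_SNAPSHOT is a hypothetical action that replaces items and lastEventId wholesale.
  store.dispatch({ type: 'LOAD_SNAPSHOT', payload: { state, lastEventId } });
  // The SSE middleware then resumes from lastEventId, so only newer events are replayed.
  store.dispatch({ type: 'SSE_CONNECT' });
}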