The user activity feed on our main dashboard was buckling under its own weight. Built on a standard Laravel stack, the initial implementation relied on frequent AJAX polling against a MySQL database that aggregated user actions. With a few dozen concurrent users, it was acceptable. As we scaled to thousands, the user_actions
table became a severe bottleneck, write contention was rampant, and the periodic polling from thousands of clients amounted to a self-inflicted DDoS attack. The user experience degraded from “live” to “live-ish” to “eventually consistent, maybe.” A fundamental architectural change was unavoidable.
Our initial concept was to decouple the event ingestion from the aggregation and presentation layers. A simple push to a queue seemed like a good start, but the core problem remained: the aggregation logic itself. Processing tens of thousands of raw events per second—calculating metrics like “views per minute,” “top active users,” and “trending content”—in PHP workers felt like using the wrong tool for the job. PHP excels as a request/response workhorse, but for a long-running, CPU-bound, stateful process, its performance and memory management characteristics are not ideal. We needed a dedicated, high-performance service for this one critical feature.
This led to a technology selection process rooted in pragmatism. The core Laravel application, representing years of development, was not going anywhere. The solution had to be an augmentation, a surgical enhancement, not a rewrite.
For the transport layer: Redis Streams over simple Pub/Sub. We needed durability. If the aggregation service went down, we couldn’t afford to lose events. Redis Pub/Sub is fire-and-forget. Redis Streams, however, provides a persistent, append-only log. This allows our new service to crash, restart, and pick up exactly where it left off. Furthermore, consumer groups would allow us to scale out the number of aggregator instances in the future without processing the same event twice.
For the aggregation service: Rust over Go or Node.js. Node.js was a contender due to its event-driven nature, but for pure computational throughput and predictable memory usage, it falls short of a compiled language. The choice between Go and Rust was more nuanced. Go offers simpler concurrency with goroutines, which was tempting. However, the critical nature of this service demanded absolute reliability. Rust’s compile-time guarantees against data races and its focus on zero-cost abstractions meant we could write highly concurrent code with a much higher degree of confidence. For a small, set-and-forget service that must not fail or leak memory, Rust was the superior engineering choice.
For the real-time frontend delivery: Redis Pub/Sub and WebSockets. Once the Rust service aggregated the data, it needed to push the summarized results to clients. Redis Pub/Sub is perfect for this broadcast mechanism. A lightweight WebSocket server, integrated with our existing Laravel application, would subscribe to these Pub/Sub channels and forward messages to the relevant frontend clients.
For the frontend visualization: Styled-components within our existing React codebase. The frontend needed to not only display data but also react visually to its velocity and type. Styled-components allows for dynamic CSS based on props, making it trivial to implement features like flashing indicators for new data, color-coded metrics based on thresholds, or smooth animations for updating counters—all managed within the component’s logic.
The resulting architecture can be visualized as a data pipeline:
graph TD
    A[Laravel Application] -- "XADD events_stream ..." --> B(Redis Stream: 'events_stream');
    B -- "XREADGROUP aggregator_group ..." --> C{Rust Aggregator Service};
    C -- "In-memory Aggregation" --> C;
    C -- "PUBLISH dashboard_updates ..." --> D(Redis Pub/Sub: 'dashboard_updates');
    E[Laravel WebSocket Server] -- "SUBSCRIBE dashboard_updates" --> D;
    F[React Frontend with Styled-components] -- "WebSocket Connection" --> E;
    E -- "Pushes Aggregated JSON" --> F;
Phase 1: Ejecting Events from Laravel
The first step was to stop writing raw events directly to the database during a web request. Instead, we dispatch a Laravel event.
In any controller where a user action occurs:
app/Http/Controllers/ContentController.php
<?php
namespace App\Http\Controllers;
use App\Events\UserActionOccurred;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Auth;
class ContentController extends Controller
{
public function view(Request $request, int $contentId)
{
// ... business logic to fetch content ...
// Dispatch an event instead of writing to the DB directly.
// The heavy lifting happens in a queued listener, so the user's request isn't blocked.
event(new UserActionOccurred(
'content_view',
[
'content_id' => $contentId,
'user_id' => Auth::id(),
'ip_address' => $request->ip(),
]
));
return response()->json(['status' => 'success']);
}
}
The event itself is a simple data container:
app/Events/UserActionOccurred.php
<?php
namespace App\Events;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class UserActionOccurred
{
use Dispatchable, SerializesModels;
public string $actionType;
public array $payload;
public int $timestamp;
/**
* Create a new event instance.
*
* @param string $actionType
* @param array $payload
*/
public function __construct(string $actionType, array $payload)
{
$this->actionType = $actionType;
$this->payload = $payload;
$this->timestamp = now()->getTimestamp();
}
}
A dedicated listener, registered against the event in the application’s EventServiceProvider, handles this event. Its only job is to serialize the data and push it into the Redis Stream using the XADD command. In a real-world project, error handling for Redis connection failures is critical here; you might push to a fallback queue or log the failure for later reprocessing.
app/Listeners/PublishUserActionToStream.php
<?php
namespace App\Listeners;
use App\Events\UserActionOccurred;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
// Implementing ShouldQueue ensures this runs in the background via Laravel's queue worker.
class PublishUserActionToStream implements ShouldQueue
{
use InteractsWithQueue;
/**
* The name of the connection to use.
*
* @var string|null
*/
public $connection = 'redis';
/**
* The name of the queue the job should be sent to.
*
* @var string|null
*/
public $queue = 'listeners';
/**
* Handle the event.
*
* @param UserActionOccurred $event
* @return void
*/
public function handle(UserActionOccurred $event)
{
try {
// The key for our stream
$streamKey = 'events_stream';
// The data to be added to the stream.
// Redis streams work with flat key-value pairs.
$data = [
'type' => $event->actionType,
'payload' => json_encode($event->payload), // Serialize complex data
'timestamp' => $event->timestamp,
];
// XADD command. The '*' tells Redis to auto-generate an ID.
Redis::connection('default')->xadd($streamKey, '*', $data);
} catch (\Exception $e) {
// A common mistake is not handling Redis failures. If Redis is down,
// we should not let the job fail silently. Log it, and potentially
// re-queue with a delay.
Log::error('Failed to publish user action to Redis Stream: ' . $e->getMessage());
$this->release(30); // Release the job back onto the queue, to be retried after 30 seconds
}
}
}
This simple change completely decoupled the application’s request lifecycle from the performance characteristics of our analytics backend. The user gets a fast response, and the event is safely logged in the Redis Stream for asynchronous processing.
Phase 2: The Rust Aggregator Service
This is the core of the new architecture. The Rust service is a standalone, long-running binary. Its responsibilities are:
- Connect to Redis and join a consumer group.
- Read batches of events from the stream.
- Deserialize and process these events.
- Maintain an in-memory state of aggregated data (e.g., counters).
- Periodically publish the aggregated state to a Redis Pub/Sub channel.
- Acknowledge processed messages (XACK) so they are not redelivered.
Here is the project structure and key code.
Cargo.toml
[package]
name = "event-aggregator"
version = "0.1.0"
edition = "2021"
[dependencies]
redis = { version = "0.23", features = ["tokio-comp"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
env_logger = "0.10"
log = "0.4"
The main application logic:
src/main.rs
use std::collections::HashMap;
use std::time::Duration;
use log::{info, error, warn};
use redis::AsyncCommands;
use serde::Deserialize;
const STREAM_KEY: &str = "events_stream";
const GROUP_NAME: &str = "aggregator_group";
const CONSUMER_NAME: &str = "consumer_1";
const PUBLISH_CHANNEL: &str = "dashboard_updates";
const AGGREGATION_WINDOW_MS: u64 = 5000; // 5 seconds
// This struct must match the data structure being pushed by Laravel.
#[derive(Deserialize, Debug)]
struct UserActionEvent {
#[serde(rename = "type")]
event_type: String,
payload: String, // Keep as string, deserialize if needed
timestamp: String, // Redis returns all values as strings
}
// In-memory state for our aggregation
struct AppState {
event_counts: HashMap<String, u64>,
}
impl AppState {
fn new() -> Self {
AppState {
event_counts: HashMap::new(),
}
}
fn process_event(&mut self, event: &UserActionEvent) {
let counter = self.event_counts.entry(event.event_type.clone()).or_insert(0);
*counter += 1;
}
fn get_and_reset(&mut self) -> HashMap<String, u64> {
if self.event_counts.is_empty() {
return HashMap::new();
}
// Swap the current map out for a fresh, empty one and hand the old one to the caller.
// (If multiple tasks shared this state, it would need to live behind a Mutex.)
std::mem::replace(&mut self.event_counts, HashMap::new())
}
}
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
env_logger::init();
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_string());
let client = redis::Client::open(redis_url)?;
let mut con = client.get_tokio_connection().await?;
let mut pub_con = client.get_tokio_connection().await?;
info!("Ensuring consumer group exists...");
// The MKSTREAM option creates the stream if it doesn't exist.
let _: () = con.xgroup_create_mkstream(STREAM_KEY, GROUP_NAME, "$").await.unwrap_or_else(|e| {
warn!("Could not create consumer group (it might already exist): {}", e);
});
let mut app_state = AppState::new();
let mut tick_interval = tokio::time::interval(Duration::from_millis(AGGREGATION_WINDOW_MS));
loop {
tokio::select! {
// This branch handles reading new events from the stream
_ = tokio::time::sleep(Duration::from_millis(10)) => { // Short sleep to prevent tight loop on no data
let read_opts = redis::streams::StreamReadOptions::default()
.group(GROUP_NAME, CONSUMER_NAME)
.count(100) // Process up to 100 messages at a time
.block(1000); // Block for up to 1 second waiting for messages
let result: Result<redis::streams::StreamReadReply, _> = con.xread_options(
&[STREAM_KEY],
&[">"], // '>' means read new, un-delivered messages
&read_opts
).await;
match result {
Ok(reply) => {
for stream_key in reply.keys {
let ids: Vec<String> = stream_key.ids.iter().map(|id| id.id.clone()).collect();
for message in stream_key.ids {
// A common pitfall is assuming the map has the fields you want.
// Production code needs more robust parsing.
let parsed_event = UserActionEvent::try_from(&message);
match parsed_event {
Ok(event) => {
app_state.process_event(&event);
},
Err(e) => error!("Failed to parse message payload: {:?}. Error: {}", message.map, e),
}
}
// Acknowledge all processed messages in this batch
if !ids.is_empty() {
let ack_result: redis::RedisResult<()> = con.xack(STREAM_KEY, GROUP_NAME, &ids).await;
if let Err(e) = ack_result {
error!("Failed to XACK messages: {}", e);
// In a real project, this is a critical failure. We might
// need to stop processing to avoid re-processing messages.
}
}
}
},
Err(e) => {
error!("Error reading from stream: {}", e);
// Implement a backoff strategy before retrying connection
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
},
// This branch fires on a timer to publish aggregated data
_ = tick_interval.tick() => {
let aggregated_data = app_state.get_and_reset();
if !aggregated_data.is_empty() {
let payload = serde_json::to_string(&aggregated_data).unwrap_or_else(|_| "{}".to_string());
info!("Publishing aggregated data: {}", payload);
let publish_result: redis::RedisResult<()> = pub_con.publish(PUBLISH_CHANNEL, payload).await;
if let Err(e) = publish_result {
error!("Failed to publish to channel: {}", e);
}
}
}
}
}
}
// Custom conversion from a redis::streams::StreamId entry into our event struct.
// This shows how to handle the flat key-value structure returned by Redis Streams.
impl TryFrom<&redis::streams::StreamId> for UserActionEvent {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(value: &redis::streams::StreamId) -> Result<Self, Self::Error> {
let event_type: String = value.get("type").ok_or("Missing field 'type'")?;
let payload: String = value.get("payload").ok_or("Missing field 'payload'")?;
let timestamp: String = value.get("timestamp").ok_or("Missing field 'timestamp'")?;
Ok(UserActionEvent { event_type, payload, timestamp })
}
}
This Rust service is robust. It uses Tokio’s select! macro to concurrently wait for new stream messages and the publishing timer, processes messages in batches, and acknowledges them with XACK, giving us at-least-once delivery. One subtlety: reading with ">" only returns messages that have never been delivered to any consumer in the group. If the service crashes, messages that were delivered but not yet acknowledged remain in the consumer group’s pending entries list; on restart they must be re-read explicitly (by calling XREADGROUP with an ID of "0" instead of ">") or reclaimed with XCLAIM/XAUTOCLAIM.
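To make that recovery path concrete, here is a minimal sketch of a startup pass over the pending entries list. The drain_pending helper is hypothetical rather than part of the service shown above; it assumes it lives in src/main.rs and reuses the constants, AppState, and TryFrom conversion defined there.

// Hypothetical helper, added to src/main.rs and called once before the main
// loop. It reuses STREAM_KEY / GROUP_NAME / CONSUMER_NAME, AppState, and the
// TryFrom<&StreamId> conversion defined above.
async fn drain_pending(
    con: &mut redis::aio::Connection,
    app_state: &mut AppState,
) -> redis::RedisResult<()> {
    loop {
        // An ID of "0" asks for this consumer's pending entries (delivered but
        // never acknowledged), oldest first, instead of brand-new messages.
        let opts = redis::streams::StreamReadOptions::default()
            .group(GROUP_NAME, CONSUMER_NAME)
            .count(100);
        let reply: redis::streams::StreamReadReply =
            con.xread_options(&[STREAM_KEY], &["0"], &opts).await?;

        let mut ids: Vec<String> = Vec::new();
        for key in &reply.keys {
            for entry in &key.ids {
                if let Ok(event) = UserActionEvent::try_from(entry) {
                    app_state.process_event(&event);
                }
                ids.push(entry.id.clone());
            }
        }

        if ids.is_empty() {
            // Pending entries list is empty; normal '>' processing can begin.
            return Ok(());
        }

        info!("Recovered {} pending entries after restart", ids.len());
        let _: () = con.xack(STREAM_KEY, GROUP_NAME, &ids).await?;
    }
}

Calling something like this once before entering the select! loop means a crash mid-batch results in double-counting at worst, never in lost events, which is consistent with at-least-once semantics.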
Phase 3: Frontend Real-time Updates
The final piece is delivering the aggregated data to the user’s browser. We used the laravel-websockets
package, a popular PHP implementation of a Pusher-compatible WebSocket server. It can be configured to listen to a Redis Pub/Sub channel for events.
In config/websockets.php
, you would configure an app and enable the Redis replication feature.
Then, on the frontend, our React component connects to this WebSocket server and listens for events. The magic of styled-components
comes into play when we want to provide visual feedback based on the data.
src/components/RealtimeActivityDashboard.js
import React, { useState, useEffect } from 'react';
import Pusher from 'pusher-js';
import styled, { keyframes, css } from 'styled-components';
// A subtle flash animation for when data updates
const flashAnimation = keyframes`
0% { background-color: #2a3b4d; }
50% { background-color: #3c526b; }
100% { background-color: #2a3b4d; }
`;
// A styled component for a single metric tile.
// It receives a 'justUpdated' prop to trigger the animation.
const MetricTile = styled.div`
background-color: #2a3b4d;
border-radius: 8px;
padding: 20px;
text-align: center;
color: white;
transition: background-color 0.3s ease;
${({ justUpdated }) =>
justUpdated &&
css`
animation: ${flashAnimation} 0.7s ease-in-out;
`}
`;
const MetricValue = styled.h2`
font-size: 3rem;
margin: 0;
color: #61dafb;
`;
const MetricLabel = styled.p`
font-size: 1rem;
margin-top: 5px;
color: #a0aec0;
`;
const RealtimeActivityDashboard = () => {
// State to hold the aggregated counts
const [metrics, setMetrics] = useState({
content_view: 0,
user_login: 0,
// ... other event types
});
// State to track which metrics were just updated for animations
const [updatedKeys, setUpdatedKeys] = useState(new Set());
useEffect(() => {
// A common mistake is to put credentials directly here.
// Use environment variables.
const pusher = new Pusher(process.env.REACT_APP_PUSHER_KEY, {
wsHost: process.env.REACT_APP_WS_HOST,
wsPort: process.env.REACT_APP_WS_PORT,
forceTLS: false,
disableStats: true,
enabledTransports: ['ws', 'wss'],
});
// Subscribe to the channel our Rust service publishes to
const channel = pusher.subscribe('dashboard_updates');
// Bind to the event. The event name is the channel name itself
// when using raw Redis Pub/Sub with laravel-websockets.
channel.bind('dashboard_updates', (data) => {
// The data from Rust is a JSON string of the aggregated map
const newAggregates = JSON.parse(data);
const justUpdated = new Set(Object.keys(newAggregates));
setMetrics(prevMetrics => {
const newMetrics = { ...prevMetrics };
for (const [key, value] of Object.entries(newAggregates)) {
// Add the new counts to our existing totals
newMetrics[key] = (newMetrics[key] || 0) + value;
}
return newMetrics;
});
setUpdatedKeys(justUpdated);
// Clear the 'justUpdated' status after the animation duration
setTimeout(() => {
setUpdatedKeys(new Set());
}, 700);
});
return () => {
channel.unbind_all();
pusher.unsubscribe('dashboard_updates');
};
}, []);
return (
<div style={{ display: 'grid', gridTemplateColumns: 'repeat(3, 1fr)', gap: '20px' }}>
{Object.entries(metrics).map(([key, value]) => (
<MetricTile key={key} justUpdated={updatedKeys.has(key)}>
<MetricValue>{value.toLocaleString()}</MetricValue>
<MetricLabel>{key.replace(/_/g, ' ').toUpperCase()}</MetricLabel>
</MetricTile>
))}
</div>
);
};
export default RealtimeActivityDashboard;
This solution successfully offloaded the entire real-time aggregation workload from our PHP monolith to a purpose-built, hyper-efficient Rust service. The database load from this feature dropped to zero, server CPU usage stabilized, and the dashboard became truly real-time. The use of Redis Streams provided the necessary durability to make the solution production-ready.
The key limitation of this current implementation is that the Rust aggregator’s state is entirely in-memory. If the service is restarted, the counts for the current aggregation window are lost. While our 5-second window makes this a minor issue, for longer windows this would be unacceptable. A future iteration could involve periodically snapshotting the AppState
hashmap back to a Redis Hash. Another area for improvement is scaling; the current setup uses a single consumer. By leveraging the power of Redis consumer groups, we could run multiple instances of the Rust service, each processing a subset of the stream, though this would require a more complex state-sharing strategy (e.g., using Redis for storing the counters as well). Finally, backpressure is not explicitly handled. If the Rust service cannot keep up with the rate of incoming events, the pending message list in the consumer group will grow, consuming Redis memory. Monitoring the stream and consumer group lag is a critical operational requirement.
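As a sketch of that first improvement (under stated assumptions, not a drop-in change): the aggregator could persist the in-flight window to a Redis hash after each batch and reload it on startup. The helper names and the aggregator:window_snapshot key below are illustrative only, and the calls assume the same redis crate APIs already used in src/main.rs.

// Hypothetical helpers that could live alongside the existing code in
// src/main.rs. The key name "aggregator:window_snapshot" is illustrative.
const SNAPSHOT_KEY: &str = "aggregator:window_snapshot";

// Persist the in-flight window to a Redis hash. Calling this after each
// processed batch bounds the loss on a crash to (at most) one batch.
async fn snapshot_window(
    con: &mut redis::aio::Connection,
    counts: &HashMap<String, u64>,
) -> redis::RedisResult<()> {
    if counts.is_empty() {
        return Ok(());
    }
    let fields: Vec<(String, u64)> = counts.iter().map(|(k, v)| (k.clone(), *v)).collect();
    let _: () = con.hset_multiple(SNAPSHOT_KEY, &fields).await?;
    Ok(())
}

// On startup, reload whatever was snapshotted for the interrupted window and
// clear the snapshot so it is not applied twice.
async fn restore_window(
    con: &mut redis::aio::Connection,
) -> redis::RedisResult<HashMap<String, u64>> {
    let saved: HashMap<String, u64> = con.hgetall(SNAPSHOT_KEY).await?;
    let _: i64 = con.del(SNAPSHOT_KEY).await?;
    Ok(saved)
}

The restored map would seed AppState before the first tick, and the publish step would also need to clear the snapshot so an already-published window is never counted twice.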