Our on-call rotation was alerted to a complete logging pipeline failure. Kibana was unresponsive, and our Elasticsearch cluster was under extreme load. The root cause was not a backend service failure, but a seemingly innocuous front-end deployment. A bug in a new UI component was triggering thousands of client-side JavaScript errors per minute for every active user, flooding our ingestion endpoint. Our existing window.onerror
handler was dutifully sending every single error, creating a denial-of-service attack against our own infrastructure. This incident exposed a critical gap: we had no resilient mechanism for collecting, batching, and intelligently filtering high-volume client-side telemetry.
The post-mortem defined a clear set of requirements for a new system. It needed to:
- Reliably capture client-side logs (errors, performance metrics, user interactions) without blocking the main thread.
- Persist and batch logs, surviving page navigations and brief network outages.
- Provide a secure, scalable ingestion endpoint within our existing backend infrastructure.
- Implement an intelligent, stateful filtering layer to de-duplicate, sample, and rate-limit logs before they hit our primary data store, preventing future overloads.
Our initial architecture concept was a multi-stage pipeline. A Service Worker on the client would handle capture and persistence. A dedicated Laravel endpoint would act as a secure beacon. Fluentd would handle aggregation. The real challenge was the intelligent filtering layer. A static Fluentd configuration wouldn’t suffice; we needed dynamic logic. This led us to the filter_lua
plugin, a decision that proved central to the success of the project. It provided the performance and flexibility needed to implement stateful filtering directly within the data stream.
Stage 1: The Service Worker as a Resilient Log Collector
The first step was to build a robust client-side collector. A Service Worker is the ideal candidate due to its separate execution thread and persistent nature. We decided against navigator.sendBeacon
because it is fire-and-forget: the page cannot observe the outcome, retry failures, or control batching. A Service Worker allows us to implement a sophisticated queueing system using IndexedDB.
The client-side application communicates with the Service Worker via postMessage
. The worker then pushes these log entries into an IndexedDB object store. This ensures that even if the user closes the tab immediately after an action, the log data is persisted.
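For context, the page-side glue is small. Here is a minimal sketch (the resources/js/log-client.js path and the queueLog helper are illustrative, not taken from our codebase) that registers the worker and forwards entries in the QUEUE_LOG shape the worker expects:
resources/js/log-client.js (illustrative)
// Register the Service Worker once at application startup.
if ('serviceWorker' in navigator) {
  navigator.serviceWorker.register('/service-worker.js');
}

// Hand a structured log entry to the worker, which persists it in IndexedDB.
function queueLog(level, message, context = {}) {
  navigator.serviceWorker.ready.then(registration => {
    // controller is null on the very first load, before the worker claims the page.
    const worker = navigator.serviceWorker.controller || registration.active;
    if (!worker) return;
    worker.postMessage({
      type: 'QUEUE_LOG',
      payload: { level, message, timestamp: Date.now(), context },
    });
  });
}

// Example: capture uncaught errors without doing any network work on the main thread.
window.onerror = (message, source, lineno, colno) => {
  queueLog('error', String(message), { source, lineno, colno, userAgent: navigator.userAgent });
};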
public/service-worker.js
// A simple wrapper for IndexedDB operations using promises.
const DB_NAME = 'ClientLogsDB';
const STORE_NAME = 'logQueue';
const DB_VERSION = 1;
function openDB() {
return new Promise((resolve, reject) => {
const request = indexedDB.open(DB_NAME, DB_VERSION);
request.onerror = () => reject("Error opening DB.");
request.onsuccess = () => resolve(request.result);
request.onupgradeneeded = event => {
const db = event.target.result;
db.createObjectStore(STORE_NAME, { autoIncrement: true });
};
});
}
async function addToQueue(logEntry) {
const db = await openDB();
return new Promise((resolve, reject) => {
const transaction = db.transaction(STORE_NAME, 'readwrite');
const store = transaction.objectStore(STORE_NAME);
const request = store.add(logEntry);
request.onsuccess = () => resolve();
request.onerror = () => reject("Failed to add log to queue.");
});
}
async function getQueuedLogs(limit = 50) {
const db = await openDB();
return new Promise((resolve, reject) => {
const transaction = db.transaction(STORE_NAME, 'readwrite');
const store = transaction.objectStore(STORE_NAME);
const logs = [];
const keys = [];
// Using a cursor to iterate and collect keys for deletion
const cursorRequest = store.openCursor();
cursorRequest.onsuccess = event => {
const cursor = event.target.result;
if (cursor && logs.length < limit) {
logs.push(cursor.value);
keys.push(cursor.primaryKey);
cursor.continue();
} else {
// Once we have the logs, delete them from the queue
Promise.all(keys.map(key => {
return new Promise((res, rej) => {
const deleteRequest = store.delete(key);
deleteRequest.onsuccess = () => res();
deleteRequest.onerror = () => rej("Failed to delete log from queue.");
});
})).then(() => resolve(logs)).catch(reject);
}
};
cursorRequest.onerror = () => reject("Failed to open cursor.");
});
}
// Listen for messages from the client application.
self.addEventListener('message', event => {
if (event.data && event.data.type === 'QUEUE_LOG') {
event.waitUntil(addToQueue(event.data.payload));
}
});
const LOG_ENDPOINT = '/api/log/ingest';
const BATCH_INTERVAL = 15000; // Send logs every 15 seconds.
async function sendLogs() {
// Hoisted so the catch block can re-queue a batch that failed to send.
let logs = [];
try {
logs = await getQueuedLogs();
if (logs.length === 0) {
return;
}
const response = await fetch(LOG_ENDPOINT, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
// In a real-world project, this token should be managed securely.
'X-Client-Log-Token': 'your-secret-token-here'
},
body: JSON.stringify({ events: logs }),
// keepalive is crucial for requests that might outlive the page.
keepalive: true,
});
if (!response.ok) {
// A common mistake is not handling failed sends.
// If the server rejects the logs, we should re-queue them.
// For simplicity here, we log an error. A production system would
// need a more robust retry mechanism with backoff.
console.error('Failed to send logs to server, status:', response.status);
// Re-queue failed logs
await Promise.all(logs.map(log => addToQueue(log)));
}
} catch (error) {
console.error('Error sending logs:', error);
// getQueuedLogs() already removed this batch from IndexedDB, so on a
// network error we must explicitly re-queue it to avoid losing logs.
await Promise.all(logs.map(log => addToQueue(log)));
}
}
// Set up a periodic flush. Note that the browser may terminate an idle Service
// Worker, so this timer is best-effort; a production build might pair it with
// the Periodic Background Sync API or also flush when new messages arrive.
setInterval(sendLogs, BATCH_INTERVAL);
self.addEventListener('install', event => {
self.skipWaiting();
});
self.addEventListener('activate', event => {
event.waitUntil(self.clients.claim());
});
This implementation uses IndexedDB as a reliable buffer, with a periodic timer that attempts to flush the queue. One detail matters: getQueuedLogs removes a batch from the queue as it reads it, before the network request is made. That is why both failure paths in sendLogs, a non-2xx response and a thrown network error, explicitly re-queue the batch; otherwise those logs would be lost.
Stage 2: The Laravel Ingestion Beacon
With the client-side collector in place, we needed a backend endpoint to receive the batched data. Since our stack is Laravel, creating a dedicated controller and route was straightforward. The key was to decouple log processing from the request-response cycle. The controller’s only job should be to validate the incoming data and hand it off to Laravel’s logging system as quickly as possible.
We configured a custom logging channel in Laravel specifically for this client-side telemetry. This channel uses the fluent-logger-php
library to forward logs directly to our Fluentd instance over TCP.
config/logging.php
<?php
return [
// ... other channels
'channels' => [
// ...
'fluentd_client' => [
'driver' => 'custom',
'via' => \App\Logging\FluentLoggerFactory::class,
'host' => env('FLUENTD_HOST', '127.0.0.1'),
'port' => env('FLUENTD_PORT', 24224),
'level' => 'debug', // We ingest everything and let Fluentd do the filtering.
'tag' => 'client.events',
],
],
];
app/Logging/FluentLoggerFactory.php
<?php
namespace App\Logging;
use Monolog\Logger;
use Monolog\Handler\FluentdHandler;
use Monolog\Formatter\JsonFormatter;
class FluentLoggerFactory
{
/**
* Create a custom Monolog instance.
*
* @param array $config
* @return \Monolog\Logger
*/
public function __invoke(array $config)
{
// The handler that sends data to Fluentd. FluentdHandler is not part of Monolog
// core; it comes from a third-party Monolog/Fluentd bridge package.
$handler = new FluentdHandler($config['host'], $config['port']);
// A pitfall is sending unstructured data. We enforce JSON formatting
// so that Fluentd can easily parse it.
$handler->setFormatter(new JsonFormatter());
return new Logger('fluentd_client', [$handler]);
}
}
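As a quick sanity check, the channel can be exercised from php artisan tinker once FLUENTD_HOST and FLUENTD_PORT are set; a single structured event should then appear on whatever output Fluentd has configured. A rough example:
Log::channel('fluentd_client')->info('client channel smoke test', [
    'client' => [
        'level' => 'info',
        'message' => 'smoke test',
        'timestamp' => (int) (microtime(true) * 1000),
        'context' => ['source' => 'tinker'],
    ],
]);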
routes/api.php
<?php
use Illuminate\Support\Facades\Route;
use App\Http\Controllers\Api\LogIngestionController;
Route::post('/log/ingest', [LogIngestionController::class, 'ingest'])
->middleware('auth.client_log'); // A dedicated middleware for authentication.
app/Http/Middleware/AuthenticateClientLogger.php
<?php
namespace App\Http\Middleware;
use Closure;
use Illuminate\Http\Request;
class AuthenticateClientLogger
{
public function handle(Request $request, Closure $next)
{
// This is a very basic token check. In production, this should be
// something more secure and stored properly, not hardcoded.
if ($request->header('X-Client-Log-Token') !== 'your-secret-token-here') {
return response()->json(['message' => 'Unauthorized'], 401);
}
return $next($request);
}
}
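One obvious hardening step over the hardcoded string (a sketch, assuming a hypothetical CLIENT_LOG_TOKEN entry in .env exposed via config/services.php as services.client_log.token) is to read the token from config and compare it in constant time:
// Inside handle(): read the shared token from config rather than hardcoding it.
$expected = config('services.client_log.token'); // hypothetical key backed by CLIENT_LOG_TOKEN

if (! is_string($expected) || $expected === ''
    || ! hash_equals($expected, (string) $request->header('X-Client-Log-Token'))) {
    return response()->json(['message' => 'Unauthorized'], 401);
}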
app/Http/Controllers/Api/LogIngestionController.php
<?php
namespace App\Http\Controllers\Api;
use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Validator;
use Illuminate\Validation\Rule;
class LogIngestionController extends Controller
{
public function ingest(Request $request)
{
$validator = Validator::make($request->all(), [
'events' => ['required', 'array', 'max:100'], // Limit batch size.
'events.*.level' => ['required', 'string', Rule::in(['info', 'warn', 'error', 'fatal', 'debug'])],
'events.*.message' => ['required', 'string', 'max:2048'],
'events.*.timestamp' => ['required', 'integer'],
'events.*.context' => ['sometimes', 'array'],
]);
if ($validator->fails()) {
return response()->json(['errors' => $validator->errors()], 422);
}
$events = $request->input('events');
foreach ($events as $event) {
// The controller's job is to dispatch, not process.
// We hand off each event to our dedicated 'fluentd_client' channel.
Log::channel('fluentd_client')->log(
$event['level'],
$event['message'],
// We wrap the original payload in a 'client' context
// to clearly namespace it in the final log record.
['client' => $event]
);
}
return response()->json(['status' => 'ok'], 202);
}
}
This setup gives us a validated, authenticated endpoint that does minimal work per request: each event that passes validation is handed straight to the dedicated fluentd_client channel, and all heavy filtering happens downstream in Fluentd. By returning a 202 Accepted status, we signal to the client that the logs have been accepted for processing, even though they have not yet been filtered or stored.
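For reference, a batch that passes the controller's validation looks roughly like this (all values are illustrative):
{
  "events": [
    {
      "level": "error",
      "message": "TypeError: Cannot read properties of undefined (reading 'id')",
      "timestamp": 1718035200000,
      "context": {
        "userAgent": "Mozilla/5.0 (X11; Linux x86_64) ... Chrome/125.0.0.0 Safari/537.36",
        "url": "/checkout"
      }
    }
  ]
}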
Stage 3: Fluentd and the Lua Filtering Engine
This is where the system’s intelligence resides. Fluentd receives a stream of structured JSON logs from Laravel. Our goal is to filter this stream before it reaches the expensive storage backend.
The fluent.conf
is structured with three key parts:
- A source block to listen for forwarded logs from Laravel.
- A filter block that invokes our Lua script.
- A match block to route the filtered logs to their final destination.
fluentd/fluent.conf
# 1. Input Source: Listen for logs from Laravel
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# 2. Filter: The core logic using Lua
<filter client.events>
@type lua
# Path to the Lua script file within the Fluentd container/environment
script /fluentd/etc/scripts/filter.lua
# The function within the script to call for each record
call filter_record
</filter>
# 3. Match: Output the filtered logs. For demonstration, we use stdout.
# In production, this would be an elasticsearch, s3, or other output plugin.
<match client.events>
@type stdout
</match>
The real work happens in filter.lua
. This script inspects every log record and decides whether to drop it, modify it, or let it pass. We implemented two filtering rules, de-duplication of recent, identical error messages and level-based sampling, plus a small enrichment step that parses the user agent.
fluentd/scripts/filter.lua
-- filter.lua: Stateful, intelligent log filtering script for Fluentd
-- This table will hold the state for our filter. It's crucial to understand
-- that this state is local to the Fluentd process. It is NOT shared across
-- multiple Fluentd workers or nodes.
local state = {
-- Cache for tracking recently seen error messages to de-duplicate them.
-- key: hash of the error message
-- value: { count = <number>, expiry = <timestamp> }
error_cache = {},
-- Counter for sampling INFO level logs.
info_counter = 0
}
-- A simple hashing function for message strings.
-- In a real-world project, a more robust hashing algorithm like murmur3 might be better.
local function simple_hash(str)
local hash = 5381
for i = 1, #str do
-- Keep the hash within 32 bits; Lua numbers are doubles, so unbounded
-- multiplication would eventually lose precision.
hash = (hash * 33 + string.byte(str, i)) % 4294967296
end
return hash
end
-- Function to clean up expired entries from the state cache to prevent memory leaks.
-- A common pitfall is letting state grow indefinitely. This is a simple TTL mechanism.
local function cleanup_cache(current_time)
for hash, record in pairs(state.error_cache) do
-- record.expiry already includes the 5-minute TTL, so compare it to "now" directly.
if record.expiry < current_time then
state.error_cache[hash] = nil
end
end
end
-- This is the main function called by Fluentd for each log record.
-- It must return the timestamp plus a (possibly modified) record, or nil to drop the record.
function filter_record(tag, timestamp, record)
-- Lua scripts in Fluentd receive the record as a Lua table.
-- Our Laravel logger wraps the original event inside a 'client' key.
local client_data = record.client
if client_data == nil then
-- If the record doesn't have the expected structure, let it pass.
return timestamp, record
end
local level = client_data.level
local message = client_data.message
local current_time = os.time()
-- Run cache cleanup periodically. A simple way is to check on every Nth record.
if math.random(1, 100) == 1 then
cleanup_cache(current_time)
end
-- Rule 1: De-duplicate identical error/fatal messages within a 5-minute window.
if (level == 'error' or level == 'fatal') and message then
local msg_hash = simple_hash(message)
local entry = state.error_cache[msg_hash]
if entry and entry.expiry > current_time then
-- We've seen this exact error within the window. Count it and drop the log.
entry.count = entry.count + 1
entry.expiry = current_time + 300 -- Extend expiry while the error keeps firing
return nil -- Drop the record
else
-- First occurrence (or the cached entry has expired). Cache it and let it pass.
state.error_cache[msg_hash] = { count = 1, expiry = current_time + 300 }
-- Add a new field to the log indicating it's the first occurrence.
record.filter_action = 'dedupe_first_occurrence'
end
end
-- Rule 2: Dynamic sampling for 'info' level logs.
-- We only want to keep 10% of info logs to reduce volume.
if level == 'info' then
state.info_counter = state.info_counter + 1
if state.info_counter % 10 ~= 0 then
return nil -- Drop 9 out of 10 info logs.
end
record.filter_action = 'sampled_info'
end
-- Rule 3: Data Enrichment.
-- Let's parse the user agent string if it exists.
if client_data.context and client_data.context.userAgent then
local ua = client_data.context.userAgent
local browser, version = ua:match("(Firefox)/([%d%.]+)")
if not browser then browser, version = ua:match("(Chrome)/([%d%.]+)") end
-- Note: for Safari this captures the trailing WebKit build number; the marketed
-- Safari version lives in the separate "Version/x.y" token.
if not browser then browser, version = ua:match("(Safari)/([%d%.]+)") end
if browser then
record.client_browser = {
name = browser,
version = version
}
end
end
-- If we've reached this point, the record is allowed to pass.
return timestamp, record
end
This Lua script is the heart of the solution. It maintains an in-memory state table to track recent errors. When an error or fatal log arrives, it hashes the message. If the hash is already in the cache and has not expired, the log is dropped; otherwise it passes through and is cached with a 5-minute TTL. This effectively collapses a flood of 10,000 identical errors into a single log entry per window. We also implemented simple counter-based sampling for noisy info logs and an enrichment step to parse the user agent.
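Because the script is plain Lua, the de-duplication rule can be exercised outside Fluentd with a standalone interpreter. A rough test sketch (the test file is ours and assumes it is run from the directory containing filter.lua):
fluentd/scripts/test_filter.lua (sketch)
-- Load the filter; this defines the global filter_record() and its private state.
dofile("filter.lua")

local now = os.time()

-- First occurrence of an error should pass through and be tagged.
local ts, out = filter_record("client.events", now,
  { client = { level = "error", message = "TypeError: x is undefined" } })
assert(out ~= nil and out.filter_action == "dedupe_first_occurrence")

-- An identical error inside the 5-minute window should be dropped (nil return).
local dropped = filter_record("client.events", now,
  { client = { level = "error", message = "TypeError: x is undefined" } })
assert(dropped == nil)

print("dedupe rule OK")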
The entire architecture can be visualized as follows:
graph TD
    subgraph Browser
        A[Client Application] -- postMessage --> B{Service Worker};
        B -- Batched POST --> D[Laravel Beacon Endpoint];
        B -- Persist to --> C[(IndexedDB)];
    end
    subgraph Backend
        D -- Forwards logs via TCP --> E{Fluentd};
        E -- Applies Lua script --> F[Stateful Filter];
        F -- Drops/Samples/Enriches --> E;
        E -- Sends filtered logs --> G[(Elasticsearch / Data Store)];
    end
    style B fill:#f9f,stroke:#333,stroke-width:2px
    style F fill:#bbf,stroke:#333,stroke-width:2px
This system successfully weathered our next front-end incident. A faulty analytics event was firing excessively. We saw a massive influx of logs at the Laravel beacon, but the Lua filter correctly sampled the info
-level events, dropping 90% of them. The de-duplication filter caught a cascade of related errors. Our Elasticsearch cluster remained stable, and we were able to diagnose the issue using the trickle of sampled logs that made it through, without being overwhelmed.
The solution is not without its limitations. The de-duplication state in the Lua script is local to a single Fluentd process; if we were running a horizontally scaled cluster of Fluentd nodes behind a load balancer, each instance would have its own independent cache. Globally consistent de-duplication would require an external state store such as Redis, which introduces another dependency and point of failure. Furthermore, the filter logic is currently static within the Lua file. A more sophisticated implementation might have the Lua script periodically pull its rules (e.g., sampling rates, de-duplication windows) from a central configuration service like Consul or etcd, allowing the pipeline to be tuned dynamically without a restart.