The operational challenge was straightforward: terabytes of semi-structured application logs were being dumped into HDFS daily, and our support engineering team needed to query this data with sub-second latency. The existing process involved running Hive queries that took anywhere from 15 minutes to several hours, rendering any interactive investigation impossible. The goal was to build a system that could provide a rich, faceted search experience over this data with a latency SLO of under 500ms for p95 queries. This is the log of how that system was built, the trade-offs made, and the validation framework that ensured it stayed reliable.
The Initial Architectural Blueprint and Technology Rationale
The firehose of log data already landed in HDFS, managed by a mature Hadoop cluster. Moving this data was a non-starter due to organizational constraints. Therefore, the architecture had to originate from Hadoop.
Indexing Engine: SolrCloud. The choice between Elasticsearch and Solr was debated. We chose SolrCloud for two pragmatic reasons. First, our operations team had deep expertise in managing Apache ZooKeeper, a hard dependency for SolrCloud, which lowered the operational barrier. Second, Solr’s more rigid schema definition (managed-schema) was seen as a feature, not a bug, enforcing discipline on the log data structure from the outset. In a real-world project, this kind of constraint prevents downstream chaos.

Indexing Pipeline: Hadoop MapReduce. While Spark is more modern, the specific YARN queues allocated for this project were optimized for traditional MapReduce jobs. A custom Java MapReduce job provided direct, low-level control over data parsing, transformation into SolrInputDocument objects, and batch submission to the SolrCloud cluster. It was a less glamorous but more resource-efficient choice for our specific environment.

Frontend Interface: React with Shadcn UI. The user-facing component needed to be built quickly but remain maintainable. We explicitly avoided heavy-handed component libraries. Shadcn UI, being a collection of unstyled, composable primitives built on Radix UI and styled with Tailwind CSS, was the perfect fit. It gave us the building blocks (data tables, inputs, dropdowns) without imposing an opinionated design system, allowing for rapid development of a clean, functional internal tool.
Validation Framework: Playwright. Simple API health checks were insufficient. We needed to validate the entire user journey, from typing a query to seeing rendered results, and measure the end-to-end latency. Playwright was chosen over Selenium or Cypress for its superior performance, modern API, and, critically, its built-in tracing and network interception capabilities, which are essential for diagnosing performance bottlenecks across the stack.
Here is the high-level data flow we settled on:
graph TD
    A[Log Files in HDFS] --> B{Hadoop MapReduce Job};
    B --> C[SolrInputDocument Batches];
    C --> D[SolrCloud Cluster];
    D <--> E{Node.js API Gateway};
    E <--> F[React Frontend w/ Shadcn UI];
    G[Playwright E2E Tests] --> F;
    G --> E;
    subgraph "Data Ingestion (Batch)"
        A
        B
        C
    end
    subgraph "Search & Query"
        D
        E
        F
    end
    subgraph "Validation & CI"
        G
    end
The Core: Hadoop MapReduce Indexing Job
The heart of the ingestion pipeline is a MapReduce job that reads raw log files, parses them, and indexes them into Solr. A common mistake here is to perform individual document puts to Solr from within the mappers or reducers, which creates immense network overhead and hammers the Solr update handlers. The correct approach is to batch documents.
The job is structured with a Mapper that parses lines and a Reducer that collects documents for a given shard and sends them in batches. We use the SolrJ library for communication.
Project pom.xml dependencies:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>8.11.2</version>
</dependency>
<!-- For parsing the JSON log lines in the mapper -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20230227</version>
</dependency>
<!-- For logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
</dependency>
</dependencies>
The Mapper: It simply parses each log line (assuming a simple JSON format for this example) and emits the Solr shard URL as the key and the SolrInputDocument
as the value. This ensures all documents destined for the same shard go to the same reducer.
// src/main/java/com/company/hadoop/solr/LogIndexerMapper.java
package com.company.hadoop.solr;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.solr.common.SolrInputDocument;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.UUID;
public class LogIndexerMapper extends Mapper<Object, Text, Text, SolrInputDocumentWritable> {
private static final Logger LOG = LoggerFactory.getLogger(LogIndexerMapper.class);
private Text shardUrlKey = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
try {
JSONObject logEntry = new JSONObject(value.toString());
String level = logEntry.optString("level", "INFO");
String message = logEntry.getString("message");
String timestamp = logEntry.getString("timestamp");
String loggerName = logEntry.optString("logger_name", "unknown");
// Basic data validation and transformation
Instant parsedTimestamp;
try {
parsedTimestamp = Instant.parse(timestamp);
} catch (DateTimeParseException e) {
// If the timestamp is invalid, we skip this record.
context.getCounter("ParseErrors", "InvalidTimestamp").increment(1);
return;
}
SolrInputDocument doc = new SolrInputDocument();
// A common pitfall is not providing a unique ID. Solr will overwrite documents with the same ID.
doc.addField("id", UUID.randomUUID().toString());
doc.addField("level_s", level.toUpperCase()); // Using dynamic field suffix "_s" for string
doc.addField("message_t", message); // "_t" for tokenized text
doc.addField("timestamp_dt", parsedTimestamp.toString()); // "_dt" for date
doc.addField("logger_name_s", loggerName);
// This is a simplified sharding strategy. In a real-world project, this logic
// would be more sophisticated, possibly involving a custom sharding field.
// Here, we simply spread documents across the configured Solr URLs by hashing the id.
String[] solrUrls = context.getConfiguration().getStrings("solr.urls");
int shardIndex = (doc.getFieldValue("id").hashCode() & Integer.MAX_VALUE) % solrUrls.length;
shardUrlKey.set(solrUrls[shardIndex]);
context.write(shardUrlKey, new SolrInputDocumentWritable(doc));
} catch (Exception e) {
LOG.error("Failed to parse log line: " + value.toString(), e);
context.getCounter("ParseErrors", "MalformedJSON").increment(1);
}
}
}
Note: SolrInputDocumentWritable is a custom Writable wrapper around SolrInputDocument, which is not natively serializable by Hadoop.
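For reference, a minimal sketch of that wrapper is shown below, assuming SolrJ's JavaBinCodec (the same binary codec Solr uses on the wire) handles serialization; the exact implementation in a real codebase may differ.

// src/main/java/com/company/hadoop/solr/SolrInputDocumentWritable.java (sketch)
package com.company.hadoop.solr;

import org.apache.hadoop.io.Writable;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.JavaBinCodec;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SolrInputDocumentWritable implements Writable {
    private SolrInputDocument doc;

    // Hadoop requires a no-arg constructor for deserialization.
    public SolrInputDocumentWritable() {
    }

    public SolrInputDocumentWritable(SolrInputDocument doc) {
        this.doc = doc;
    }

    public SolrInputDocument get() {
        return doc;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize the document with javabin, then length-prefix the bytes.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        new JavaBinCodec().marshal(doc, buffer);
        byte[] bytes = buffer.toByteArray();
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        doc = (SolrInputDocument) new JavaBinCodec().unmarshal(new ByteArrayInputStream(bytes));
    }
}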
The Reducer: This is where the batching happens. It collects documents for its assigned shard and periodically flushes them to Solr.
// src/main/java/com/company/hadoop/solr/LogIndexerReducer.java
package com.company.hadoop.solr;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class LogIndexerReducer extends Reducer<Text, SolrInputDocumentWritable, Text, Text> {
private static final Logger LOG = LoggerFactory.getLogger(LogIndexerReducer.class);
private int batchSize;
private Http2SolrClient solrClient;
private String currentSolrUrl;
private List<SolrInputDocument> docBatch;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// The batch size is critical for performance tuning. Too small = network overhead.
// Too large = high memory pressure on the reducer task.
this.batchSize = context.getConfiguration().getInt("solr.batch.size", 1000);
this.docBatch = new ArrayList<>(batchSize);
// Note: the Solr client is NOT created here. The shard URL arrives as the reduce key,
// and getCurrentKey() is not populated until the framework starts iterating keys,
// so the client is built lazily in reduce().
}
@Override
protected void reduce(Text key, Iterable<SolrInputDocumentWritable> values, Context context)
throws IOException, InterruptedException {
// Lazily create (or swap) the client for the shard URL carried by this key.
String solrUrl = key.toString();
if (solrClient == null || !solrUrl.equals(currentSolrUrl)) {
flushBatch(context); // flush anything still buffered for the previous shard
if (solrClient != null) {
solrClient.close();
}
solrClient = new Http2SolrClient.Builder(solrUrl).build();
currentSolrUrl = solrUrl;
}
for (SolrInputDocumentWritable writable : values) {
docBatch.add(writable.get());
if (docBatch.size() >= batchSize) {
flushBatch(context);
}
}
}
private void flushBatch(Context context) {
if (docBatch.isEmpty()) {
return;
}
try {
// Using a collection add is more efficient than individual adds.
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.add(docBatch);
solrClient.request(updateRequest);
context.getCounter("Solr", "DocumentsIndexed").increment(docBatch.size());
LOG.info("Flushed batch of {} documents to Solr.", docBatch.size());
} catch (Exception e) {
LOG.error("Failed to flush batch to Solr", e);
context.getCounter("Solr", "IndexingFailures").increment(docBatch.size());
} finally {
docBatch.clear();
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// The final flush is crucial to index any remaining documents.
flushBatch(context);
// Final commit to make documents searchable. In production, this is often handled
// by Solr's auto-commit settings to avoid hard commits from clients.
if (solrClient != null) {
try {
solrClient.commit();
} catch (Exception e) {
LOG.error("Final commit to Solr failed.", e);
}
solrClient.close();
}
}
}
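As the cleanup comment notes, production deployments usually let Solr control commit cadence rather than having indexing clients issue hard commits. A typical solrconfig.xml setting for the collection (the time values here are illustrative, not from our deployment) looks like this:

<!-- solrconfig.xml: commit cadence managed by Solr itself -->
<updateHandler class="solr.DirectUpdateHandler2">
  <!-- Hard commit: flush to stable storage, but do not open a new searcher -->
  <autoCommit>
    <maxTime>60000</maxTime>
    <openSearcher>false</openSearcher>
  </autoCommit>
  <!-- Soft commit: make newly indexed documents searchable every 15 seconds -->
  <autoSoftCommit>
    <maxTime>15000</maxTime>
  </autoSoftCommit>
</updateHandler>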
This MapReduce job is then packaged into a JAR and executed via a shell script, passing the Solr URLs and batch size as configuration properties.
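A minimal driver sketch is shown below. The class name LogIndexerDriver and the sample invocation are illustrative, not taken from the actual project; the key point is that launching through ToolRunner lets the shell script pass solr.urls and solr.batch.size as plain -D properties.

// src/main/java/com/company/hadoop/solr/LogIndexerDriver.java (hypothetical driver)
//
// Example invocation from the wrapper shell script:
//   hadoop jar log-indexer.jar com.company.hadoop.solr.LogIndexerDriver \
//     -D solr.urls=http://solr1:8983/solr/logs_collection,http://solr2:8983/solr/logs_collection \
//     -D solr.batch.size=1000 \
//     /data/logs/current
package com.company.hadoop.solr;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogIndexerDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "hdfs-to-solr-log-indexer");
        job.setJarByClass(LogIndexerDriver.class);

        job.setMapperClass(LogIndexerMapper.class);
        job.setReducerClass(LogIndexerReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(SolrInputDocumentWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        // The reducers write directly to Solr, so no HDFS output is produced.
        job.setOutputFormatClass(NullOutputFormat.class);

        // One reducer per configured Solr URL, so each reducer owns exactly one batch stream.
        job.setNumReduceTasks(getConf().getStrings("solr.urls").length);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new LogIndexerDriver(), args));
    }
}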
The API and Frontend Implementation
The frontend doesn’t talk to Solr directly. A simple Node.js API gateway sits in between to handle authentication, simplify query logic, and prevent complex Solr query syntax from leaking into the client.
Node.js API Gateway Snippet (server.js):
const express = require('express');
const { createClient } = require('solr-client');
const cors = require('cors');
const app = express();
const port = 3001;
// The client should be configured to talk to the Zookeeper ensemble
// so it's aware of the entire SolrCloud cluster topology.
const solrClient = createClient({
host: 'zookeeper1.example.com:2181,zookeeper2.example.com:2181',
zkRoot: '/solr', // The Zookeeper chroot path for Solr
collection: 'logs_collection',
core: 'logs_collection'
});
app.use(cors());
app.use(express.json());
app.get('/api/search', async (req, res) => {
const { q = '*:*', page = 0, facets = '' } = req.query;
const rows = 50;
const start = parseInt(page) * rows;
try {
const query = solrClient.createQuery()
.q(q)
.start(start)
.rows(rows);
// Faceting is a core requirement for this kind of tool.
if (facets) {
query.facet({
field: facets.split(','),
mincount: 1
});
}
const result = await solrClient.search(query);
res.json(result);
} catch (error) {
console.error('Solr query failed:', error);
res.status(500).json({ error: 'Failed to execute search query' });
}
});
app.listen(port, () => {
console.log(`API Gateway listening on port ${port}`);
});
React Frontend with Shadcn UI:
The frontend uses React with hooks to manage state. We pull in Shadcn components via its CLI. The core component orchestrates the search input, facet selection, and results display.
// src/components/SearchDashboard.tsx
import React, { useState, useEffect, useCallback } from 'react';
import { Input } from '@/components/ui/input';
import { DataTable } from '@/components/ui/data-table'; // Assuming a custom data table component
import { Checkbox } from '@/components/ui/checkbox';
import { useDebounce } from '@/hooks/useDebounce'; // Custom debounce hook
// Define columns for the DataTable
const columns = [
{ accessorKey: 'timestamp_dt', header: 'Timestamp' },
{ accessorKey: 'level_s', header: 'Level' },
{ accessorKey: 'message_t', header: 'Message' },
];
export function SearchDashboard() {
const [query, setQuery] = useState('*:*');
const [results, setResults] = useState([]);
const [facets, setFacets] = useState({});
const [selectedFacets, setSelectedFacets] = useState({});
const [loading, setLoading] = useState(false);
const debouncedQuery = useDebounce(query, 500); // Debouncing input is critical for performance.
const fetchData = useCallback(async () => {
setLoading(true);
let solrQuery = debouncedQuery || '*:*';
// Construct filter query (fq) from selected facets
const fq = Object.entries(selectedFacets)
.filter(([, values]) => values.length > 0)
.map(([field, values]) => `${field}:("${values.join('" OR "')}")`)
.join(' AND ');
if (fq) {
solrQuery = `(${solrQuery}) AND ${fq}`;
}
try {
const response = await fetch(`http://localhost:3001/api/search?q=${encodeURIComponent(solrQuery)}&facets=level_s`);
const data = await response.json();
setResults(data.response.docs);
// Process Solr facet format into a more usable structure
const facetData = {};
const facetFields = data.facet_counts?.facet_fields?.level_s || [];
for (let i = 0; i < facetFields.length; i += 2) {
facetData[facetFields[i]] = facetFields[i + 1];
}
setFacets(facetData);
} catch (error) {
console.error('Failed to fetch search results:', error);
} finally {
setLoading(false);
}
}, [debouncedQuery, selectedFacets]);
useEffect(() => {
fetchData();
}, [fetchData]);
// Handler for facet checkbox changes
const handleFacetChange = (field, value) => {
setSelectedFacets(prev => {
const currentValues = prev[field] || [];
const newValues = currentValues.includes(value)
? currentValues.filter(v => v !== value)
: [...currentValues, value];
return { ...prev, [field]: newValues };
});
};
return (
<div className="container mx-auto p-4">
<Input
placeholder="Search logs..."
value={query}
onChange={(e) => setQuery(e.target.value)}
className="mb-4"
/>
<div className="flex">
<div className="w-1/4 pr-4">
<h3 className="font-bold mb-2">Filter by Level</h3>
{Object.entries(facets).map(([level, count]) => (
<div key={level} className="flex items-center space-x-2 mb-1">
<Checkbox
id={level}
onCheckedChange={() => handleFacetChange('level_s', level)}
/>
<label htmlFor={level}>{level} ({count})</label>
</div>
))}
</div>
<div className="w-3/4">
{loading ? <p>Loading...</p> : <DataTable columns={columns} data={results} />}
</div>
</div>
</div>
);
}
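The useDebounce hook imported at the top of the component is a small custom utility; a minimal sketch, matching the assumed import path:

// src/hooks/useDebounce.ts
import { useEffect, useState } from 'react';

// Returns a value that only updates after `delay` ms without further changes,
// so each keystroke does not trigger a round trip to the API gateway.
export function useDebounce<T>(value: T, delay: number): T {
  const [debouncedValue, setDebouncedValue] = useState<T>(value);

  useEffect(() => {
    const timer = setTimeout(() => setDebouncedValue(value), delay);
    // Reset the timer whenever the value changes before the delay elapses.
    return () => clearTimeout(timer);
  }, [value, delay]);

  return debouncedValue;
}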
End-to-End Validation with Playwright
This is the final, crucial piece. We need to programmatically verify that the entire system works and meets its performance SLO. A Playwright script is added to the CI/CD pipeline.
The script covers three scenarios: functional correctness of a basic search, an end-to-end latency check against the SLO, and facet filtering. The pitfall here is writing brittle tests, so the selectors target stable attributes (input placeholders, table structure) rather than generated Tailwind class names.
tests/search.spec.ts:
import { test, expect, Page } from '@playwright/test';
const BASE_URL = 'http://localhost:3000';
test.describe('Log Search Dashboard', () => {
test('should perform a basic search and display results', async ({ page }) => {
await page.goto(BASE_URL);
// Wait for initial data to load
await expect(page.locator('table > tbody > tr')).toHaveCount(50);
await page.locator('input[placeholder="Search logs..."]').fill('level_s:ERROR');
await page.waitForResponse('**/api/search**');
// Assert that all visible rows now contain the word 'ERROR' in the level column
const rows = await page.locator('table > tbody > tr').all();
for (const row of rows) {
const levelCell = row.locator('td').nth(1);
await expect(levelCell).toHaveText('ERROR');
}
});
test('should validate API response time SLO', async ({ page }) => {
await page.goto(BASE_URL);
const apiSearchPromise = page.waitForResponse(
response => response.url().includes('/api/search') && response.status() === 200
);
await page.locator('input[placeholder="Search logs..."]').fill('message_t:failure');
const response = await apiSearchPromise;
await response.finished();
// The frontend debounces keystrokes by 500 ms before firing the request, so measuring
// from fill() would always blow the budget; use the request's own network timing instead.
const duration = response.request().timing().responseEnd; // ms from request start to last byte
console.log(`E2E API response time for "failure" query: ${duration.toFixed(2)} ms`);
// The critical SLO check. If the API takes too long from the client's perspective,
// the build fails.
expect(duration).toBeLessThan(500);
// Also check Solr's own reported query time (QTime)
const responseBody = await response.json();
const qTime = responseBody.responseHeader.QTime;
console.log(`Solr internal QTime: ${qTime} ms`);
expect(qTime).toBeLessThan(250); // The backend itself should be much faster.
});
test('should apply a facet and verify filtered results', async ({ page }) => {
await page.goto(BASE_URL);
await page.waitForResponse('**/api/search**');
// Check the facet for 'WARN'
await page.locator('label:has-text("WARN")').click();
await page.waitForResponse('**/api/search**');
const rows = await page.locator('table > tbody > tr').all();
expect(rows.length).toBeGreaterThan(0); // Assuming there are WARN logs
for (const row of rows) {
await expect(row.locator('td').nth(1)).toHaveText('WARN');
}
// Uncheck the facet and verify it returns to a mixed state
await page.locator('label:has-text("WARN")').click();
await page.waitForResponse('**/api/search**');
const firstRowLevel = await page.locator('table > tbody > tr:first-child > td').nth(1).textContent();
// This is a probabilistic check, but it's likely the first result is not WARN anymore
expect(firstRowLevel).not.toBe('WARN');
});
});
This test suite, run on every commit, provides a high degree of confidence that a change in the frontend code, the API, or even a re-indexing in Solr hasn’t broken the user experience or violated our performance contract.
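A minimal Playwright configuration for that CI run could look like the following; the npm script name and ports are assumptions based on the setup described above:

// playwright.config.ts
import { defineConfig } from '@playwright/test';

export default defineConfig({
  testDir: './tests',
  retries: process.env.CI ? 2 : 0, // absorb transient flakiness in CI
  use: {
    baseURL: 'http://localhost:3000',
    trace: 'on-first-retry', // capture a trace whenever a retry is triggered
  },
  webServer: {
    command: 'npm run start', // assumed script that starts the React app and API gateway
    url: 'http://localhost:3000',
    reuseExistingServer: !process.env.CI,
  },
});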
Lingering Issues and Future Work
The current system meets the requirements, but it’s not without its limitations. The MapReduce indexing job is a batch process, meaning there is an inherent data latency of several hours, determined by the job’s execution frequency. For true real-time log analysis, this pipeline would need to be augmented or replaced with a streaming ingestion path using technologies like Apache Flink or Spark Streaming to consume from Kafka and index into Solr continuously.
Furthermore, the Playwright performance tests, while valuable, run against a staging environment with synthetic data. The next level of maturity would involve building a framework to replay anonymized production traffic against this system to uncover performance bottlenecks under realistic load patterns and query complexity. The operational overhead of managing a SolrCloud and ZooKeeper cluster remains significant; a future cost-benefit analysis may warrant exploring managed search services, though the data gravity within our on-premise Hadoop ecosystem makes that a complex proposition.