The core challenge is a fundamental dichotomy in system requirements: a data ingestion process that is inherently slow, resource-intensive, and unpredictable, coupled with a user-facing query interface that must be instantaneous, reliable, and available offline. A monolithic design, where a user request directly triggers a web scraping and AI processing job, is a non-starter in any production environment. The user would face unacceptable latency, and the system would crumble under concurrent requests as long-running processes tie up web server resources.
Consider a typical synchronous flow:
- User submits a URL to be analyzed.
- API endpoint receives the request.
- The API handler launches a Puppeteer instance, navigates to the URL, and extracts raw text content. This can take anywhere from 5 to 30 seconds.
- The extracted text is then sent to an LLM via LangChain for summarization or structured data extraction. This adds another 5 to 20 seconds.
- The result is stored in a database.
- A success response is sent back to the user.
The cumulative latency makes this approach untenable for any interactive application. Furthermore, this tight coupling means a failure in any step—a Puppeteer timeout, a network issue, an LLM API error—causes the entire user request to fail. The system is brittle and offers a poor user experience.
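To make the coupling concrete, here is a minimal sketch of such a synchronous handler; summarize() and persistResult() are hypothetical placeholders for the LangChain and database steps, not functions from this project:

// A sketch of the naive synchronous flow described above. The HTTP request stays
// open for the entire scrape-summarize-persist pipeline.
import puppeteer from 'puppeteer';

// Hypothetical placeholders, not real project functions.
declare function summarize(text: string): Promise<string>;
declare function persistResult(url: string, summary: string): Promise<void>;

export async function handleAnalyzeRequestSynchronously(url: string): Promise<string> {
  const browser = await puppeteer.launch({ headless: true });
  try {
    const page = await browser.newPage();
    await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 }); // 5-30 s of scraping
    const rawText = await page.evaluate(() => document.body.innerText);
    const summary = await summarize(rawText);   // another 5-20 s for the LLM call
    await persistResult(url, summary);          // write to the database
    return summary;                             // only now can the HTTP response be sent
  } finally {
    await browser.close();                      // the caller has been blocked throughout
  }
}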
An alternative is to offload the job and have the client poll for results. This is a marginal improvement but introduces client-side complexity and still doesn’t address the core architectural coupling. The web server is still involved in orchestrating the long-running task, even if asynchronously. In a real-world project, this leads to state management headaches and scaling bottlenecks. The fundamental mistake is attempting to handle two vastly different workloads—a high-latency, write-heavy command and a low-latency, read-heavy query—within the same logical flow and infrastructure.
The solution lies in completely segregating these concerns. This is not just about asynchronous processing; it’s about a formal architectural separation. Command Query Responsibility Segregation (CQRS) provides the blueprint. We will design a system where “Commands” (requests to perform an action, like analyzing a URL) are handled by one dedicated path, and “Queries” (requests to retrieve data) are handled by another, highly optimized path. The two sides will be connected by an asynchronous messaging backbone, achieving eventual consistency.
This leads us to a robust architecture:
- Command Side: An API endpoint accepts a URL analysis command. It does minimal validation and immediately publishes a message to a queue. It returns a 202 Accepted response to the client instantly. A separate, scalable pool of worker processes consumes these messages, executing the long-running Puppeteer and LangChain tasks.
- Query Side: The results of the command processing are written to a denormalized read model, specifically optimized for fast lookups. A separate, lightweight API provides read-only access to this data.
- Client-Side Resilience: The client application uses a Service Worker to aggressively cache the query results. This provides an instantaneous UI and full offline capability for previously viewed data, leveraging the stale-while-revalidate pattern.
The architectural decision is clear. We trade the simplicity of a monolithic model and the guarantee of strong consistency for massive gains in scalability, resilience, and user-perceived performance.
graph TD
    subgraph Client
        UI[User Interface]
        SW[Service Worker]
    end
    subgraph API Server
        CmdAPI[Command API Endpoint]
        QueryAPI[Query API Endpoint]
    end
    subgraph Backend Infrastructure
        MQ[Message Queue e.g., Redis Streams]
        Workers[Puppeteer & LangChain Workers]
        WriteDB[(Write Database - optional)]
        ReadDB[(Read Database - Optimized for Queries)]
    end
    UI -- Command (Analyze URL) --> CmdAPI
    CmdAPI -- Places Job --> MQ
    CmdAPI -- 202 Accepted --> UI
    MQ -- Distributes Job --> Workers
    Workers -- Scrapes with Puppeteer --> External_Website[External Website]
    Workers -- Processes with LangChain --> LLM_API[LLM API]
    Workers -- Persists Result --> ReadDB
    UI -- Fetch Data --> SW
    SW -- Cache Hit / Network Request --> QueryAPI
    QueryAPI -- Reads from --> ReadDB
    QueryAPI -- Returns Data --> SW
    SW -- Serves Data --> UI
    style Workers fill:#f9f,stroke:#333,stroke-width:2px
    style MQ fill:#ccf,stroke:#333,stroke-width:2px
This model ensures that the user-facing components (API Server and Client) remain lightweight and responsive. The heavy, unpredictable work is isolated in a backend system that can be scaled, monitored, and managed independently.
The Command Side Implementation
The command side’s sole responsibility is to accept, validate, and queue work. We’ll use NestJS for its robust dependency injection and modularity, which aligns well with CQRS principles. Redis will serve as our message queue due to its performance and simplicity.
1. Defining the Command and the API Endpoint
First, we define the data transfer object (DTO) for our command. This provides clear validation rules.
// src/commands/dto/analyze-url.dto.ts
import { IsUrl, IsNotEmpty, IsString, IsUUID } from 'class-validator';
import { v4 as uuidv4 } from 'uuid';
export class AnalyzeUrlCommand {
@IsUUID()
public readonly commandId: string;
@IsUrl()
@IsNotEmpty()
public readonly url: string;
@IsString()
@IsNotEmpty()
public readonly userId: string; // For multi-tenancy and tracking
constructor(url: string, userId: string) {
this.commandId = uuidv4();
this.url = url;
this.userId = userId;
}
}
The controller is lean. Its only job is to receive the request, create a command object, and dispatch it. It does not wait for the work to be done.
// src/commands/commands.controller.ts
import { Controller, Post, Body, HttpCode, HttpStatus, Logger } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { AnalyzeUrlCommand } from './dto/analyze-url.dto';
interface AnalyzeUrlRequestBody {
url: string;
userId: string;
}
@Controller('commands')
export class CommandsController {
private readonly logger = new Logger(CommandsController.name);
constructor(private readonly commandBus: CommandBus) {}
@Post('analyze-url')
@HttpCode(HttpStatus.ACCEPTED)
async analyzeUrl(@Body() body: AnalyzeUrlRequestBody): Promise<{ commandId: string }> {
const command = new AnalyzeUrlCommand(body.url, body.userId);
this.logger.log(`Dispatching AnalyzeUrlCommand: ${command.commandId} for URL: ${command.url}`);
// The commandBus will delegate this to the appropriate handler.
await this.commandBus.execute(command);
return { commandId: command.commandId };
}
}
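From the client’s point of view, dispatching the command is fire-and-forget. A minimal sketch of the call (the endpoint path comes from the controller above; the relative base URL and payload values are assumptions):

// Client-side sketch: dispatch the command and keep only the returned commandId.
async function requestAnalysis(url: string, userId: string): Promise<string> {
  const response = await fetch('/commands/analyze-url', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ url, userId }),
  });
  // The server answers 202 Accepted within milliseconds; the analysis completes later.
  const { commandId } = await response.json();
  return commandId; // used to correlate with the read model once processing finishes
}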
2. The Command Handler: Decoupling via Message Queue
The command handler’s role is not to execute the task but to place it on the message queue. This is a critical decoupling point. A common mistake is to have the command handler perform the long-running task directly, which defeats the purpose of the pattern in a distributed context.
We’ll use ioredis to interact with Redis Streams.
// src/commands/handlers/analyze-url.handler.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { AnalyzeUrlCommand } from '../dto/analyze-url.dto';
import { Inject, Logger } from '@nestjs/common';
import { Redis } from 'ioredis';
export const REDIS_CLIENT = 'REDIS_CLIENT';
export const ANALYSIS_STREAM_KEY = 'analysis_jobs';
@CommandHandler(AnalyzeUrlCommand)
export class AnalyzeUrlHandler implements ICommandHandler<AnalyzeUrlCommand> {
private readonly logger = new Logger(AnalyzeUrlHandler.name);
constructor(@Inject(REDIS_CLIENT) private readonly redis: Redis) {}
async execute(command: AnalyzeUrlCommand) {
try {
// The payload is the serialized command.
// In a real project, consider Protobuf or Avro for schema evolution.
const payload = JSON.stringify(command);
// XADD is the command to add an entry to a Redis Stream.
// '*' generates a new unique ID for the entry.
const result = await this.redis.xadd(
ANALYSIS_STREAM_KEY,
'*',
'commandId', command.commandId,
'payload', payload,
);
this.logger.log(`Successfully queued command ${command.commandId} to stream ${ANALYSIS_STREAM_KEY}. Message ID: ${result}`);
} catch (error) {
this.logger.error(`Failed to queue command ${command.commandId}: ${error.message}`, error.stack);
// In production, you'd have a dead-letter queue or retry mechanism here.
throw new Error('Failed to queue analysis job.');
}
}
}
The API server’s responsibility now ends. It has accepted the work and guaranteed its persistence in the queue.
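For completeness, the handler’s REDIS_CLIENT token needs a provider registered somewhere. A minimal module-wiring sketch, with the module name and file path assumed rather than taken from the original:

// src/commands/commands.module.ts - a wiring sketch; the Redis connection options
// mirror the worker's and would normally come from configuration.
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { Redis } from 'ioredis';
import { CommandsController } from './commands.controller';
import { AnalyzeUrlHandler, REDIS_CLIENT } from './handlers/analyze-url.handler';

@Module({
  imports: [CqrsModule],
  controllers: [CommandsController],
  providers: [
    AnalyzeUrlHandler,
    {
      // Shared ioredis client bound to the token the handler injects.
      provide: REDIS_CLIENT,
      useFactory: () =>
        new Redis({
          host: process.env.REDIS_HOST || 'localhost',
          port: parseInt(process.env.REDIS_PORT || '6379', 10),
        }),
    },
  ],
})
export class CommandsModule {}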
The Worker: Heavy Lifting in Isolation
The worker is a completely separate Node.js process. It has no HTTP server. Its only purpose is to connect to Redis, pull jobs from the stream, and process them. This isolation means we can scale the workers based on queue length, and if a worker crashes, it doesn’t affect the user-facing API.
1. Worker Setup and Queue Consumption
The worker uses a blocking read (XREADGROUP) on the Redis Stream to wait for new jobs efficiently.
// worker/src/main.ts
import { Redis } from 'ioredis';
import { AnalysisProcessor } from './analysis.processor';
import pino from 'pino';
const logger = pino({ level: 'info' });
const STREAM_KEY = 'analysis_jobs';
const GROUP_NAME = 'analysis_group';
const CONSUMER_NAME = `consumer-${process.pid}`;
const redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
maxRetriesPerRequest: null, // Important for blocking reads
});
const analysisProcessor = new AnalysisProcessor();
async function createConsumerGroup() {
try {
// MKSTREAM creates the stream if it doesn't exist.
await redisClient.xgroup('CREATE', STREAM_KEY, GROUP_NAME, '$', 'MKSTREAM');
logger.info(`Consumer group '${GROUP_NAME}' created or already exists for stream '${STREAM_KEY}'.`);
} catch (err) {
if (err.message.includes('BUSYGROUP')) {
logger.warn(`Consumer group '${GROUP_NAME}' already exists.`);
} else {
logger.error(err, 'Failed to create consumer group.');
process.exit(1);
}
}
}
async function processStream() {
logger.info(`Worker ${CONSUMER_NAME} starting to listen for jobs...`);
while (true) {
try {
const results = await redisClient.xreadgroup(
'GROUP', GROUP_NAME, CONSUMER_NAME,
'COUNT', 1, // Process one message at a time
'BLOCK', 5000, // Block for up to 5 seconds
'STREAMS', STREAM_KEY,
'>' // '>' means new messages never seen by this group
);
if (results) {
const [streamName, messages] = results[0];
const [messageId, fields] = messages[0];
const payloadIndex = fields.findIndex(f => f === 'payload');
const commandPayload = JSON.parse(fields[payloadIndex + 1]);
logger.info({ commandId: commandPayload.commandId, messageId }, 'Processing job...');
await analysisProcessor.process(commandPayload);
// Acknowledge the message so it's not redelivered.
await redisClient.xack(STREAM_KEY, GROUP_NAME, messageId);
logger.info({ commandId: commandPayload.commandId, messageId }, 'Job processed and acknowledged.');
}
} catch (error) {
logger.error(error, 'Error processing stream message. This worker will restart processing loop.');
// A proper implementation needs a dead-letter queue for failing jobs.
await new Promise(resolve => setTimeout(resolve, 5000)); // Backoff
}
}
}
async function main() {
await createConsumerGroup();
await analysisProcessor.initialize();
await processStream();
}
main().catch(err => {
logger.error(err, 'Worker process encountered a fatal error.');
process.exit(1);
});
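Because the loop blocks on XREADGROUP, it helps to shut the worker down deliberately. A graceful-shutdown sketch that could be appended to worker/src/main.ts (an assumption, not part of the original code):

// Appended to worker/src/main.ts - a graceful-shutdown sketch. Any message that was
// read but not yet XACKed stays in the group's pending list and can be claimed by
// another consumer after restart.
let shuttingDown = false;
for (const signal of ['SIGTERM', 'SIGINT'] as const) {
  process.on(signal, async () => {
    if (shuttingDown) return;
    shuttingDown = true;
    logger.info(`Received ${signal}; shutting down worker ${CONSUMER_NAME}.`);
    // quit() closes the connection once in-flight commands settle
    // (the blocking read times out within its 5-second BLOCK window).
    await redisClient.quit();
    process.exit(0);
  });
}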
2. Puppeteer and LangChain Integration
The AnalysisProcessor is where the core logic resides. It orchestrates Puppeteer for data extraction and LangChain for AI-driven analysis. The key here is robust error handling and resource management. Puppeteer browsers are heavy and must be managed carefully.
// worker/src/analysis.processor.ts
import puppeteer, { Browser } from 'puppeteer';
import { OpenAI } from '@langchain/openai';
import { PromptTemplate } from "@langchain/core/prompts";
import { LLMChain } from "langchain/chains";
import { connect, Mongoose } from 'mongoose';
import { AnalysisResultModel } from './schemas/analysis-result.schema';
import pino from 'pino';
const logger = pino({ level: 'info' });
export class AnalysisProcessor {
private browser: Browser | null = null;
private llm: OpenAI;
private mongoClient: Mongoose;
constructor() {
// In a real project, secrets must come from a vault or env variables.
this.llm = new OpenAI({
openAIApiKey: process.env.OPENAI_API_KEY,
temperature: 0.2,
modelName: 'gpt-3.5-turbo-instruct',
});
}
async initialize() {
logger.info('Initializing Puppeteer browser...');
this.browser = await puppeteer.launch({
headless: true,
args: ['--no-sandbox', '--disable-setuid-sandbox'] // Essential for Docker
});
logger.info('Connecting to MongoDB...');
this.mongoClient = await connect(process.env.MONGO_URI || 'mongodb://localhost:27017/read_model');
logger.info('Worker initialization complete.');
}
async process(command: any): Promise<void> {
const { commandId, url, userId } = command;
let page;
try {
if (!this.browser) {
throw new Error('Puppeteer browser not initialized. Call initialize() first.');
}
page = await this.browser.newPage();
await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
// Extract main content, avoiding boilerplate like navbars and footers.
const rawContent = await page.evaluate(() => {
// A more robust implementation would use a library like Readability.js
const main = document.querySelector('main, article, #main, .main');
return main ? main.innerText : document.body.innerText;
});
await page.close();
if (rawContent.trim().length < 100) {
throw new Error('Extracted content is too short to be meaningful.');
}
logger.info({ commandId }, `Extracted ${rawContent.length} characters from URL.`);
const summary = await this.generateSummary(rawContent);
await this.saveResult(commandId, url, userId, summary);
} catch (error) {
logger.error({ commandId, url, error: error.message }, 'Processing failed.');
if (page && !page.isClosed()) {
await page.close();
}
// Here we would update the read model with a 'FAILED' status.
await this.saveResult(commandId, url, userId, 'Analysis failed.', 'FAILED');
}
}
private async generateSummary(content: string): Promise<string> {
const template = "Summarize the following text for a business executive. Focus on the key takeaways and actionable insights:\n\n---\n\n{text}\n\n---\n\nSUMMARY:";
const prompt = new PromptTemplate({ template, inputVariables: ["text"] });
const chain = new LLMChain({ llm: this.llm, prompt });
// Truncate content to fit model context window.
const truncatedContent = content.substring(0, 8000);
const result = await chain.call({ text: truncatedContent });
return result.text.trim();
}
private async saveResult(commandId: string, url: string, userId: string, summary: string, status: string = 'COMPLETED') {
const update = {
$set: { commandId, url, userId, summary, status, updatedAt: new Date() },
// Preserve the original createdAt when the document already exists.
$setOnInsert: { createdAt: new Date() },
};
// Using `findOneAndUpdate` with `upsert` is idempotent.
await AnalysisResultModel.findOneAndUpdate(
{ commandId: commandId },
update,
{ upsert: true, new: true }
);
logger.info({ commandId }, `Result saved to read model with status: ${status}.`);
}
}
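The in-page extraction above is intentionally simple. As the inline comment suggests, a library such as Readability.js produces much cleaner article text; a sketch of that approach using @mozilla/readability and jsdom (assumed dependencies, not in the original worker):

// worker/src/extract-content.ts - a sketch of the more robust extraction hinted at
// in the comment above (file name assumed).
import { JSDOM } from 'jsdom';
import { Readability } from '@mozilla/readability';

export function extractReadableText(html: string, url: string): string {
  // Readability works on a DOM, so parse the HTML returned by Puppeteer
  // (e.g. via `await page.content()`) into a jsdom document first.
  const dom = new JSDOM(html, { url });
  const article = new Readability(dom.window.document).parse();
  // Fall back to the raw body text when Readability cannot identify an article.
  return article?.textContent?.trim() || dom.window.document.body.textContent || '';
}

Inside process(), the page.evaluate() block could then become const rawContent = extractReadableText(await page.content(), url);.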
The worker now successfully bridges the gap between the command and the query model.
The Query Side and Client-Side Resilience
The query side is designed for speed and simplicity. It reads from a denormalized model and does nothing else. The real magic for user experience, however, happens on the client with a Service Worker.
1. The Read Model and Query API
The Mongoose schema for our read model is straightforward. We add indexes on userId and url for fast lookups, plus a unique index on commandId so the worker’s upserts stay idempotent.
// worker/src/schemas/analysis-result.schema.ts
import { Schema, model } from 'mongoose';
export const AnalysisResultSchema = new Schema({
commandId: { type: String, required: true, unique: true, index: true },
url: { type: String, required: true, index: true },
userId: { type: String, required: true, index: true },
summary: { type: String, required: true },
status: { type: String, required: true, enum: ['PENDING', 'COMPLETED', 'FAILED'], default: 'PENDING' },
createdAt: { type: Date, default: Date.now },
updatedAt: { type: Date, default: Date.now }
});
export const AnalysisResultModel = model('AnalysisResult', AnalysisResultSchema);
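The query endpoint shown next filters on url, userId, and status and sorts by createdAt descending. A compound index matching that access pattern, declared in the schema file above just before the model is compiled, keeps the lookup fast as the read model grows (a sketch, not part of the original schema):

// Added to worker/src/schemas/analysis-result.schema.ts, before the model() call -
// a compound index matching the query-side access pattern.
AnalysisResultSchema.index({ userId: 1, url: 1, status: 1, createdAt: -1 });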
The query API in our NestJS application is a simple read-only endpoint.
// src/queries/queries.controller.ts
import { Controller, Get, Query, Logger, NotFoundException, BadRequestException } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { AnalysisResult } from './interfaces/analysis-result.interface'; // Mongoose Document interface
@Controller('queries')
export class QueriesController {
private readonly logger = new Logger(QueriesController.name);
constructor(
@InjectModel('AnalysisResult') private readonly resultModel: Model<AnalysisResult>,
) {}
@Get('analysis-by-url')
async getAnalysisByUrl(@Query('url') url: string, @Query('userId') userId: string) {
if (!url || !userId) {
throw new BadRequestException('Both the url and userId query parameters are required.');
}
this.logger.log(`Fetching analysis for URL: ${url}`);
// Find the most recent completed analysis for a given URL.
const result = await this.resultModel
.findOne({ url, userId, status: 'COMPLETED' })
.sort({ createdAt: -1 })
.exec();
if (!result) {
throw new NotFoundException(`No completed analysis found for URL: ${url}`);
}
return {
url: result.url,
summary: result.summary,
lastAnalyzed: result.updatedAt,
};
}
}
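For @InjectModel('AnalysisResult') to resolve, the schema must be registered with @nestjs/mongoose on the query side. A minimal wiring sketch, with the module name and schema import path assumed (the schema is shared with, or mirrored from, the worker), and MongooseModule.forRoot() assumed to be configured in the root application module:

// src/queries/queries.module.ts - a wiring sketch for the read side.
import { Module } from '@nestjs/common';
import { MongooseModule } from '@nestjs/mongoose';
import { QueriesController } from './queries.controller';
import { AnalysisResultSchema } from './schemas/analysis-result.schema';

@Module({
  imports: [
    // Registers the model so @InjectModel('AnalysisResult') resolves in the controller.
    MongooseModule.forFeature([{ name: 'AnalysisResult', schema: AnalysisResultSchema }]),
  ],
  controllers: [QueriesController],
})
export class QueriesModule {}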
2. The Service Worker for an Offline-First Experience
The Service Worker is the final piece. It intercepts network requests from the client application and serves cached data when available, providing an instant-loading and offline-capable experience. We’ll use a stale-while-revalidate strategy for our query API.
// public/sw.js
const CACHE_NAME = 'web-intelligence-cache-v1';
const API_ENDPOINT = '/queries/analysis-by-url';
// On install, pre-cache the application shell (HTML, CSS, JS).
self.addEventListener('install', (event) => {
event.waitUntil(
caches.open(CACHE_NAME).then((cache) => {
console.log('Service Worker: Caching app shell');
return cache.addAll([
'/',
'/index.html',
'/styles.css',
'/app.js'
]);
})
);
});
// Clean up old caches on activation.
self.addEventListener('activate', (event) => {
event.waitUntil(
caches.keys().then((cacheNames) => {
return Promise.all(
cacheNames.map((cacheName) => {
if (cacheName !== CACHE_NAME) {
console.log('Service Worker: Clearing old cache', cacheName);
return caches.delete(cacheName);
}
})
);
})
);
});
self.addEventListener('fetch', (event) => {
const requestUrl = new URL(event.request.url);
// Only apply the strategy to our specific API endpoint.
if (requestUrl.pathname === API_ENDPOINT) {
event.respondWith(staleWhileRevalidate(event.request));
} else {
// For other requests, use a cache-first strategy.
event.respondWith(
caches.match(event.request).then((response) => {
return response || fetch(event.request);
})
);
}
});
function staleWhileRevalidate(request) {
return caches.open(CACHE_NAME).then((cache) => {
return cache.match(request).then((cachedResponse) => {
// Fetch a fresh version in the background.
const fetchPromise = fetch(request).then((networkResponse) => {
// A pitfall here is caching error responses.
// We only cache successful (2xx) responses.
if (networkResponse.ok) {
cache.put(request, networkResponse.clone());
}
return networkResponse;
});
// Return the cached response immediately if available, otherwise wait for the network.
return cachedResponse || fetchPromise;
});
});
}
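The Service Worker only takes effect once the page registers it. A standard registration snippet for the application shell (the file name is assumed to match the app.js cached above):

// public/app.js - registering the Service Worker from the application shell.
if ('serviceWorker' in navigator) {
  window.addEventListener('load', () => {
    navigator.serviceWorker
      .register('/sw.js')
      .then((registration) => console.log('Service Worker registered, scope:', registration.scope))
      .catch((err) => console.error('Service Worker registration failed:', err));
  });
}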
This client-side implementation transforms the user experience. When a user revisits a page for an already-analyzed URL, the summary appears instantly from the cache, while the service worker silently fetches a fresh version in the background to update the cache for the next visit. If the user is offline, the cached version is served without issue.
Architectural Limitations and Future Considerations
This architecture, while powerful, introduces its own set of complexities. The most significant trade-off is the shift from strong consistency to eventual consistency. There will be a delay, ranging from seconds to minutes, between a user requesting an analysis and the result being available for query. The UI must be designed to handle this, perhaps by showing a “processing” state. The operational overhead is also higher; we now have to manage and monitor a message queue and a separate pool of worker processes. A robust implementation would require a dead-letter queue for failed jobs, sophisticated retry logic, and detailed monitoring on queue length and worker throughput. Finally, the cost of running persistent Puppeteer workers and making frequent LLM API calls must be carefully managed. This pattern is not a universal solution but is exceptionally effective for systems with the specific asymmetrical performance requirements we defined—where the cost of a slow command is acceptable if it enables an instantaneous and resilient query experience.
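As a starting point for the queue monitoring mentioned above, a sketch using ioredis: XLEN reports the total number of entries in the stream, and the summary form of XPENDING reports how many are claimed but not yet acknowledged by the consumer group. The file name, wiring, and reporting sink are assumptions.

// worker/src/queue-metrics.ts - a monitoring sketch.
import { Redis } from 'ioredis';

const redis = new Redis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
});

export async function sampleQueueMetrics() {
  // Total number of entries currently stored in the stream (processed or not).
  const streamLength = await redis.xlen('analysis_jobs');
  // Summary form of XPENDING: [count, smallestId, greatestId, perConsumerCounts].
  const [pendingCount] = (await redis.xpending('analysis_jobs', 'analysis_group')) as [number, ...unknown[]];
  return { streamLength, pendingCount };
}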