The initial system design was straightforward and, in hindsight, naive. An API endpoint received a block of text, passed it to a Python worker for Named Entity Recognition (NER) using spaCy, and returned the extracted entities. For a while, this worked. But as request volume grew from hundreds per minute to thousands per second during peak traffic, the P99 latency ballooned to over 800ms, shattering our 100ms SLA. The core problem wasn’t a single slow component, but a systemic failure to handle concurrent, repetitive, and CPU-intensive work efficiently. Simply scaling out the spaCy workers horizontally was a temporary, and expensive, fix that didn’t address the root cause: we were treating a batch-friendly workload as a series of isolated, high-latency transactions.
Our initial attempt at a solution was to introduce a caching layer. Memcached was selected for its raw speed and simplicity. The implementation was trivial: hash the input text to create a key, check Memcached, and on a miss, perform the spaCy processing and write the result back to the cache with a TTL.
# initial_caching_attempt.py
import hashlib
import json
import spacy
from pymemcache.client.base import Client as MemcachedClient

# Global instances - a common mistake in early-stage services.
# Loading a large model like this on module import can cause issues.
NLP_MODEL = spacy.load("en_core_web_lg")
MEMCACHED_CLIENT = MemcachedClient(('memcached-host', 11211))


def get_entities_with_simple_cache(text: str) -> list:
    """
    Processes text to extract entities, with a simple cache-aside pattern.
    """
    # Using SHA256 is overkill and slower than necessary, but was the first choice.
    cache_key = hashlib.sha256(text.encode()).hexdigest()

    cached_result = MEMCACHED_CLIENT.get(cache_key)
    if cached_result:
        # Assuming the result is stored as a JSON string
        return json.loads(cached_result.decode('utf-8'))

    # Cache miss - this is the expensive operation
    doc = NLP_MODEL(text)
    entities = [(ent.text, ent.label_) for ent in doc.ents]

    # Store in cache for 1 hour
    MEMCACHED_CLIENT.set(cache_key, json.dumps(entities).encode('utf-8'), expire=3600)

    return entities
This provided immediate relief for high-frequency, identical requests. The cache hit rate climbed to 40%, and average latency dropped. However, the P99 latency remained stubbornly high. The long tail of unique text inputs always resulted in a cache miss, hitting the expensive spaCy model directly. The core spaCy processing for a single document was the bottleneck, and no amount of simple caching could fix that fundamental constraint. The real breakthrough came when we analyzed the workload itself and realized spaCy is significantly more performant when processing documents in batches using nlp.pipe(). The challenge then shifted: how to transform a stream of individual, asynchronous API requests into efficient batches for the NLP model.
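To make the difference concrete, here is a minimal comparison sketch; the model name, sample text, and batch size are illustrative, not our production benchmark. Processing documents one at a time pays the pipeline's per-call overhead on every request, while nlp.pipe() streams them through the pipeline in chunks.
# pipe_comparison.py - illustrative sketch, not a production benchmark
import time
import spacy

nlp = spacy.load("en_core_web_lg")
texts = ["Acme Corp hired 300 engineers in Berlin last year."] * 500

# One document at a time: the per-call overhead is paid 500 times.
start = time.perf_counter()
single = [[(ent.text, ent.label_) for ent in nlp(t).ents] for t in texts]
print(f"per-document: {time.perf_counter() - start:.2f}s")

# Batched: nlp.pipe() pushes documents through the pipeline in chunks.
start = time.perf_counter()
batched = [[(ent.text, ent.label_) for ent in doc.ents]
           for doc in nlp.pipe(texts, batch_size=64)]
print(f"nlp.pipe:     {time.perf_counter() - start:.2f}s")
The exact speedup depends on the model, pipeline components, and hardware, but the gap is large enough to justify restructuring the request path around batches.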
Architecting a Request Aggregation Layer
The solution was to build an intermediate request aggregation service. This component would sit between the public-facing API gateway and the spaCy workers. Its sole purpose is to buffer incoming requests for a very short time window (e.g., 20 milliseconds), group them into a single batch, send that batch to a worker, and then distribute the results back to the waiting original callers.
This pattern trades a small, predictable amount of latency for a massive gain in overall system throughput. asyncio in Python was the natural choice for building such a non-blocking, I/O-bound component.
Here is the core structure of the aggregator. It relies on a shared queue and a background worker task that performs the batching.
# request_aggregator.py
import asyncio
import logging
import time
import uuid
from collections import namedtuple
from typing import List, Tuple

# Configure structured logging for production environments
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Represents a single incoming request waiting for processing.
# The `future` is the key mechanism to return the result to the caller.
PendingRequest = namedtuple('PendingRequest', ['text', 'future'])


class NlpBatchProcessor:
    """
    Manages batching of NLP requests to improve throughput.
    """

    def __init__(self, spacy_worker_func, max_batch_size: int, aggregation_window_ms: float):
        if not asyncio.iscoroutinefunction(spacy_worker_func):
            raise TypeError("spacy_worker_func must be an awaitable coroutine.")
        self.request_queue = asyncio.Queue()
        self.spacy_worker_func = spacy_worker_func
        self.max_batch_size = max_batch_size
        self.aggregation_window = aggregation_window_ms / 1000.0  # Convert ms to seconds
        self._task = asyncio.create_task(self._batch_processor_loop())
        logger.info(f"Initialized NlpBatchProcessor with batch size {max_batch_size} and window {aggregation_window_ms}ms")

    async def _batch_processor_loop(self):
        """The core background task that continuously forms and processes batches."""
        while True:
            try:
                await self._process_next_batch()
            except asyncio.CancelledError:
                logger.info("Batch processor loop cancelled.")
                break
            except Exception as e:
                logger.error(f"Unhandled exception in batch processor loop: {e}", exc_info=True)
                # In a real-world scenario, you might want a backoff strategy.
                await asyncio.sleep(1)

    async def _process_next_batch(self):
        """Waits for requests and processes a batch when conditions are met."""
        # Wait for the first request to start a new batch; the aggregation
        # window only starts counting once that first request arrives.
        first_request: PendingRequest = await self.request_queue.get()
        batch_start_time = time.monotonic()
        current_batch = [first_request]

        # Collect more requests until the batch is full or the time window expires
        while len(current_batch) < self.max_batch_size:
            time_spent = time.monotonic() - batch_start_time
            if time_spent >= self.aggregation_window:
                break
            remaining_time = self.aggregation_window - time_spent
            try:
                # Wait for the next item, but with a timeout
                request = await asyncio.wait_for(self.request_queue.get(), timeout=remaining_time)
                current_batch.append(request)
            except asyncio.TimeoutError:
                # Time window expired, proceed with the current batch
                break

        batch_texts = [req.text for req in current_batch]
        batch_id = str(uuid.uuid4())
        logger.info(f"Processing batch {batch_id} with {len(current_batch)} items.")

        try:
            # This is the call to the actual NLP worker pool
            results = await self.spacy_worker_func(batch_texts)
            if len(results) != len(current_batch):
                raise ValueError("Mismatch between number of requests and results.")
            # Distribute results back to the original callers
            for request, result in zip(current_batch, results):
                # This is how the original caller gets its specific response
                if not request.future.done():
                    request.future.set_result(result)
        except Exception as e:
            logger.error(f"Failed to process batch {batch_id}: {e}", exc_info=True)
            # Propagate the error to all waiting callers in the batch
            for request in current_batch:
                if not request.future.done():
                    request.future.set_exception(e)

    async def process(self, text: str) -> List[Tuple[str, str]]:
        """Public method for submitting a text for processing."""
        future = asyncio.get_running_loop().create_future()
        await self.request_queue.put(PendingRequest(text=text, future=future))
        # The `await future` call will pause this coroutine until the
        # background batch processor sets the result or an exception.
        return await future

    def shutdown(self):
        """Gracefully shuts down the background processor task."""
        logger.info("Shutting down batch processor.")
        self._task.cancel()
This aggregator transformed our architecture. The spaCy workers no longer received a chaotic stream of single requests but a steady, orderly flow of large batches, allowing nlp.pipe() to achieve near-optimal CPU utilization.
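The aggregator only requires that spacy_worker_func be an awaitable that maps a list of texts to a list of results; the worker behind that interface is not shown above. The following is a sketch of one way to satisfy the contract, assuming an in-process ProcessPoolExecutor rather than a separate worker service (pool size, model, and batch size are illustrative): the CPU-bound nlp.pipe() call runs off the event loop so the aggregator never blocks.
# spacy_worker.py - sketch of an awaitable batch worker (assumed design,
# not necessarily the production worker pool described in the text).
import asyncio
from concurrent.futures import ProcessPoolExecutor
from typing import List, Tuple

import spacy

# Pool size is illustrative; each worker process loads the model once.
_EXECUTOR = ProcessPoolExecutor(max_workers=4)
_NLP = None


def _extract_batch(texts: List[str]) -> List[List[Tuple[str, str]]]:
    """Runs inside a worker process: push the whole batch through nlp.pipe()."""
    global _NLP
    if _NLP is None:
        _NLP = spacy.load("en_core_web_lg")
    return [[(ent.text, ent.label_) for ent in doc.ents]
            for doc in _NLP.pipe(texts, batch_size=64)]


async def spacy_batch_worker(texts: List[str]) -> List[List[Tuple[str, str]]]:
    """Awaitable wrapper that NlpBatchProcessor accepts as spacy_worker_func."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_EXECUTOR, _extract_batch, texts)
With this in place, the aggregator is constructed inside an async startup hook, for example NlpBatchProcessor(spacy_batch_worker, max_batch_size=32, aggregation_window_ms=20), since it schedules its background task on the running event loop.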
Integrating Snowflake for Pre-Computation
Even with batching, some NLP tasks remained too slow for real-time processing, particularly those requiring complex lookups against our massive internal knowledge base stored in Snowflake. The solution was to introduce a “cold” pre-computation layer. Nightly, we run jobs using the Snowflake Python connector to scan new and updated text corpora, perform the heaviest parts of the analysis, and cache the results in a format that the real-time service can consume quickly.
This is not just caching; it’s a fundamental shift of computational load from real-time to batch. A typical job might identify all known company names in our entire document store.
# snowflake_precomputation.py
import os
import logging

import snowflake.connector

# In a production system, use a secure secret management tool.
SNOWFLAKE_USER = os.environ.get("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.environ.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_WAREHOUSE = "COMPUTE_WH"
SNOWFLAKE_DATABASE = "ANALYTICS"
SNOWFLAKE_SCHEMA = "NLP_PRECOMPUTE"

logger = logging.getLogger(__name__)


def run_entity_extraction_job():
    """
    Connects to Snowflake, runs a heavy query, and stores the results.
    This would typically be orchestrated by Airflow or a similar tool.
    """
    ctx = None
    try:
        ctx = snowflake.connector.connect(
            user=SNOWFLAKE_USER,
            password=SNOWFLAKE_PASSWORD,
            account=SNOWFLAKE_ACCOUNT,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA
        )
        cs = ctx.cursor()
        # A simplified query. The real query might involve UDFs or complex joins.
        # The goal is to offload this expensive work from the real-time path.
        query = """
        CREATE OR REPLACE TABLE PRECOMPUTED_ENTITIES AS
        SELECT
            document_id,
            NER_EXTRACT_UDF(document_text) AS entities_json
        FROM
            RAW_DOCUMENTS
        WHERE
            last_updated_timestamp > (SELECT MAX(last_processed_ts) FROM processing_log);
        """
        logger.info("Starting Snowflake pre-computation job.")
        cs.execute(query)
        logger.info(f"Job completed. Processed {cs.rowcount} records.")
    except snowflake.connector.errors.ProgrammingError as e:
        logger.error(f"Snowflake error: {e}")
        # Add specific error handling for credentials, network issues, etc.
    finally:
        if ctx:
            ctx.close()

# The real-time service can then query this `PRECOMPUTED_ENTITIES` table
# for specific document_ids, which is vastly faster than running NLP on the fly.
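On the consuming side, the lookup is a plain keyed query. Here is a sketch of what that could look like with the same connector; the table and column names follow the job above, and whether the hot path queries Snowflake directly or a faster store hydrated from this table is a deployment choice, as the diagram further down shows.
# precomputed_lookup.py - illustrative sketch of reading the nightly results.
import json
from typing import Dict, List


def fetch_precomputed_entities(ctx, document_ids: List[str]) -> Dict[str, list]:
    """Returns {document_id: entities} for the ids that were pre-computed.

    `ctx` is an open snowflake.connector connection, e.g. one created as in
    the job above.
    """
    cs = ctx.cursor()
    try:
        placeholders = ", ".join(["%s"] * len(document_ids))
        cs.execute(
            "SELECT document_id, entities_json FROM PRECOMPUTED_ENTITIES "
            f"WHERE document_id IN ({placeholders})",
            document_ids,
        )
        return {doc_id: json.loads(entities_json) for doc_id, entities_json in cs.fetchall()}
    finally:
        cs.close()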
The architecture now had three tiers of data access:
- Hot Tier (Memcached): Sub-millisecond lookup for frequently accessed, identical texts.
- Warm Tier (Python/spaCy Service): Real-time, batched processing for dynamic, unseen texts. Latency of 20-50ms.
- Cold Tier (Snowflake): Nightly, large-scale batch processing. The results feed into the warm tier’s lookup tables or are directly cached.
graph TD
    subgraph Real-Time Path
        A[API Gateway] --> B{Request Aggregator};
        B --> C{Memcached Cache};
        C -- Cache Hit --> A;
        C -- Cache Miss --> B;
        B -- Batch (texts) --> D[spaCy Worker Pool];
        D -- Fetches pre-computed data --> E[Fast Lookup DB / Cache];
        D -- Batch Results --> B;
        B -- Individual Results --> A;
    end
    subgraph Offline Path
        F[Raw Text Data Source] --> G[Snowflake Warehouse];
        H[Nightly ETL Job] -- Runs heavy NLP queries --> G;
        G -- Pre-computed Results --> E;
    end
    style A fill:#cde4ff
    style D fill:#d5e8d4
    style G fill:#f8cecc
Testing the Concurrent Aggregator
Testing an asyncio-based concurrent component is non-trivial. It requires careful control over the event loop and mocking of external dependencies. We used pytest-asyncio to validate the aggregator’s logic.
# test_aggregator.py
import asyncio
from unittest.mock import AsyncMock

import pytest
import pytest_asyncio

from request_aggregator import NlpBatchProcessor

# Mark all tests in this file as asyncio
pytestmark = pytest.mark.asyncio


async def mock_spacy_worker(texts: list[str]) -> list:
    """A mock worker that simulates processing and returns predictable results."""
    # Simulate I/O and CPU work
    await asyncio.sleep(0.05)
    return [[(f"entity_{i}", "ORG")] for i in range(len(texts))]


@pytest_asyncio.fixture
async def processor():
    """Fixture to set up and tear down the NlpBatchProcessor.

    The fixture must be async: the processor schedules its background task
    on the running event loop during construction.
    """
    proc = NlpBatchProcessor(
        spacy_worker_func=mock_spacy_worker,
        max_batch_size=10,
        aggregation_window_ms=20
    )
    yield proc
    # Teardown
    proc.shutdown()


async def test_batching_by_size(processor: NlpBatchProcessor):
    """Verify that a full batch is triggered by size."""
    # Replace the real worker with a mock to inspect calls
    processor.spacy_worker_func = AsyncMock(wraps=mock_spacy_worker)

    # Fire off 10 requests concurrently, which should fill the batch
    tasks = [asyncio.create_task(processor.process(f"text {i}")) for i in range(10)]
    results = await asyncio.gather(*tasks)

    assert len(results) == 10
    # The worker should have been called exactly once with a batch of 10
    processor.spacy_worker_func.assert_called_once()
    assert len(processor.spacy_worker_func.call_args[0][0]) == 10


async def test_batching_by_time_window(processor: NlpBatchProcessor):
    """Verify that a partial batch is triggered by the time window expiring."""
    processor.spacy_worker_func = AsyncMock(wraps=mock_spacy_worker)

    # Fire off 3 requests, less than the batch size
    tasks = [asyncio.create_task(processor.process(f"text {i}")) for i in range(3)]
    # Give it more than enough time for the aggregation window to close
    await asyncio.sleep(0.05)
    results = await asyncio.gather(*tasks)

    assert len(results) == 3
    # Worker should have been called once with a batch of 3
    processor.spacy_worker_func.assert_called_once()
    assert len(processor.spacy_worker_func.call_args[0][0]) == 3
A Lightweight Diagnostic Dashboard with Rollup
Monitoring this distributed system became a new challenge. We needed a simple way to visualize the aggregator’s performance: incoming request rate, batch size distribution, cache hit ratio, and P99 latency. Building a full-fledged React or Vue application felt like overkill.
The decision was to create a single, self-contained HTML file with an embedded JavaScript bundle. Rollup was the perfect tool for this. It could take our vanilla JavaScript code, a small charting library, and a WebSocket client, and bundle them into one minified file with no external dependencies. This monitor.js could be easily served by our Python backend and embedded in any internal admin tool.
The rollup.config.js is focused on creating this minimal bundle.
// rollup.config.js
import { nodeResolve } from '@rollup/plugin-node-resolve';
import terser from '@rollup/plugin-terser';
import commonjs from '@rollup/plugin-commonjs';

export default {
  input: 'src/monitor.js',
  output: {
    file: 'dist/monitor.bundle.js',
    format: 'iife', // Immediately Invoked Function Expression, for browser embedding
    name: 'NlpMonitor',
    sourcemap: false
  },
  plugins: [
    // To find dependencies in node_modules
    nodeResolve(),
    // To convert CommonJS modules to ES6
    commonjs(),
    // Minify the output for production
    terser()
  ]
};
The corresponding JavaScript would establish a WebSocket connection to a metrics endpoint on our Python server and render the data.
// src/monitor.js
// A lightweight charting library like Chart.js or a custom D3 implementation
import Chart from 'chart.js/auto';

const WS_URL = "ws://localhost:8000/ws/metrics";

function initializeChart(ctx, label) {
  return new Chart(ctx, {
    type: 'line',
    data: {
      labels: [],
      datasets: [{
        label: label,
        data: [],
        borderColor: 'rgb(75, 192, 192)',
        tension: 0.1
      }]
    },
    options: {
      scales: { y: { beginAtZero: true } }
    }
  });
}

function updateChart(chart, label, value) {
  chart.data.labels.push(label);
  chart.data.datasets[0].data.push(value);
  // Limit data points to avoid memory leaks in long-running dashboards
  if (chart.data.labels.length > 50) {
    chart.data.labels.shift();
    chart.data.datasets[0].data.shift();
  }
  chart.update();
}

document.addEventListener("DOMContentLoaded", () => {
  const p99LatencyCtx = document.getElementById('p99LatencyChart').getContext('2d');
  const batchSizeCtx = document.getElementById('batchSizeChart').getContext('2d');
  const latencyChart = initializeChart(p99LatencyCtx, 'P99 Latency (ms)');
  const batchSizeChart = initializeChart(batchSizeCtx, 'Average Batch Size');

  const ws = new WebSocket(WS_URL);

  ws.onmessage = (event) => {
    try {
      const data = JSON.parse(event.data);
      const timestamp = new Date(data.timestamp).toLocaleTimeString();
      updateChart(latencyChart, timestamp, data.p99_latency_ms);
      updateChart(batchSizeChart, timestamp, data.avg_batch_size);
      document.getElementById('cacheHitRatio').innerText = `${data.cache_hit_ratio.toFixed(2)}%`;
    } catch (e) {
      console.error("Failed to parse metrics", e);
    }
  };

  ws.onerror = (error) => {
    console.error("WebSocket Error: ", error);
  };

  ws.onclose = () => {
    console.log("WebSocket connection closed. Attempting to reconnect...");
    // Add reconnect logic here.
  };
});
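The server side of that WebSocket is not shown above. As a minimal sketch, assuming FastAPI (any ASGI framework with WebSocket support would work) and a placeholder collect_metrics() standing in for the aggregator's real counters, the /ws/metrics endpoint simply pushes a JSON snapshot every second:
# metrics_ws.py - minimal sketch of the /ws/metrics endpoint the dashboard
# connects to. FastAPI is an assumption (the framework is not named in the
# article), and collect_metrics() is a placeholder for real instrumentation.
import asyncio
import random
from datetime import datetime, timezone

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


def collect_metrics() -> dict:
    # Placeholder values; the real service would read these from the
    # aggregator's counters and latency histograms.
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "p99_latency_ms": random.uniform(20, 60),
        "avg_batch_size": random.uniform(5, 32),
        "cache_hit_ratio": random.uniform(30, 60),
    }


@app.websocket("/ws/metrics")
async def metrics_feed(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            await websocket.send_json(collect_metrics())
            await asyncio.sleep(1)  # one snapshot per second
    except WebSocketDisconnect:
        pass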
This approach delivered a functional, performant monitoring tool with minimal development overhead, proving that modern front-end tooling like Rollup has its place even in backend-focused infrastructure projects.
The current system now consistently operates with a P99 latency under 50ms, even under significantly higher load than the original architecture could handle. However, the request aggregator itself remains a potential single point of failure. A future iteration could involve a distributed aggregation mechanism using a shared medium like Redis streams or Kafka, allowing for horizontal scaling of the aggregator component itself. Furthermore, our cache invalidation is purely TTL-based; for data that changes more dynamically, implementing a Change Data Capture (CDC) pipeline from Snowflake to proactively invalidate Memcached entries would be the next logical step toward a more robust and responsive system.