Building a High-Throughput Search Ingestion Service Using an ASP.NET Core and Rust Hybrid Architecture


The primary bottleneck was not the database or the business logic, but the synchronous indexing of documents into our Solr cluster directly from the request thread in our main ASP.NET Core application. Every product update, user profile change, or content submission triggered a direct HTTP call to Solr. Under moderate load, this was acceptable. Under peak load, this design coupled the application’s API latency directly to Solr’s indexing performance, leading to cascading failures. The requirement was clear: decouple the ingestion pipeline, make it asynchronous, and ensure it can handle sustained bursts of over 100,000 document updates per minute without affecting the main application’s performance or resource footprint.

Architectural Decision Point: The Ingestion Worker

Two primary architectural paths were considered for building this decoupled ingestion worker. The choice here would have long-term implications on performance, maintainability, and operational complexity.

Solution A: In-Process Background Service within the ASP.NET Core Ecosystem

The most straightforward approach was to leverage the existing ASP.NET Core stack. We could implement an IHostedService that runs a background loop. The main application would enqueue indexing jobs into a persistent message queue (like Redis Streams or RabbitMQ), and the background service would consume these jobs, batch them, and send them to Solr.

Pros:

  • Unified Stack: No new languages or build toolchains are introduced. The existing team can maintain the entire codebase.
  • Rapid Development: Leveraging familiar libraries like StackExchange.Redis and SolrNet would accelerate initial implementation.
  • Simplified Deployment: The worker could be a separate ASP.NET Core process deployed as a service, sharing much of the same containerization and CI/CD logic as the main application.

Cons:

  • Garbage Collection Overhead: In a high-throughput scenario, deserializing tens of thousands of messages per minute from the queue, creating corresponding C# objects, and then serializing them into Solr’s required format generates significant memory pressure. The .NET garbage collector, while highly optimized, can introduce non-deterministic pauses (stop-the-world events) that affect the predictability of our ingestion latency. For a service whose sole purpose is high-throughput data shoveling, GC pauses are a significant liability.
  • Limited Low-Level Control: Fine-tuning memory allocation, buffer management, and network socket behavior is less direct in a managed runtime. Achieving true zero-copy or near-zero-copy operations is difficult, which is critical for squeezing maximum performance out of the network and CPU for this I/O-bound task.
  • Resource Footprint: The .NET runtime carries a certain memory overhead. A dedicated process for this task, built on the same framework as the main web application, might consume more resources than a purpose-built, lean alternative.

Solution B: A Dedicated Rust Sidecar Service

This approach involves creating a small, highly optimized standalone service in Rust. This service would act as a “sidecar” in our deployment topology. Its only responsibility is to consume from the message queue, perform transformations, batch data, and communicate with Solr. Communication between the ASP.NET Core application and the Rust service would remain via the message queue.

Pros:

  • Predictable Performance: Rust has no garbage collector. Memory management is explicit and compile-time checked. This eliminates GC pauses entirely, making latency profiles highly predictable—a critical feature for a core infrastructure component.
  • Extreme Efficiency: Rust provides low-level control over system resources, rivaling C/C++. This allows for meticulous optimization of network I/O, memory buffers, and CPU usage. The resulting binary is small and has a minimal resource footprint.
  • Safety Guarantees: Rust’s ownership model and borrow checker eliminate entire classes of memory safety bugs (e.g., null pointer dereferences, data races) at compile time. For a critical service handling a constant stream of production data, this level of reliability is a massive architectural advantage.
  • True Decoupling: The ingestion worker is not just logically separate but is isolated at the runtime level. A crash or resource leak in the Rust service cannot impact the .NET runtime of the main application, and vice versa.

Cons:

  • Increased Complexity: Introduces a new programming language and toolchain into the ecosystem. This requires developer expertise, a separate build pipeline, and new monitoring considerations.
  • Inter-Process Communication Overhead: While the message queue acts as the primary buffer, the design relies on a well-defined and stable contract (the message format) between the C# producer and the Rust consumer. Schema evolution must be managed carefully.

The Final Choice and Rationale

We selected Solution B: The Rust Sidecar Service. In a production environment, predictable performance under load and operational resilience are paramount. The risk of GC pauses in the .NET worker (Solution A) directly impacting our ability to keep the ingestion queue clear was deemed too high. The long-term benefits of a rock-solid, low-footprint, and highly performant ingestion service built in Rust justified the initial investment in mastering a new technology stack. The decoupling provided by this model allows the main application team to focus on business logic, while the infrastructure team can own and optimize this critical data pipeline component. The cost of a production incident caused by resource contention far outweighs the cost of maintaining a small, specialized Rust service.

Core Implementation

The final architecture follows this data flow:

graph TD
    subgraph Frontend Tier
        A[React Component] -->|API Call| B{ASP.NET Core API}
        C[Legacy Vue/Pinia Component] -->|API Call| B
    end

    subgraph Backend Tier
        B -->|Enqueues Message| D[Redis Stream: 'solr-ingest-stream']
        E[Rust Ingestion Service] -->|Reads Batch| D
        E -->|Bulk Update| F[Solr Cluster]
    end

    subgraph Query Path
        A -->|Search Query API| B
        B -->|Queries| F
    end

    style E fill:#f9f,stroke:#333,stroke-width:2px

1. ASP.NET Core Producer

The C# application is responsible for serializing the data into a compact binary format and pushing it to Redis. We chose MessagePack for its performance and cross-language support.

First, the configuration in appsettings.json:

{
  "Redis": {
    "ConnectionString": "redis:6379",
    "IngestStreamName": "solr-ingest-stream"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  }
}

The data model and the producer service:

// In a separate project for shared models
using MessagePack;

namespace HybridIngestion.Models
{
    [MessagePackObject]
    public class DocumentToIndex
    {
        [Key(0)]
        public string Id { get; set; }

        [Key(1)]
        public string Content { get; set; }

        [Key(2)]
        public string Category { get; set; }

        [Key(3)]
        public long TimestampUtc { get; set; }
    }
}

// In the ASP.NET Core application
using StackExchange.Redis;
using MessagePack;
using Microsoft.Extensions.Options;
using HybridIngestion.Models;

public class RedisIngestionService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly IDatabase _db;
    private readonly string _streamName;
    private readonly ILogger<RedisIngestionService> _logger;

    public RedisIngestionService(IConnectionMultiplexer redis, IOptions<RedisSettings> settings, ILogger<RedisIngestionService> logger)
    {
        _redis = redis;
        _db = _redis.GetDatabase();
        _streamName = settings.Value.IngestStreamName;
        _logger = logger;
    }

    public async Task EnqueueDocumentAsync(DocumentToIndex document)
    {
        try
        {
            // The core logic is to serialize the object into a byte array using MessagePack.
            // This is significantly more efficient than JSON for inter-service communication.
            var serializedDocument = MessagePackSerializer.Serialize(document);

            var values = new NameValueEntry[]
            {
                new NameValueEntry("data", serializedDocument)
            };

            // StreamAddAsync is the command to append to a Redis Stream.
            // This provides a durable, append-only log structure.
            var messageId = await _db.StreamAddAsync(_streamName, values);
            
            _logger.LogInformation("Successfully enqueued document {DocumentId} with message ID {MessageId}", document.Id, messageId);
        }
        catch (RedisConnectionException ex)
        {
            _logger.LogError(ex, "Failed to connect to Redis while enqueuing document {DocumentId}", document.Id);
            // In a real-world project, this should trigger a circuit breaker or a fallback mechanism.
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An unexpected error occurred while enqueuing document {DocumentId}", document.Id);
            throw;
        }
    }
}

public class RedisSettings
{
    public string ConnectionString { get; set; }
    public string IngestStreamName { get; set; }
}

This service is registered in Program.cs as a singleton and injected wherever a document needs to be indexed.

2. The Rust Ingestion Sidecar

This is the heart of the solution. The Rust application is a long-running process that connects to Redis, consumes messages in batches, deserializes them, constructs a Solr bulk update request, and sends it.

The Cargo.toml file defines the dependencies:

[package]
name = "solr_ingester"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1", features = ["full"] }
redis = { version = "0.23", features = ["tokio-comp"] }
serde = { version = "1.0", features = ["derive"] }
rmp-serde = "1.1" # MessagePack serializer/deserializer
reqwest = { version = "0.11", features = ["json"] }
serde_json = "1.0"
config = "0.13"
log = "0.4"
env_logger = "0.10"
thiserror = "1.0"

The core logic in main.rs:

use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use log::{info, error, warn};
use thiserror::Error;

// This struct must exactly match the C# MessagePackObject structure.
// Any deviation will cause deserialization failures.
#[derive(Serialize, Deserialize, Debug)]
struct DocumentToIndex {
    #[serde(rename = "Id")]
    id: String,
    #[serde(rename = "Content")]
    content: String,
    #[serde(rename = "Category")]
    category: String,
    #[serde(rename = "TimestampUtc")]
    timestamp_utc: i64,
}

#[derive(Error, Debug)]
enum IngesterError {
    #[error("Redis connection error: {0}")]
    RedisError(#[from] redis::RedisError),
    #[error("MessagePack deserialization error: {0}")]
    DeserializationError(#[from] rmp_serde::decode::Error),
    #[error("Solr request failed: {0}")]
    SolrRequestError(#[from] reqwest::Error),
    #[error("Solr API returned an error: {0}")]
    SolrApiError(String),
    #[error("Configuration error: {0}")]
    ConfigError(#[from] config::ConfigError),
}

const BATCH_SIZE: usize = 500;
const BATCH_TIMEOUT_MS: u64 = 1000;

#[tokio::main]
async fn main() -> Result<(), IngesterError> {
    env_logger::init();

    // --- Configuration Loading ---
    let settings = config::Config::builder()
        .add_source(config::File::with_name("config/default"))
        .build()?
        .try_deserialize::<Settings>()?;
    
    info!("Configuration loaded. Connecting to Redis at {}", &settings.redis.connection_string);
    
    let redis_client = redis::Client::open(settings.redis.connection_string.clone())?;
    let mut redis_con = redis_client.get_async_connection().await?;
    
    let http_client = reqwest::Client::builder()
        .timeout(Duration::from_secs(10))
        .build()?;

    let stream_name = &settings.redis.ingest_stream_name;
    let group_name = "ingester_group";
    let consumer_name = format!("consumer-{}", uuid::Uuid::new_v4());

    // Create a consumer group. In a distributed setup, multiple instances of this service
    // can run, and Redis will distribute messages among them.
    // The 'mkstream' option creates the stream if it doesn't exist.
    let _: () = redis::cmd("XGROUP")
        .arg("CREATE")
        .arg(stream_name)
        .arg(group_name)
        .arg("0")
        .arg("MKSTREAM")
        .query_async(&mut redis_con)
        .await
        .or_else(|e| {
            // Ignore the "group already exists" error, which is expected on subsequent runs.
            if e.to_string().contains("BUSYGROUP") { Ok(()) } else { Err(e) }
        })?;
    
    info!("Consumer group '{}' created/ensured for stream '{}'", group_name, stream_name);
    info!("Starting message consumption loop as consumer '{}'", consumer_name);
    
    loop {
        if let Err(e) = process_message_batch(&mut redis_con, &http_client, &settings, group_name, &consumer_name).await {
            error!("An error occurred while processing a batch: {}. Retrying after delay...", e);
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }
}

async fn process_message_batch(
    redis_con: &mut redis::aio::Connection,
    http_client: &reqwest::Client,
    settings: &Settings,
    group_name: &str,
    consumer_name: &str
) -> Result<(), IngesterError> {

    // --- Consume from Redis Stream ---
    // This command blocks for up to BATCH_TIMEOUT_MS waiting for messages.
    // It will return up to BATCH_SIZE messages. This is the core of our batching strategy.
    let result: Option<redis::streams::StreamReadReply> = redis::cmd("XREADGROUP")
        .arg("GROUP").arg(group_name).arg(consumer_name)
        .arg("COUNT").arg(BATCH_SIZE)
        .arg("BLOCK").arg(BATCH_TIMEOUT_MS)
        .arg("STREAMS").arg(&settings.redis.ingest_stream_name)
        .arg(">") // '>' means read new, unread messages for this consumer.
        .query_async(redis_con)
        .await?;

    let reply = match result {
        Some(r) => r,
        None => { // This happens on timeout when no new messages are available.
            return Ok(());
        }
    };
    
    let mut documents_to_index = Vec::new();
    let mut message_ids_to_ack = Vec::new();

    for key in &reply.keys {
        for msg in &key.ids {
            // The data is stored in a hash. We retrieve the 'data' field, which is a byte array.
            if let Some(data_bytes) = msg.map.get("data").and_then(|val| {
                if let redis::Value::Data(bytes) = val { Some(bytes) } else { None }
            }) {
                match rmp_serde::from_slice::<DocumentToIndex>(data_bytes) {
                    Ok(doc) => documents_to_index.push(doc),
                    Err(e) => {
                        error!("Failed to deserialize message {}: {}. Moving to dead-letter queue is needed.", &msg.id, e);
                        // A proper implementation would move this message to a separate list for manual inspection.
                    }
                }
                message_ids_to_ack.push(msg.id.clone());
            }
        }
    }

    if documents_to_index.is_empty() {
        return Ok(());
    }
    
    let batch_size = documents_to_index.len();
    info!("Processing batch of {} documents.", batch_size);

    // --- Send Batch to Solr ---
    let solr_url = format!("{}/solr/{}/update?commit=false", settings.solr.base_url, settings.solr.collection);
    
    // The critical detail here is to use commit=false. Commits are expensive in Solr.
    // We rely on Solr's auto-commit settings (e.g., commit every 60 seconds) for efficiency.
    let response = http_client.post(&solr_url)
        .json(&documents_to_index)
        .send()
        .await?;
    
    if response.status().is_success() {
        // --- Acknowledge Messages in Redis ---
        // Only after a successful push to Solr do we acknowledge the messages.
        // This provides at-least-once delivery semantics.
        let _: i64 = redis::cmd("XACK")
            .arg(&settings.redis.ingest_stream_name)
            .arg(group_name)
            .arg(&message_ids_to_ack)
            .query_async(redis_con)
            .await?;
        info!("Successfully indexed and ACKed batch of {} documents.", batch_size);
    } else {
        let status = response.status();
        let body = response.text().await.unwrap_or_else(|_| "Could not read body".to_string());
        error!("Solr returned non-success status: {}. Body: {}", status, body);
        // Do not ACK messages on failure. Redis will re-deliver them to another consumer after a timeout.
        return Err(IngesterError::SolrApiError(format!("Status: {}, Body: {}", status, body)));
    }

    Ok(())
}


// --- Configuration Structures ---
#[derive(Debug, Deserialize)]
struct Settings {
    redis: RedisSettings,
    solr: SolrSettings,
}

#[derive(Debug, Deserialize)]
struct RedisSettings {
    connection_string: String,
    ingest_stream_name: String,
}

#[derive(Debug, Deserialize)]
struct SolrSettings {
    base_url: String,
    collection: String,
}

This Rust service is robust. It uses Tokio for an asynchronous event loop, handles Redis and network errors, and implements a batch processing loop that is both time- and size-bound. The use of Redis Streams consumer groups provides load balancing and fault tolerance out of the box.

3. Frontend Testing and Integration

On the frontend, the architecture is a mix of a legacy Vue/Pinia application and new development using React. The React Testing Library becomes crucial to ensure new components that rely on the search functionality are reliable and correctly handle various states (loading, success, error) independent of the complex backend pipeline.

Consider a React component that fetches search results from an ASP.NET Core endpoint which in turn queries Solr.

SearchComponent.jsx:

import React, { useState, useEffect } from 'react';

const SearchComponent = ({ query }) => {
    const [status, setStatus] = useState('idle');
    const [results, setResults] = useState([]);
    const [error, setError] = useState(null);

    useEffect(() => {
        if (!query) {
            setStatus('idle');
            setResults([]);
            return;
        }

        const controller = new AbortController();
        const { signal } = controller;
        
        setStatus('loading');
        fetch(`/api/search?q=${encodeURIComponent(query)}`, { signal })
            .then(res => {
                if (!res.ok) {
                    throw new Error(`HTTP error! status: ${res.status}`);
                }
                return res.json();
            })
            .then(data => {
                setResults(data);
                setStatus('succeeded');
            })
            .catch(err => {
                if (err.name !== 'AbortError') {
                    setError(err.message);
                    setStatus('failed');
                }
            });

        return () => {
            controller.abort();
        };
    }, [query]);

    if (status === 'loading') {
        return <div>Loading...</div>;
    }

    if (status === 'failed') {
        return <div role="alert">Error: {error}</div>;
    }

    return (
        <ul>
            {results.map(result => (
                <li key={result.id}>{result.content}</li>
            ))}
        </ul>
    );
};

export default SearchComponent;

Testing this component requires isolating it from the actual network. A common mistake is to mock the fetch function directly. A more robust approach uses a service worker mock like msw to intercept network requests at the network level, providing a realistic testing environment.

SearchComponent.test.js:

import React from 'react';
import { render, screen, waitFor } from '@testing-library/react';
import { rest } from 'msw';
import { setupServer } from 'msw/node';
import SearchComponent from './SearchComponent';

// Setup the mock server to intercept fetch calls.
const server = setupServer(
    rest.get('/api/search', (req, res, ctx) => {
        const query = req.url.searchParams.get('q');
        if (query === 'rust') {
            return res(
                ctx.status(200),
                ctx.json([
                    { id: '1', content: 'Fast and memory-efficient.' },
                    { id: '2', content: 'Ownership model is key.' },
                ])
            );
        }
        if (query === 'error') {
            return res(
                ctx.status(500),
                ctx.json({ message: 'Internal Server Error' })
            );
        }
        return res(ctx.json([]));
    })
);

beforeAll(() => server.listen());
afterEach(() => server.resetHandlers());
afterAll(() => server.close());

test('renders loading state initially', () => {
    render(<SearchComponent query="rust" />);
    expect(screen.getByText(/Loading.../i)).toBeInTheDocument();
});

test('renders search results on successful fetch', async () => {
    render(<SearchComponent query="rust" />);
    
    // Wait for the asynchronous fetch to complete and the UI to update.
    const item1 = await screen.findByText(/Fast and memory-efficient./i);
    const item2 = await screen.findByText(/Ownership model is key./i);

    expect(item1).toBeInTheDocument();
    expect(item2).toBeInTheDocument();
    expect(screen.queryByText(/Loading.../i)).not.toBeInTheDocument();
});

test('renders error state on fetch failure', async () => {
    render(<SearchComponent query="error" />);

    // Wait for the error message to be displayed.
    const alert = await screen.findByRole('alert');
    expect(alert).toHaveTextContent(/Error: HTTP error! status: 500/i);
    expect(screen.queryByText(/Loading.../i)).not.toBeInTheDocument();
});

This test suite validates the component’s behavior without needing the entire backend stack to be running. It demonstrates how, even in a system with a complex, heterogeneous backend, frontend development can proceed with confidence and robust testing practices. The presence of Pinia in the legacy parts of the application simply underscores the reality of enterprise systems: they are often heterogeneous collections of technologies that must be managed and evolved.

Limitations and Future Iterations

This architecture, while robust, is not without its boundaries. The current error handling in the Rust service is basic; messages that repeatedly fail deserialization or cause a 4xx error from Solr will be re-processed indefinitely. A proper dead-letter queue (DLQ) mechanism is required, where failing messages are moved after a certain number of retries for manual inspection.

Furthermore, the communication contract (the MessagePack schema) is implicit. Any change to the DocumentToIndex class in C# must be perfectly mirrored in the Rust struct. For a larger team, implementing a schema registry or using a format like Avro or Protobuf with formal schema definitions would be a necessary step to prevent runtime errors.

Future optimizations could involve exploring io_uring on the Linux platform for the Rust service to achieve even higher I/O throughput, although the current tokio implementation is already exceptionally performant. The Redis stream can also become a bottleneck; for extreme scale, partitioning the ingestion stream or moving to a more scalable message broker like Kafka might be necessary.


  TOC