Building a CQRS-Driven Indexing Pipeline from a NestJS DDD Service to Solr via Consul


The initial pain point was a classic monolithic scaling problem. Our core ProductCatalog service, responsible for both managing product data and serving search queries, was buckling. A simple inventory update on a popular product would trigger a cascade of inefficient database queries and a full re-index job, making search results stale for unacceptable periods. The search functionality was directly coupled to the transactional PostgreSQL database, meaning complex search queries with multiple facets and filters were hammering the same resource needed for order processing. This tight coupling was a direct threat to business operations.

Our first conceptual move was to separate the concerns. The Command Query Responsibility Segregation (CQRS) pattern was the obvious architectural choice. We envisioned a clear split: a WriteService handling all state changes (Commands) and a completely independent QueryService optimized for reads. For the read model, Apache Solr was the clear winner, as we already had operational expertise and its text search capabilities far exceeded what PostgreSQL could offer. This decision immediately introduced the central challenge: how do we propagate state changes from the write-side’s source of truth to the Solr-backed read-side, ensuring reliability, low latency, and loose coupling in a distributed environment?

This led to our technology selection. For the backend, NestJS was chosen for its opinionated, modular structure, which maps beautifully onto Domain-Driven Design (DDD) principles. Its first-class support for TypeScript and its official packages for patterns like CQRS (@nestjs/cqrs) and event emitters made it a perfect fit for defining our Bounded Contexts. We decided to model the ProductCatalog as a distinct Bounded Context. Within this context, a CatalogWriteService would handle commands like CreateProduct or UpdateProductInventory. Upon successful execution, it would emit domain events like ProductCreated or ProductInventoryUpdated. A new, dedicated SearchIndexerService would listen for these events and be solely responsible for updating the Solr index. For service discovery in this new microservices landscape, HashiCorp’s Consul was selected for its robustness and simplicity. It would allow the SearchIndexerService to dynamically find healthy Solr nodes and allow our front-end’s API gateway to discover the QueryService. The front-end itself, a mature Angular application, would remain largely unchanged, simply pointing its data-fetching services to a new, faster, and more reliable search API endpoint.

Phase 1: Defining the Bounded Context and Write Model in NestJS

The foundation of the entire system is a well-defined domain model within the CatalogWriteService. A common mistake in such refactors is to simply replicate the old database schema as new service endpoints. We actively avoided this by using DDD principles to define a clear Product Aggregate.

The directory structure of our CatalogWriteService reflects this clear separation:

src/
└── product-catalog/
    ├── application/
    │   ├── commands/
    │   │   ├── impl/update-product-inventory.command.ts
    │   │   └── handlers/update-product-inventory.handler.ts
    │   ├── dtos/
    │   └── services/product.service.ts
    ├── domain/
    │   ├── model/product.aggregate.ts
    │   ├── events/impl/product-inventory-updated.event.ts
    │   └── repository/product.repository.ts
    ├── infrastructure/
    │   ├── persistence/
    │   │   └── typeorm/product.entity.ts
    │   └── product-catalog.module.ts
    └── presentation/
        └── product-catalog.controller.ts

The core logic resides in the command handler. It’s responsible for orchestrating the business logic: retrieving the aggregate, executing the domain operation, persisting the changes, and finally, publishing the domain event.

// src/product-catalog/application/commands/handlers/update-product-inventory.handler.ts

import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';
import { Inject } from '@nestjs/common';
import { UpdateProductInventoryCommand } from '../impl/update-product-inventory.command';
// Three levels up from application/commands/handlers/ is product-catalog/, which contains domain/.
import { IProductRepository } from '../../../domain/repository/product.repository.interface';

@CommandHandler(UpdateProductInventoryCommand)
export class UpdateProductInventoryHandler implements ICommandHandler<UpdateProductInventoryCommand> {
  constructor(
    @Inject('ProductRepository') // Using an interface-based injection
    private readonly productRepository: IProductRepository,
    private readonly publisher: EventPublisher,
  ) {}

  async execute(command: UpdateProductInventoryCommand): Promise<void> {
    const { productId, newStockLevel, correlationId } = command;

    // 1. Retrieve the aggregate from the repository.
    // The repository handles the mapping from persistence (TypeORM entity) to the domain model.
    const productAggregate = await this.productRepository.findById(productId);
    if (!productAggregate) {
      // In a real-world project, this would throw a domain-specific exception.
      throw new Error(`Product with ID ${productId} not found.`);
    }

    // 2. Encapsulate the domain logic within the aggregate itself.
    // The aggregate is responsible for maintaining its own invariants.
    productAggregate.updateInventory(newStockLevel);

    // 3. Bridge the domain model with the NestJS EventPublisher.
    // This allows the aggregate to publish events without being coupled to the eventing infrastructure.
    const product = this.publisher.mergeObjectContext(productAggregate);

    // 4. Persist the changes.
    // The repository handles saving the updated aggregate state.
    await this.productRepository.save(product);

    // 5. Commit the events. This is a crucial step.
    // The events are only dispatched AFTER the state has been successfully persisted.
    // This prevents a scenario where an event is published but the database transaction fails.
    product.commit();
  }
}

The Product aggregate contains the actual business logic and event creation.

// src/product-catalog/domain/model/product.aggregate.ts

import { AggregateRoot } from '@nestjs/cqrs';
import { ProductInventoryUpdatedEvent } from '../events/impl/product-inventory-updated.event';

export class Product extends AggregateRoot {
  constructor(
    private readonly id: string,
    private name: string,
    private stockLevel: number,
  ) {
    super();
  }

  // Business logic method
  updateInventory(newStockLevel: number) {
    if (newStockLevel < 0) {
      throw new Error('Stock level cannot be negative.'); // Invariant check
    }

    const oldStockLevel = this.stockLevel;
    this.stockLevel = newStockLevel;

    // The key step: apply an event to the aggregate.
    // This event is not dispatched immediately but queued within the aggregate.
    this.apply(
      new ProductInventoryUpdatedEvent(
        this.id,
        this.stockLevel,
        oldStockLevel,
      ),
    );
  }

  // Other properties and methods...
}
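
The event class itself is not shown above. A minimal sketch, assuming it is a plain immutable data carrier whose constructor arguments follow the order used in the apply() call; the field names are assumptions, chosen to line up with the payload the indexer reads later:

```typescript
// Hypothetical shape of the domain event; the field names are assumptions.
export class ProductInventoryUpdatedEvent {
  readonly productId: string;
  readonly newStockLevel: number;
  readonly oldStockLevel: number;

  constructor(productId: string, newStockLevel: number, oldStockLevel: number) {
    this.productId = productId;
    this.newStockLevel = newStockLevel;
    this.oldStockLevel = oldStockLevel;
  }
}

// Same argument order as in Product.updateInventory: id, new level, old level.
const exampleEvent = new ProductInventoryUpdatedEvent('sku-123', 7, 10);
```

Carrying the old stock level alongside the new one lets consumers reason about the change (for example, detecting a transition to zero) without querying the write side.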

A significant challenge we faced here was ensuring atomicity. What if the database save operation succeeds but the event bus is down and product.commit() fails? The write model would change without the read model ever hearing about it. We solved this by implementing the Transactional Outbox pattern. Instead of publishing directly to a message broker, the product.commit() call persists the event to an outbox table within the same database transaction as the aggregate’s state change. A separate, simple poller process then reads from this table and forwards events to our message broker (Kafka in our case). This guarantees that an event is published if and only if the business transaction committed. Delivery is at-least-once, since the poller can crash after publishing but before marking a row as sent, so consumers must tolerate duplicates.
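
The mechanics of the outbox can be sketched without a real database. The following is an in-memory illustration only: InMemoryDb, OutboxRow, updateInventory, and relayOutbox are hypothetical names, the "transaction" is simulated by copying state, and a real implementation would use a PostgreSQL transaction and a Kafka producer instead.

```typescript
// In-memory stand-in for the database; every name here is hypothetical.
export interface OutboxRow {
  id: number;
  type: string;
  payload: string; // JSON-serialized event
  sent: boolean;
}

export class InMemoryDb {
  stock = new Map<string, number>();
  outbox: OutboxRow[] = [];

  // Runs `work` against copies and swaps them in only if it does not throw,
  // mimicking one transaction that covers both the state and the outbox row.
  transaction(work: (stock: Map<string, number>, outbox: OutboxRow[]) => void): void {
    const stockCopy = new Map(this.stock);
    const outboxCopy = this.outbox.map((row) => ({ ...row }));
    work(stockCopy, outboxCopy);
    this.stock = stockCopy;
    this.outbox = outboxCopy;
  }
}

// Write side: the state change and the event row are committed together.
export function updateInventory(db: InMemoryDb, productId: string, newStockLevel: number): void {
  db.transaction((stock, outbox) => {
    stock.set(productId, newStockLevel);
    outbox.push({
      id: outbox.length + 1,
      type: 'ProductInventoryUpdated',
      payload: JSON.stringify({ productId, newStockLevel }),
      sent: false,
    });
  });
}

// Poller step: forward unsent rows, marking each as sent only after publish succeeds.
export function relayOutbox(db: InMemoryDb, publish: (row: OutboxRow) => void): number {
  let forwarded = 0;
  for (const row of db.outbox) {
    if (row.sent) continue;
    publish(row); // if this throws, the row stays unsent and is retried on the next poll
    row.sent = true;
    forwarded++;
  }
  return forwarded;
}
```

Because a row is marked as sent only after the publish call returns, a crash between the two steps causes a duplicate on the next poll rather than a lost event, which is why the consumer side has to be idempotent.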

Phase 2: The Search Indexer Service and Consul Integration

The SearchIndexerService is another NestJS application, but its concerns are purely infrastructural. Its job is to consume domain events and translate them into Solr update commands. Its primary dependency is not a business domain, but rather the Solr cluster itself.

This is where Consul becomes critical. We cannot hardcode Solr hostnames. In a production environment, nodes can be added, removed, or fail. The indexer needs to be able to discover a healthy Solr node at runtime.

We created a custom ConsulModule in NestJS to encapsulate this logic.

// src/shared/consul/consul.module.ts

import { Module, Global } from '@nestjs/common';
import { HttpModule } from '@nestjs/axios'; // The HttpModule copy in @nestjs/common is deprecated.
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ConsulService } from './consul.service';

@Global() // Make the service available across the application
@Module({
  imports: [
    HttpModule.registerAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        timeout: configService.get('HTTP_TIMEOUT', 5000),
      }),
      inject: [ConfigService],
    }),
    ConfigModule,
  ],
  providers: [ConsulService],
  exports: [ConsulService],
})
export class ConsulModule {}

The ConsulService is responsible for querying Consul’s Health API to find available instances of a given service.

// src/shared/consul/consul.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { HttpService } from '@nestjs/axios'; // The HttpService copy in @nestjs/common is deprecated.
import { ConfigService } from '@nestjs/config';
import { firstValueFrom } from 'rxjs';

// Subset of one entry returned by Consul's /v1/health/service/:service endpoint.
interface ConsulServiceInstance {
  Node: {
    Address: string;
  };
  Service: {
    ID: string;
    Service: string;
    Address: string; // May be empty, in which case the node address applies.
    Port: number;
  };
}

@Injectable()
export class ConsulService {
  private readonly logger = new Logger(ConsulService.name);
  private readonly consulUrl: string;

  constructor(
    private readonly httpService: HttpService,
    private readonly configService: ConfigService,
  ) {
    // Expected to include the scheme, e.g. http://127.0.0.1:8500.
    const consulUrl = this.configService.get<string>('CONSUL_HTTP_ADDR');
    if (!consulUrl) {
      throw new Error('CONSUL_HTTP_ADDR environment variable is not set.');
    }
    this.consulUrl = consulUrl;
  }

  async findHealthyService(serviceName: string): Promise<{ host: string; port: number }> {
    // `passing` restricts the result to instances whose health checks all pass.
    const requestUrl = `${this.consulUrl}/v1/health/service/${serviceName}?passing=true`;

    try {
      const response = await firstValueFrom(
        this.httpService.get<ConsulServiceInstance[]>(requestUrl)
      );

      const services = response.data;
      if (!services || services.length === 0) {
        throw new Error(`No healthy instances found for service: ${serviceName}`);
      }

      // In a real-world scenario, you might want a load balancing strategy (e.g., random).
      // For simplicity, we'll just take the first healthy one.
      const { Node, Service } = services[0];
      // Fall back to the node address when the service registered without its own.
      const host = Service.Address || Node.Address;
      this.logger.log(`Discovered healthy ${serviceName} at ${host}:${Service.Port}`);

      return { host, port: Service.Port };

    } catch (error) {
      const stack = error instanceof Error ? error.stack : undefined;
      this.logger.error(`Failed to discover service ${serviceName} from Consul at ${this.consulUrl}`, stack);
      // This error should be handled gracefully by the calling service, perhaps with a retry mechanism.
      throw new Error(`Consul service discovery failed for ${serviceName}`);
    }
  }
}

This service is then used by the SolrClientService, which is responsible for the actual communication with Solr. The pitfall here is the absence of caching: calling Consul for every single indexing operation is inefficient. We implemented a simple in-memory cache with a short TTL (e.g., 60 seconds) on the resolved Solr address. This provides a good balance between performance and responsiveness to topology changes.
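
The cache itself is not shown in this article. A minimal sketch of what such a TTL cache might look like, assuming a single cached value and an injectable clock for testing; TtlCache, SolrAddress, and resolveCached are hypothetical names:

```typescript
// Hypothetical helper: caches one resolved value for `ttlMs` milliseconds.
export class TtlCache<T> {
  private value: T | undefined;
  private expiresAt = 0;
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Returns the cached value while it is still fresh, otherwise undefined.
  get(): T | undefined {
    return this.value !== undefined && this.now() < this.expiresAt ? this.value : undefined;
  }

  set(value: T): void {
    this.value = value;
    this.expiresAt = this.now() + this.ttlMs;
  }

  // Call when a request to the cached address fails, to force a fresh lookup.
  invalidate(): void {
    this.value = undefined;
  }
}

export interface SolrAddress {
  host: string;
  port: number;
}

// Wraps any async lookup (for example ConsulService.findHealthyService) with the cache.
export async function resolveCached(
  cache: TtlCache<SolrAddress>,
  lookup: () => Promise<SolrAddress>,
): Promise<SolrAddress> {
  const hit = cache.get();
  if (hit) return hit;
  const fresh = await lookup();
  cache.set(fresh);
  return fresh;
}
```

With a 60-second TTL, a node that disappears is noticed within a minute at worst; calling invalidate() on a failed request shortens that window for the failure case.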

Phase 3: The Data Transformation and Indexing Logic

With service discovery handled, the core logic of the indexer is to process an event and update Solr. This is not a simple one-to-one mapping. A domain model is optimized for transactional consistency, while a search document is optimized for querying and relevance.

The event handler in the indexer orchestrates this.

// src/indexer/event-handlers/product-inventory-updated.handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { Logger } from '@nestjs/common';
import { ProductInventoryUpdatedEvent } from './events/product-inventory-updated.event'; // DTO for event
import { SolrClientService } from '../../solr/solr-client.service';

// This is a simplified event DTO. In a real system, it would be consumed from a message broker.
interface ProductInventoryUpdatedEventPayload {
  productId: string;
  newStockLevel: number;
  // Other fields might be needed for context...
}

@EventsHandler(ProductInventoryUpdatedEvent)
export class ProductInventoryUpdatedHandler implements IEventHandler<ProductInventoryUpdatedEventPayload> {
  private readonly logger = new Logger(ProductInventoryUpdatedHandler.name);

  constructor(private readonly solrClient: SolrClientService) {}

  async handle(event: ProductInventoryUpdatedEventPayload) {
    this.logger.log(`Handling inventory update for product: ${event.productId}`);

    // The key is the "atomic update" feature in Solr.
    // We don't need to re-index the entire document for a simple stock change.
    // This is far more efficient.
    const partialDocument = {
      id: event.productId,
      stock_level_i: { set: event.newStockLevel }, // Solr's atomic update syntax
      // We also update a timestamp to track freshness
      last_indexed_dt: { set: new Date().toISOString() },
    };

    try {
      // The collection name would come from configuration
      await this.solrClient.updateDocument('products', partialDocument);
      this.logger.log(`Successfully indexed inventory update for product: ${event.productId}`);
    } catch (error) {
      this.logger.error(`Failed to index inventory update for ${event.productId}`, error.stack);
      // CRITICAL: This is where a robust dead-letter queue (DLQ) and alerting mechanism is essential.
      // A failed index operation must be retried or investigated manually.
      // Simply swallowing the error is a recipe for data drift between the write and read models.
    }
  }
}
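
The catch block above only logs the failure. One way the retry and dead-letter behavior described in its comments could be structured is sketched below. This is an assumption about shape, not the production code: withRetryOrDeadLetter and its parameters are hypothetical, and the dead-letter sink is left as a callback because the article does not say where failed events go.

```typescript
// Hypothetical helper: retries `operation` with exponential backoff, then hands the
// event to a dead-letter sink if every attempt fails.
export async function withRetryOrDeadLetter<E>(
  failedEvent: E,
  operation: (e: E) => Promise<void>,
  deadLetter: (e: E, lastError: unknown) => Promise<void>,
  maxAttempts = 3,
  baseDelayMs = 200,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<'indexed' | 'dead-lettered'> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await operation(failedEvent);
      return 'indexed';
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Delay doubles on each retry: baseDelayMs, then 2x, then 4x, ...
        await sleep(baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }
  await deadLetter(failedEvent, lastError);
  return 'dead-lettered';
}
```

Retrying is safe here because the atomic set update is idempotent: applying the same stock level twice leaves the document in the same state.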

The SolrClientService uses the ConsulService to find Solr and then sends the actual HTTP request.

// src/solr/solr-client.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { HttpService } from '@nestjs/axios'; // The HttpService copy in @nestjs/common is deprecated.
import { firstValueFrom } from 'rxjs';
import { ConsulService } from '../shared/consul/consul.service';

@Injectable()
export class SolrClientService implements OnModuleInit {
    private solrRootUrl: string;

    constructor(
        private readonly httpService: HttpService,
        private readonly consulService: ConsulService,
    ) {}

    // We resolve the Solr address on module initialization to fail fast if Solr is not discoverable.
    async onModuleInit() {
        await this.resolveSolrAddress();
    }

    private async resolveSolrAddress() {
        try {
            const solrService = await this.consulService.findHealthyService('solr');
            this.solrRootUrl = `http://${solrService.host}:${solrService.port}/solr`;
        } catch (error) {
            // On startup this is fatal: the service should not start if it cannot find Solr.
            throw new Error('Could not resolve Solr address from Consul.');
        }
    }

    async updateDocument(collection: string, doc: Record<string, any>): Promise<void> {
        // In a real project, we would batch updates using a buffer to improve performance.
        // The target collection comes from the caller rather than being fixed at startup.
        const updateUrl = `${this.solrRootUrl}/${collection}/update?commit=true`;

        try {
            // firstValueFrom matches ConsulService and replaces the deprecated toPromise().
            await firstValueFrom(this.httpService.post(updateUrl, [doc]));
        } catch (error) {
            // A node that went down usually produces no HTTP response at all (connection
            // refused or timeout), so re-resolve on transport errors as well as on 5xx.
            const status: number | undefined = error?.response?.status;
            if (status === undefined || status >= 500) {
                // Keep the original error even if re-resolution itself fails.
                await this.resolveSolrAddress().catch(() => undefined);
            }
            throw error; // Re-throw for the handler to manage retry/DLQ logic.
        }
    }
}
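
The comment in updateDocument mentions batching. A minimal sketch of such a buffer, assuming a simple size threshold; UpdateBuffer is a hypothetical name and the threshold is arbitrary:

```typescript
// Hypothetical buffer: collects documents and releases them in batches.
export class UpdateBuffer<D> {
  private pending: D[] = [];
  private readonly maxSize: number;

  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  // Returns a full batch once the buffer reaches maxSize, otherwise undefined.
  add(doc: D): D[] | undefined {
    this.pending.push(doc);
    return this.pending.length >= this.maxSize ? this.flush() : undefined;
  }

  // Returns whatever is pending (possibly empty) and resets the buffer.
  flush(): D[] {
    const batch = this.pending;
    this.pending = [];
    return batch;
  }
}
```

The caller would POST each returned batch as a single JSON array to the update endpoint, and call flush() on a timer so a partially filled buffer does not wait indefinitely. Solr's commitWithin request parameter is a related lever: it lets Solr group commits instead of performing a hard commit on every request.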

The data transformation logic becomes more complex for ProductCreatedEvent, where we must construct a full Solr document, denormalizing data from other domains (e.g., category names, brand information) to create a rich, searchable document. This denormalization is a key aspect of a CQRS read model.
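
A sketch of what that mapping might look like follows. The event payload shape, the reference-data lookups, and every field name except stock_level_i and last_indexed_dt are assumptions made for illustration; the suffixes follow Solr's default dynamic-field conventions.

```typescript
// All names below are hypothetical; only stock_level_i and last_indexed_dt appear earlier.
export interface ProductCreatedPayload {
  productId: string;
  name: string;
  stockLevel: number;
  categoryIds: string[];
  brandId: string;
}

export interface ReferenceData {
  categoryNames: Map<string, string>;
  brandNames: Map<string, string>;
}

export function toSolrDocument(
  created: ProductCreatedPayload,
  refs: ReferenceData,
  indexedAt: Date,
) {
  return {
    id: created.productId,
    name_t: created.name,
    stock_level_i: created.stockLevel,
    // Denormalized copies: the read model stores display names, not foreign keys.
    category_ss: created.categoryIds
      .map((id) => refs.categoryNames.get(id))
      .filter((label): label is string => label !== undefined),
    brand_s: refs.brandNames.get(created.brandId) ?? 'unknown',
    last_indexed_dt: indexedAt.toISOString(),
  };
}
```

Unlike the inventory update, this is a full document, so the fields carry plain values rather than the atomic-update set syntax. The trade-off of denormalizing is that a renamed category or brand requires re-indexing every product that references it.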

Phase 4: The Angular Front-end and the Query Service

With the indexing pipeline in place, the final piece was the query path. We built a lean CatalogQueryService (another NestJS app) whose only job was to expose a clean, RESTful API for searching products. It used the same ConsulService to discover Solr.
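
The query service's core job is translating API-level inputs into Solr request parameters. A minimal sketch of that translation, assuming each filter maps to one Solr filter query (fq); buildSolrSelectQuery is a hypothetical name and the defaults are arbitrary:

```typescript
// Hypothetical translation from API-level inputs to a Solr /select query string.
export function buildSolrSelectQuery(
  term: string,
  filters: Record<string, string>,
  rows = 20,
): string {
  const pairs: Array<[string, string]> = [
    ['q', term.trim() === '' ? '*:*' : term], // match everything when no term is given
    ['rows', String(rows)],
    ['wt', 'json'],
  ];
  for (const field of Object.keys(filters)) {
    // One fq per filter; quoting keeps multi-word values together as a phrase.
    const escaped = filters[field].replace(/(["\\])/g, '\\$1');
    pairs.push(['fq', `${field}:"${escaped}"`]);
  }
  return pairs
    .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(value)}`)
    .join('&');
}
```

Keeping this translation on the server means field names and query syntax can change without touching the front-end.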

The Angular side was then simplified. The ProductSearchService no longer needed to construct complex queries.

// a-simplified-angular-service.ts
import { Injectable } from '@angular/core';
import { HttpClient, HttpParams } from '@angular/common/http';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

export interface ProductSearchResult {
  id: string;
  name: string;
  price: number;
  // ... other fields exposed by the QueryService
}

@Injectable({ providedIn: 'root' })
export class ProductSearchService {
  // The API_GATEWAY_URL would be configured via environment variables.
  private searchApiUrl = 'API_GATEWAY_URL/v1/products/search';

  constructor(private http: HttpClient) {}

  search(term: string, filters: Record<string, string>): Observable<ProductSearchResult[]> {
    // HttpParams is immutable, so each append returns a new instance.
    let params = new HttpParams().set('q', term);
    for (const [key, value] of Object.entries(filters)) {
      params = params.append(key, value);
    }

    // The front-end is completely decoupled from the complexities of Solr, CQRS, and DDD.
    // It consumes a simple, stable API contract.
    return this.http.get<{ results: ProductSearchResult[] }>(this.searchApiUrl, { params }).pipe(
      map(response => response.results || [])
    );
  }
}

This architecture gives us a clear data flow.

sequenceDiagram
    participant User
    participant AngularApp
    participant APIGateway
    participant WriteService as CatalogWriteService (NestJS)
    participant DB as PostgreSQL
    participant Poller as Outbox Poller
    participant EventBus as Kafka
    participant Indexer as SearchIndexerService (NestJS)
    participant Consul
    participant Solr

    User->>AngularApp: Update product inventory
    AngularApp->>APIGateway: POST /products/{id}/inventory
    APIGateway->>WriteService: handleUpdateInventory(command)
    WriteService->>DB: Begin TX
    WriteService->>DB: Find Product by ID
    WriteService->>WriteService: product.updateInventory()
    WriteService->>DB: Save Product State & Event to Outbox
    WriteService->>DB: Commit TX

    Note over Poller, Indexer: Asynchronous Flow
    Poller->>DB: Read unpublished outbox events
    Poller->>EventBus: Publish ProductInventoryUpdatedEvent
    EventBus->>Indexer: Consume ProductInventoryUpdatedEvent
    Indexer->>Consul: Find healthy 'solr' service (on cache miss)
    Consul-->>Indexer: Return solr-node-1:8983
    Indexer->>Solr: POST /update (Atomic Update)
    Solr-->>Indexer: 200 OK

This entire effort resulted in a system that is more resilient, scalable, and maintainable. Search queries are now isolated and consistently fast, and the write-side can evolve its domain model without breaking the search experience.

The current implementation, while robust, relies on polling the outbox table, which introduces a small but measurable latency. A future iteration will explore using a Change Data Capture (CDC) tool like Debezium to stream database changes directly to Kafka, further reducing the time from data change to index update. Furthermore, our error handling, which currently relies on a DLQ, could be enhanced with an automated reconciliation service that periodically compares the source of truth with the Solr index to correct any inconsistencies that might arise from prolonged downstream failures. The service discovery is also basic; leveraging Consul’s KV store for dynamic configuration or feature flags within the services presents a clear path for further operational maturity.
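
The comparison at the heart of such a reconciliation service could be as simple as the sketch below. It assumes both sides can be loaded as maps from product ID to stock level, which would need paging for a large catalog; Drift and findStockDrift are hypothetical names.

```typescript
// Hypothetical drift report entry: what the source of truth says vs. what Solr holds.
export interface Drift {
  productId: string;
  expected: number;
  indexed: number | undefined; // undefined when the document is missing from the index
}

// Lists every product whose indexed stock level differs from the source of truth.
export function findStockDrift(
  source: Map<string, number>,
  index: Map<string, number>,
): Drift[] {
  const drift: Drift[] = [];
  source.forEach((expected, productId) => {
    const indexed = index.get(productId);
    if (indexed !== expected) {
      drift.push({ productId, expected, indexed });
    }
  });
  return drift;
}
```

Each entry in the result could then be replayed through the same atomic-update path the indexer already uses.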

