The project’s primary data store, a monolithic PostgreSQL instance, was buckling under concurrent write and complex read loads. The core of the problem was a classic one: the data shape optimal for transactional writes (normalized, consistent) was fundamentally at odds with the shape needed for fast, complex search queries (denormalized, aggregated). Every new feature requiring a slightly different search permutation added another JOIN
or a LIKE
query, slowly strangling the database. Writes were blocking reads, reads were consuming CPU, and the operational cost of scaling this single point of failure was becoming untenable.
The initial concept was to separate these concerns entirely, a direct application of the Command Query Responsibility Segregation (CQRS) pattern. This wasn’t a decision made lightly; it introduces complexity, particularly around data consistency. But the performance ceiling of the current architecture was too low.
Our technology selection process was guided by a mandate for a managed, serverless-first approach to minimize operational overhead.
- Write Model (Commands): For the command side, we needed a database optimized for high-throughput, low-latency key-value writes. Amazon DynamoDB was the obvious choice. Its predictable performance at scale and simple API for
PutItem
,UpdateItem
, andDeleteItem
operations fit the “Command” model perfectly. - Query Model (Queries): For the query side, we needed full-text search, complex filtering, and aggregations. Amazon OpenSearch Service (the successor to Elasticsearch Service) provides this natively. It’s built for the kind of complex reads that were crippling our relational database.
- Synchronization Logic: The critical piece is synchronizing data from the write model (DynamoDB) to the read model (OpenSearch). A polling mechanism was immediately discarded as inefficient and slow. The ideal solution was event-driven. DynamoDB Streams provides a time-ordered sequence of item-level changes, which can trigger an AWS Lambda function. Python was selected for the Lambda due to its mature ecosystem, excellent Boto3 AWS SDK, and the
opensearch-py
client library. This creates a robust, scalable, and cost-effective pipeline. - Frontend State Management: This distributed backend architecture presents a challenge to the frontend: eventual consistency. A user might create an item and not see it in search results for a few seconds. We needed a state management library that could gracefully handle this asynchronous reality. Jotai, with its atomic, bottom-up approach, was chosen. Its minimalist API allows for managing discrete pieces of state—like the status of a submitted command versus the state of the query results—without triggering large, unnecessary re-renders that are common with more monolithic state stores.
- Testing: Given the nuanced user experience requirements, testing the frontend’s handling of this asynchronous flow was critical. Jest, combined with the React Testing Library, provides the necessary tools to mock API calls and assert that the application state transitions correctly through its “pending,” “success,” and “eventually consistent” phases.
Architectural Blueprint and Infrastructure as Code
Before writing a single line of Python, the complete infrastructure must be defined. A common mistake is to configure services through the AWS console, leading to an environment that is impossible to replicate or version control. We use the Serverless Framework for this.
The following serverless.yml
defines all the necessary components: the DynamoDB table with its stream, the OpenSearch domain, the Python Lambda function, and the critical IAM roles that grant least-privilege access between them.
# serverless.yml
service: cqrs-data-pipeline
provider:
name: aws
runtime: python3.9
region: us-east-1
stage: dev
# Default environment variables for all functions
environment:
OPENSEARCH_HOST: ${cf:cqrs-data-pipeline-dev.OpenSearchDomainEndpoint}
LOG_LEVEL: INFO
# IAM role for the Lambda function
iam:
role:
statements:
- Effect: "Allow"
Action:
- dynamodb:DescribeStream
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:ListStreams
Resource:
- Fn::GetAtt: [ProductTable, StreamArn]
- Effect: "Allow"
Action:
- es:ESHttpPost
- es:ESHttpPut
- es:ESHttpDelete
Resource:
- Fn::Join:
- ""
- - "arn:aws:es:"
- Ref: "AWS::Region"
- ":"
- Ref: "AWS::AccountId"
- ":domain/"
- ${self:custom.openSearchDomainName}
- "/*"
# Permissions for VPC networking if OpenSearch is in a VPC
- Effect: "Allow"
Action:
- ec2:CreateNetworkInterface
- ec2:DescribeNetworkInterfaces
- ec2:DeleteNetworkInterface
Resource: "*"
functions:
dynamoStreamProcessor:
handler: handler.process_stream
memorySize: 256
timeout: 60
events:
- stream:
type: dynamodb
arn:
Fn::GetAtt: [ProductTable, StreamArn]
batchSize: 100
startingPosition: LATEST
maximumRetryAttempts: 3
# Dead Letter Queue for failed batches
onError: arn:aws:sns:${aws:region}:${aws:accountId}:cqrs-pipeline-dlq-topic
custom:
openSearchDomainName: cqrs-product-search-${self:provider.stage}
resources:
Resources:
# The primary DynamoDB table for our Write Model
ProductTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: product-catalog-${self:provider.stage}
AttributeDefinitions:
- AttributeName: productId
AttributeType: S
KeySchema:
- AttributeName: productId
KeyType: HASH
BillingMode: PAY_PER_REQUEST
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
# The OpenSearch domain for our Query Model
OpenSearchDomain:
Type: AWS::OpenSearchService::Domain
Properties:
DomainName: ${self:custom.openSearchDomainName}
EngineVersion: "OpenSearch_2.5"
ClusterConfig:
InstanceType: "t3.small.search"
InstanceCount: 1
DedicatedMasterEnabled: false
EBSOptions:
EBSEnabled: true
VolumeSize: 10
VolumeType: gp2
AccessPolicies:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
AWS: "*" # In production, lock this down to specific IAM roles
Action: "es:*"
Resource:
Fn::Join:
- ""
- - "arn:aws:es:"
- Ref: "AWS::Region"
- ":"
- Ref: "AWS::AccountId"
- ":domain/"
- ${self:custom.openSearchDomainName}
- "/*"
# SNS Topic to act as a Dead Letter Queue
DeadLetterQueueTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: "CQRS-Pipeline-DLQ-Topic"
TopicName: "cqrs-pipeline-dlq-topic"
# Outputs to easily retrieve resource information
Outputs:
OpenSearchDomainEndpoint:
Description: "Endpoint for the OpenSearch domain"
Value:
Fn::GetAtt: [OpenSearchDomain, DomainEndpoint]
A few critical decisions are codified here. StreamViewType
is NEW_AND_OLD_IMAGES
, which is vital for handling updates and deletes correctly. The Lambda function has a batchSize
of 100 and a maximumRetryAttempts
of 3. If a batch fails all retries, it’s sent to the onError
SNS topic, our Dead Letter Queue (DLQ), preventing a single poison pill message from blocking the entire stream. The IAM permissions are scoped as tightly as possible to the specific resources.
The Python Stream Processor
This Lambda function is the heart of the CQRS pipeline. Its only job is to receive a batch of records from the DynamoDB stream, transform them, and send them to OpenSearch in a resilient manner.
# handler.py
import os
import json
import logging
from typing import Dict, Any, List
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
# --- Configuration ---
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
OPENSEARCH_HOST = os.environ.get("OPENSEARCH_HOST")
OPENSEARCH_INDEX_NAME = "products"
# --- Logging Setup ---
# A common mistake is to rely on simple print statements.
# Proper logging is essential for debugging in a serverless environment.
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
# --- AWS Service Clients ---
# Initialize clients outside the handler for connection reuse
session = boto3.Session()
credentials = session.get_credentials()
aws_auth = AWSV4SignerAuth(credentials, session.region_name, "es")
opensearch_client = OpenSearch(
hosts=[{"host": OPENSEARCH_HOST, "port": 443}],
http_auth=aws_auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
pool_maxsize=20, # Recommended for Lambda
)
# --- Utility Functions ---
def unmarshal_dynamodb_item(item: Dict[str, Any]) -> Dict[str, Any]:
"""
Converts a DynamoDB JSON item into a regular Python dictionary.
This is a surprisingly common point of failure if not handled correctly.
"""
deserializer = boto3.dynamodb.types.TypeDeserializer()
return {k: deserializer.deserialize(v) for k, v in item.items()}
def build_bulk_actions(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Prepares a list of actions for the OpenSearch bulk API.
Handles INSERT, MODIFY, and REMOVE events from the DynamoDB stream.
"""
actions = []
for record in records:
event_name = record.get("eventName")
# We only care about the new state for INSERT/MODIFY
if event_name in ("INSERT", "MODIFY"):
try:
# The document ID in OpenSearch will be the DynamoDB primary key
doc_id = record["dynamodb"]["Keys"]["productId"]["S"]
document = unmarshal_dynamodb_item(record["dynamodb"]["NewImage"])
actions.append({"index": {"_index": OPENSEARCH_INDEX_NAME, "_id": doc_id}})
actions.append(document)
except KeyError as e:
logger.error(f"Malformed INSERT/MODIFY record, missing key: {e}. Record: {record}")
continue # Skip malformed record
elif event_name == "REMOVE":
try:
doc_id = record["dynamodb"]["Keys"]["productId"]["S"]
actions.append({"delete": {"_index": OPENSEARCH_INDEX_NAME, "_id": doc_id}})
except KeyError as e:
logger.error(f"Malformed REMOVE record, missing key: {e}. Record: {record}")
continue
return actions
# --- Main Handler ---
def process_stream(event: Dict[str, Any], context: object) -> Dict[str, Any]:
"""
AWS Lambda handler function triggered by DynamoDB Streams.
"""
logger.info(f"Received {len(event.get('Records', []))} records from DynamoDB stream.")
records = event.get("Records", [])
if not records:
return {"statusCode": 200, "body": "No records to process."}
bulk_actions = build_bulk_actions(records)
if not bulk_actions:
logger.warning("No valid actions to perform after processing records.")
return {"statusCode": 200, "body": "No valid actions generated."}
try:
# The bulk helper is more resilient than a raw bulk call
response = opensearch_client.bulk(body=bulk_actions)
if response["errors"]:
logger.error(f"Bulk indexing had errors: {json.dumps(response)}")
# This is where a more sophisticated retry or partial failure handling
# mechanism would be implemented. For now, we raise an exception
# to trigger the Lambda's retry policy.
raise Exception("Bulk indexing to OpenSearch failed for one or more documents.")
logger.info(f"Successfully indexed {len(response['items'])} documents.")
return {"statusCode": 200, "body": "Successfully processed batch."}
except Exception as e:
logger.critical(f"Failed to process batch. Error: {e}", exc_info=True)
# Re-raising the exception is crucial. This tells the Lambda service
# that the invocation failed, triggering the configured retry behavior.
# After all retries, the batch will be sent to the DLQ.
raise e
The code structure is intentional. Clients are initialized globally to be reused across invocations. The unmarshal_dynamodb_item
function handles the verbose DynamoDB JSON format. The build_bulk_actions
function is the core transformation logic, correctly interpreting INSERT
, MODIFY
, and REMOVE
events. A common pitfall is only handling INSERT
s, leaving the search index stale when data is updated or deleted.
The handler’s error management is paramount. Any exception raised will cause Lambda to re-process the entire batch. If this fails multiple times, the DLQ configured in serverless.yml
catches the payload, allowing for offline analysis without losing data.
Frontend State Management with Jotai
On the client side, the challenge is presenting a smooth user experience over this eventually consistent backend. Optimistic updates are one approach, but they can be complex to manage. A more pragmatic strategy is to provide clear feedback to the user about the state of their actions.
Jotai’s atomic model excels here. We can define discrete atoms for search state and submission state.
// src/state/productAtoms.js
import { atom } from 'jotai';
// Atom to hold the search query string
export const searchQueryAtom = atom('');
// Atom to hold the search results from OpenSearch
export const searchResultsAtom = atom([]);
// Atom to track the loading state of a search query
export const searchIsLoadingAtom = atom(false);
// This is a derived atom. When the searchQueryAtom changes,
// this async atom will re-fetch data from our search API.
export const fetchSearchResultsAtom = atom(
(get) => get(searchResultsAtom), // read-only part just returns current results
async (get, set) => {
const query = get(searchQueryAtom);
if (!query) {
set(searchResultsAtom, []);
return;
}
set(searchIsLoadingAtom, true);
try {
// API call to the backend endpoint that queries OpenSearch
const response = await fetch(`/api/search?q=${encodeURIComponent(query)}`);
if (!response.ok) {
throw new Error('Search API failed');
}
const data = await response.json();
set(searchResultsAtom, data.results);
} catch (error) {
console.error("Failed to fetch search results:", error);
set(searchResultsAtom, []); // Clear results on error
} finally {
set(searchIsLoadingAtom, false);
}
}
);
// --- State for the Command (Write) side ---
// Atom to track the submission state of a new product
// Possible values: 'idle', 'submitting', 'success', 'error'
export const productSubmissionStatusAtom = atom('idle');
// An action-oriented atom to handle the submission logic
export const submitNewProductAtom = atom(
null, // This is a write-only atom
async (get, set, productData) => {
set(productSubmissionStatusAtom, 'submitting');
try {
// API call to the backend endpoint that writes to DynamoDB
const response = await fetch('/api/products', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(productData),
});
if (!response.ok) {
throw new Error('Failed to create product');
}
set(productSubmissionStatusAtom, 'success');
// In a real-world project, you might trigger a re-fetch of the search
// after a short delay to reflect the new item.
setTimeout(() => {
set(productSubmissionStatusAtom, 'idle'); // Reset status
}, 3000);
} catch (error) {
console.error("Product submission failed:", error);
set(productSubmissionStatusAtom, 'error');
}
}
);
A React component can then use these atoms to create a responsive UI. It doesn’t need to know the complexities of the CQRS backend; it only needs to react to these simple state atoms.
// src/components/ProductSearch.jsx
import React from 'react';
import { useAtom } from 'jotai';
import {
searchQueryAtom,
searchResultsAtom,
searchIsLoadingAtom,
fetchSearchResultsAtom,
productSubmissionStatusAtom,
submitNewProductAtom,
} from '../state/productAtoms';
function ProductSearch() {
const [query, setQuery] = useAtom(searchQueryAtom);
const [results] = useAtom(searchResultsAtom);
const [isLoading] = useAtom(searchIsLoadingAtom);
const [, fetchResults] = useAtom(fetchSearchResultsAtom);
const [submissionStatus] = useAtom(productSubmissionStatusAtom);
const [, submitProduct] = useAtom(submitNewProductAtom);
const handleSearch = (e) => {
e.preventDefault();
fetchResults();
};
const handleAddProduct = () => {
const newProduct = {
name: `New Gadget ${Date.now()}`,
description: 'A fantastic new product.',
price: 99.99
};
submitProduct(newProduct);
};
return (
<div>
<form onSubmit={handleSearch}>
<input type="text" value={query} onChange={(e) => setQuery(e.target.value)} />
<button type="submit" disabled={isLoading}>Search</button>
</form>
<button onClick={handleAddProduct} disabled={submissionStatus === 'submitting'}>
{submissionStatus === 'submitting' ? 'Adding...' : 'Add New Product'}
</button>
{submissionStatus === 'success' && <p>Product added! It will appear in search results shortly.</p>}
{submissionStatus === 'error' && <p>Failed to add product.</p>}
{isLoading && <p>Loading results...</p>}
<ul>
{results.map((product) => (
<li key={product.productId}>{product.name} - ${product.price}</li>
))}
</ul>
</div>
);
}
This component provides clear user feedback. The “Add New Product” button enters a disabled “Adding…” state. Upon success, it shows a message explicitly telling the user about the eventual consistency delay. This manages expectations and prevents user frustration.
Testing the Frontend Logic with Jest
The asynchronous nature and multiple states of the UI make it a prime candidate for thorough testing. Jest and React Testing Library allow us to simulate user interactions and verify that our Jotai state logic behaves as expected.
// src/components/ProductSearch.test.jsx
import React from 'react';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import { Provider } from 'jotai';
import ProductSearch from './ProductSearch';
// Mock the global fetch API
global.fetch = jest.fn();
beforeEach(() => {
fetch.mockClear();
});
test('handles successful product submission flow', async () => {
// Mock a successful POST request
fetch.mockResolvedValueOnce({ ok: true, json: () => Promise.resolve({}) });
render(
<Provider>
<ProductSearch />
</Provider>
);
const addButton = screen.getByRole('button', { name: /Add New Product/i });
expect(addButton).not.toBeDisabled();
fireEvent.click(addButton);
// The button should immediately enter a disabled, submitting state
expect(screen.getByRole('button', { name: /Adding.../i })).toBeDisabled();
// Wait for the async submission logic to complete
await waitFor(() => {
// A success message appears
expect(screen.getByText(/Product added!/i)).toBeInTheDocument();
});
// The button text should revert, but it might still be part of the UI
expect(screen.getByRole('button', { name: /Add New Product/i })).toBeInTheDocument();
});
test('handles failed product submission', async () => {
// Mock a failed POST request
fetch.mockRejectedValueOnce(new Error('API is down'));
render(
<Provider>
<ProductSearch />
</Provider>
);
const addButton = screen.getByRole('button', { name: /Add New Product/i });
fireEvent.click(addButton);
await waitFor(() => {
expect(screen.getByText(/Failed to add product/i)).toBeInTheDocument();
});
// The button should become enabled again after the failure
expect(screen.getByRole('button', { name: /Add New Product/i })).not.toBeDisabled();
});
These tests don’t touch the backend. They validate that our frontend state machine, powered by Jotai, correctly transitions between states based on mocked API responses. This ensures that the user interface remains predictable and reliable, even when the underlying data flow is complex and asynchronous.
This architecture successfully decouples the write and read workloads, allowing each to scale independently. The final system is resilient, observable through logging and the DLQ, and provides a clear path for handling failures.
The most significant trade-off is the introduced latency. The time for a write in DynamoDB to be reflected in OpenSearch is typically in the low single-digit seconds but is not guaranteed. This solution is therefore unsuitable for use cases requiring strict read-your-writes consistency. Furthermore, the operational burden shifts from database management to pipeline monitoring. An automated process for replaying messages from the DLQ is a necessary next step for a production system, as is implementing more granular monitoring on the Lambda’s execution time and error rate to detect upstream issues with OpenSearch. Finally, the schema is now managed in two places; a change to the data model requires updates to both the DynamoDB write logic and the OpenSearch mapping, introducing a need for careful migration planning.