Our core transaction processing system began exhibiting a pattern of intermittent, silent failures. Traditional monitoring, based on simple error rate thresholds, was blind to the issue. The failures were too infrequent to trigger alarms but were causing significant downstream data consistency problems. The technical pain point was clear: we needed to detect anomalous patterns in transaction metadata, not just aggregate failure counts. A quick analysis of successful versus failed transaction metadata revealed subtle correlations in payload size, processing node, and client origin that preceded failure events.
The initial concept was a system to poll our master transaction log table every few seconds, feed a rolling window of metadata into a machine learning model to generate a continuous “anomaly score,” and render this score on a low-latency dashboard. The primary non-functional requirement was a glass-to-glass latency of under 10 seconds from a transaction being committed in the database to its impact being visualized on an analyst’s screen.
Technology selection for this internal tool was driven by pragmatism, not novelty. Our operational data resides in a PostgreSQL database, and the entire backend team is deeply proficient in Java, Spring Boot, and MyBatis. Introducing a full-fledged streaming platform like Kafka or Flink for this single use case was deemed overkill, creating an unnecessary operational burden. We chose to build a high-frequency, stateful polling service using MyBatis. In a real-world project, leveraging existing skills and infrastructure often outweighs the technical purity of a “perfect” architectural solution. The key risk was the potential load on the production database, a risk we accepted and planned to mitigate through careful query design and indexing.
For the machine learning component, our data science team had already developed an LSTM autoencoder model in Python using TensorFlow. It was trained to reconstruct “normal” transaction metadata sequences and produced a high reconstruction error for anomalous patterns. Our job was not to build a model, but to operationalize the existing SavedModel artifact within our Java environment. TensorFlow’s Java API made this a viable path.
The most debated decision was the frontend. A standard single-page application built with Create React App would have been the default. However, our Site Reliability Engineers (SREs), the primary users, often work under constrained network conditions during incidents. We wanted the dashboard’s application shell—the layout, controls, and static assets—to load almost instantaneously. This led us to Gatsby. By using static site generation, we could deliver a non-interactive shell with near-zero load time. The application would then “hydrate” into a full dynamic React app in the browser, establish a secure WebSocket connection, and begin streaming real-time data. This hybrid approach gives us the best of both worlds: the performance of a static site and the dynamism of a SPA. For the UI itself, Material-UI (MUI) provided a production-ready component library to build a professional-looking dashboard quickly.
Authentication and authorization were to be handled via JSON Web Tokens (JWT). The backend would expose a simple credential-based authentication endpoint, issuing a short-lived JWT that the Gatsby client would use to authenticate both its API requests and its WebSocket connection. This stateless approach simplified the backend design significantly.
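As a rough sketch of that endpoint (the path, the LoginRequest DTO, and the TokenService abstraction are illustrative assumptions, not the exact production code), it can be as small as the following. The response shape matches what the Gatsby client reads later (response.data.jwt).
AuthController.java (sketch):
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/auth")
public class AuthController {

    private final AuthenticationManager authenticationManager;
    private final TokenService tokenService; // hypothetical wrapper around whichever JWT library is used

    public AuthController(AuthenticationManager authenticationManager, TokenService tokenService) {
        this.authenticationManager = authenticationManager;
        this.tokenService = tokenService;
    }

    @PostMapping("/login")
    public ResponseEntity<Map<String, String>> login(@RequestBody LoginRequest request) {
        // Delegate credential verification to Spring Security; this throws if the credentials are bad.
        authenticationManager.authenticate(
                new UsernamePasswordAuthenticationToken(request.getUsername(), request.getPassword()));
        // Issue a short-lived token (e.g. 15 minutes) used for both REST calls and the WebSocket handshake.
        String jwt = tokenService.issueToken(request.getUsername());
        return ResponseEntity.ok(Map.of("jwt", jwt));
    }
}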
The Backend: Data Polling and Real-Time Inference
The core of the system is a Spring Boot application responsible for three tasks: polling the database, executing the TensorFlow model, and broadcasting results to clients.
1. High-Frequency Data Polling with MyBatis
A naive query like SELECT * FROM transaction_logs WHERE created_at > NOW() - INTERVAL '5 seconds' would be disastrous, repeatedly triggering a full table scan or a poorly optimized index scan. A more robust approach is stateful polling with a watermark: the service remembers the id or timestamp of the last processed record and queries only for records beyond that watermark.
First, the MyBatis Mapper interface and the corresponding XML.
TransactionMapper.java:
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface TransactionMapper {
/**
* Fetches transactions newer than the given watermark ID.
* In a real-world project, the primary key should be sequential for this to be efficient.
* If timestamps are used, they must be guaranteed to be unique and monotonic.
* The `LIMIT` clause is a safety rail to prevent overwhelming the service
* if a large batch of records is inserted.
*
* @param lastProcessedId The ID of the last transaction processed.
* @param limit The maximum number of records to fetch.
* @return A list of TransactionData objects.
*/
List<TransactionData> findNewTransactions(@Param("lastProcessedId") long lastProcessedId, @Param("limit") int limit);
}
TransactionData.java (a simplified DTO):
public class TransactionData {
private long id;
private java.sql.Timestamp createdAt;
private int payloadSize;
private String sourceNode;
private String clientType;
// Getters and Setters omitted for brevity
}
TransactionMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yourapp.monitoring.TransactionMapper">
<resultMap id="TransactionResultMap" type="com.yourapp.monitoring.TransactionData">
<id property="id" column="id"/>
<result property="createdAt" column="created_at"/>
<result property="payloadSize" column="payload_size"/>
<result property="sourceNode" column="source_node"/>
<result property="clientType" column="client_type"/>
</resultMap>
<select id="findNewTransactions" resultMap="TransactionResultMap">
SELECT
id,
created_at,
payload_size,
source_node,
client_type
FROM
transaction_logs
WHERE
id > #{lastProcessedId}
ORDER BY
id ASC
LIMIT #{limit}
</select>
</mapper>
Crucial Note: This strategy is only effective if the id column is indexed (as a primary key usually is) and is monotonically increasing.
The polling service uses Spring’s @Scheduled annotation to run periodically.
DataPollingService.java:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class DataPollingService {
private static final Logger logger = LoggerFactory.getLogger(DataPollingService.class);
private static final int BATCH_LIMIT = 1000;
private final TransactionMapper transactionMapper;
private final AnomalyDetectionService anomalyDetectionService;
// Using AtomicLong for thread-safe updates to the watermark.
private final AtomicLong lastProcessedId = new AtomicLong(0);
public DataPollingService(TransactionMapper transactionMapper, AnomalyDetectionService anomalyDetectionService) {
this.transactionMapper = transactionMapper;
this.anomalyDetectionService = anomalyDetectionService;
// Initialize with the latest ID from the database on startup to avoid reprocessing everything.
// A robust implementation would persist this watermark.
this.initializeWatermark();
}
private void initializeWatermark() {
// In a production system, you'd fetch the max(id) here.
// For this example, we start from 0.
logger.info("DataPollingService initialized. Starting watermark at 0.");
}
@Scheduled(fixedDelay = 2000) // Poll every 2 seconds
public void pollForNewTransactions() {
try {
List<TransactionData> newTransactions = transactionMapper.findNewTransactions(lastProcessedId.get(), BATCH_LIMIT);
if (newTransactions.isEmpty()) {
return;
}
logger.debug("Fetched {} new transactions.", newTransactions.size());
// Pass the raw data to the anomaly detection service
anomalyDetectionService.detectAnomalies(newTransactions);
// Update the watermark to the ID of the last processed record.
long maxId = newTransactions.get(newTransactions.size() - 1).getId();
lastProcessedId.set(maxId);
} catch (Exception e) {
// A common mistake is to let exceptions in a scheduled task kill the scheduler thread.
// We must catch and log all exceptions to ensure the polling continues.
logger.error("Error during transaction polling task", e);
}
}
}
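To make initializeWatermark more than a placeholder, a small mapper method can seed the watermark with the current maximum ID on startup. The sketch below is an assumption layered on the same transaction_logs table; it also presumes @EnableScheduling is declared on the main application class, and it leaves out persisting the watermark to a durable store.
WatermarkMapper.java (sketch):
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface WatermarkMapper {
    // COALESCE covers the empty-table case so the poller can safely start from 0.
    @Select("SELECT COALESCE(MAX(id), 0) FROM transaction_logs")
    long findMaxTransactionId();
}
With this in place, initializeWatermark() simply becomes lastProcessedId.set(watermarkMapper.findMaxTransactionId()).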
2. TensorFlow Model Inference in Java
The initial implementation attempt involved loading the TensorFlow model inside the detectAnomalies method. This was a performance disaster, as loading a model from disk can take seconds. The correct approach is to treat the model as a singleton resource, loading it once at application startup.
AnomalyDetectionService.java:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.tensorflow.SavedModelBundle;
import org.tensorflow.Session;
import org.tensorflow.ndarray.StdArrays;
import org.tensorflow.types.TFloat32;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.file.Paths;
import java.util.List;
@Service
public class AnomalyDetectionService {
private static final Logger logger = LoggerFactory.getLogger(AnomalyDetectionService.class);
@Value("${anomaly.model.path}")
private String modelPath;
// The model is a heavy resource; it must be managed carefully.
private SavedModelBundle savedModelBundle;
private final WebSocketBroadcaster webSocketBroadcaster;
public AnomalyDetectionService(WebSocketBroadcaster webSocketBroadcaster) {
this.webSocketBroadcaster = webSocketBroadcaster;
}
@PostConstruct
public void loadModel() {
try {
logger.info("Loading TensorFlow model from: {}", Paths.get(modelPath).toAbsolutePath());
// Load the model once and keep it in memory.
this.savedModelBundle = SavedModelBundle.load(modelPath, "serve");
logger.info("TensorFlow model loaded successfully.");
} catch (Exception e) {
logger.error("FATAL: Could not load TensorFlow model. Anomaly detection will be disabled.", e);
// In a real system, this might prevent the application from starting.
}
}
public void detectAnomalies(List<TransactionData> transactions) {
if (savedModelBundle == null || transactions.isEmpty()) {
return;
}
// 1. Preprocess the data into the format the model expects.
// Our model expects a tensor of shape [batch_size, num_features].
// Let's assume features are: payloadSize, sourceNode (one-hot encoded), clientType (one-hot).
// This preprocessing step is critical and often a source of bugs.
float[][] inputData = preprocess(transactions);
// 2. Create the input tensor from the preprocessed batch.
// Note: this assumes the current TensorFlow Java API (org.tensorflow:tensorflow-core),
// where Tensor is not generic and TFloat32 doubles as an NdArray view of the data.
try (TFloat32 inputTensor = TFloat32.tensorOf(StdArrays.ndCopyOf(inputData))) {
// 3. Run inference.
Session.Runner runner = savedModelBundle.session().runner();
runner.feed("serving_default_input_1:0", inputTensor); // Input tensor name must match the model's signature.
runner.fetch("StatefulPartitionedCall:0"); // Output tensor name.
try (TFloat32 outputTensor = (TFloat32) runner.run().get(0)) {
// 4. Post-process the result.
// The model output is the reconstructed input. We calculate the mean squared error
// as the anomaly score.
int numFeatures = inputData[0].length;
for (int i = 0; i < transactions.size(); i++) {
float[] reconstructed = new float[numFeatures];
for (int j = 0; j < numFeatures; j++) {
reconstructed[j] = outputTensor.getFloat(i, j);
}
double mse = calculateMse(inputData[i], reconstructed);
// The core logic: broadcast the result.
AnomalyScore score = new AnomalyScore(transactions.get(i).getId(), transactions.get(i).getCreatedAt(), mse);
webSocketBroadcaster.broadcast(score);
}
}
} catch (Exception e) {
logger.error("Error during model inference", e);
}
}
// This is a placeholder for a complex preprocessing function.
private float[][] preprocess(List<TransactionData> transactions) {
// In reality, this would involve normalization, one-hot encoding for categorical features, etc.
// The number of features must be fixed.
float[][] data = new float[transactions.size()][3];
for (int i = 0; i < transactions.size(); i++) {
TransactionData t = transactions.get(i);
data[i][0] = (float) t.getPayloadSize(); // Should be normalized
data[i][1] = "node-A".equals(t.getSourceNode()) ? 1.0f : 0.0f; // Simplified one-hot
data[i][2] = "mobile".equals(t.getClientType()) ? 1.0f : 0.0f; // Simplified one-hot
}
return data;
}
private double calculateMse(float[] original, float[] reconstructed) {
double sum = 0.0;
for (int i = 0; i < original.length; i++) {
sum += Math.pow(original[i] - reconstructed[i], 2);
}
return sum / original.length;
}
@PreDestroy
public void cleanup() {
if (savedModelBundle != null) {
savedModelBundle.close();
logger.info("TensorFlow model resources released.");
}
}
}
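The AnomalyScore object broadcast above is not shown in the service code. A minimal version, with field names deliberately chosen to match what the dashboard later reads (timestamp and value), might look like this; the exact shape is an assumption:
AnomalyScore.java (sketch):
public class AnomalyScore {
    private final long transactionId;
    private final java.sql.Timestamp timestamp;
    private final double value;

    public AnomalyScore(long transactionId, java.sql.Timestamp timestamp, double value) {
        this.transactionId = transactionId;
        this.timestamp = timestamp;
        this.value = value;
    }

    // Getters are needed so Jackson can serialize the object into the WebSocket payload.
    public long getTransactionId() { return transactionId; }
    public java.sql.Timestamp getTimestamp() { return timestamp; }
    public double getValue() { return value; }
}
Whether the timestamp serializes as epoch milliseconds or an ISO-8601 string depends on the Jackson configuration; new Date(score.timestamp) on the client handles either form.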
3. JWT Security and WebSocket Broadcasting
We use Spring Security to protect our endpoints. A standard JWT filter checks for the token on every request to a secured endpoint; a sketch of such a filter appears at the end of this section. The WebSocket connection also needs to be secured, and a common pitfall is leaving the WebSocket endpoint open. We secure it by passing the token from the client during the connection handshake and validating it there.
WebSocketConfig.java:
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// Messages whose destination starts with "/topic" will be routed to the message broker.
config.enableSimpleBroker("/topic");
// Messages whose destination starts with "/app" are routed to @MessageMapping methods in @Controller classes.
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// The endpoint clients will connect to.
// The wildcard origin is for development only; use a specific origin in production.
// Note: newer Spring versions reject setAllowedOrigins("*") on SockJS endpoints (credentials
// are implied), in which case setAllowedOriginPatterns("*") is the equivalent.
registry.addEndpoint("/ws-anomalies").setAllowedOrigins("*").withSockJS();
}
// A robust implementation requires a ChannelInterceptor that validates the JWT on STOMP CONNECT frames;
// without it, the WebSocket endpoint is effectively unauthenticated. A sketch follows after this class.
}
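For reference, the interceptor mentioned above can be registered in the same configuration class. The following is only a sketch: JwtValidator is a hypothetical helper (assumed to be injected into WebSocketConfig) that verifies the token’s signature and expiry and returns the username as an Optional.
import java.util.Optional;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;

// Added to WebSocketConfig:
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
            if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
                String header = accessor.getFirstNativeHeader("Authorization");
                String token = (header != null && header.startsWith("Bearer ")) ? header.substring(7) : null;
                Optional<String> username = (token == null) ? Optional.empty() : jwtValidator.validate(token);
                if (!username.isPresent()) {
                    // Rejecting the CONNECT frame closes the session before any subscription is possible.
                    throw new IllegalArgumentException("Missing or invalid JWT on STOMP CONNECT");
                }
            }
            return message;
        }
    });
}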
The WebSocketBroadcaster uses SimpMessagingTemplate to send messages to all subscribed clients.
WebSocketBroadcaster.java:
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
public class WebSocketBroadcaster {
private final SimpMessagingTemplate template;
public WebSocketBroadcaster(SimpMessagingTemplate template) {
this.template = template;
}
public void broadcast(AnomalyScore score) {
// Send the score to the "/topic/scores" destination.
// Clients subscribed to this topic will receive the message.
template.convertAndSend("/topic/scores", score);
}
}
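Finally, the HTTP-side token check referenced at the start of this section is a standard once-per-request filter, registered ahead of Spring Security’s username/password filter in the security configuration. Again, this is a sketch rather than the production code, and JwtValidator is the same hypothetical helper as above.
JwtAuthenticationFilter.java (sketch):
import java.io.IOException;
import java.util.Collections;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

@Component
public class JwtAuthenticationFilter extends OncePerRequestFilter {

    private final JwtValidator jwtValidator; // hypothetical helper that verifies signature and expiry

    public JwtAuthenticationFilter(JwtValidator jwtValidator) {
        this.jwtValidator = jwtValidator;
    }

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
            throws ServletException, IOException {
        String header = request.getHeader("Authorization");
        if (header != null && header.startsWith("Bearer ")) {
            jwtValidator.validate(header.substring(7)).ifPresent(username ->
                    // A valid token establishes the security context for this single request.
                    SecurityContextHolder.getContext().setAuthentication(
                            new UsernamePasswordAuthenticationToken(username, null, Collections.emptyList())));
        }
        chain.doFilter(request, response);
    }
}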
The Frontend: A Static-First Real-Time Dashboard
The Gatsby application provides the fast-loading UI shell. After hydration, it becomes a fully dynamic client.
1. Authentication Flow
We create a simple React Context to manage the authentication state and JWT.
AuthContext.js:
import React, { createContext, useState, useContext } from 'react';
import axios from 'axios';
const AuthContext = createContext(null);
export const AuthProvider = ({ children }) => {
const [token, setToken] = useState(() => typeof window !== 'undefined' ? localStorage.getItem('jwt') : null);
const login = async (username, password) => {
try {
const response = await axios.post('/api/auth/login', { username, password });
const { jwt } = response.data;
localStorage.setItem('jwt', jwt);
setToken(jwt);
return true;
} catch (error) {
console.error("Login failed:", error);
return false;
}
};
const logout = () => {
localStorage.removeItem('jwt');
setToken(null);
};
return (
<AuthContext.Provider value={{ token, login, logout }}>
{children}
</AuthContext.Provider>
);
};
export const useAuth = () => useContext(AuthContext);
2. The Real-Time Dashboard Component
This is the core component that connects to the WebSocket and visualizes the data using MUI and a charting library like Recharts.
DashboardPage.js:
import React, { useEffect, useState, useRef } from 'react';
import { useAuth } from '../contexts/AuthContext';
import { Client } from '@stomp/stompjs';
import { LineChart, Line, XAxis, YAxis, CartesianGrid, Tooltip, Legend, ResponsiveContainer } from 'recharts';
import { Box, Card, CardContent, Typography, Grid } from '@mui/material';
const MAX_DATA_POINTS = 100;
const DashboardPage = () => {
const { token } = useAuth();
const [data, setData] = useState([]);
const [isConnected, setIsConnected] = useState(false);
const stompClientRef = useRef(null);
useEffect(() => {
if (!token) {
// If there is no token, don't attempt to connect; tear down any existing client
// so a later login can establish a fresh connection.
if (stompClientRef.current) {
stompClientRef.current.deactivate();
stompClientRef.current = null;
}
return;
}
// The pitfall here is creating multiple connections on re-renders.
// We use a ref to hold the client instance to prevent this.
if (stompClientRef.current) {
return;
}
const client = new Client({
// The backend registers this endpoint with SockJS; Spring also serves a raw WebSocket
// transport at the "/websocket" sub-path, which a native STOMP client can use directly.
brokerURL: 'ws://localhost:8080/ws-anomalies/websocket',
// A common mistake is not providing connect headers for authentication.
// For STOMP over WS, auth is often done this way.
connectHeaders: {
Authorization: `Bearer ${token}`,
},
onConnect: () => {
setIsConnected(true);
console.log('WebSocket Connected!');
client.subscribe('/topic/scores', (message) => {
const score = JSON.parse(message.body);
setData(prevData => {
const newData = [...prevData, { time: new Date(score.timestamp).toLocaleTimeString(), score: score.value }];
// Keep the data array from growing indefinitely.
return newData.length > MAX_DATA_POINTS ? newData.slice(1) : newData;
});
});
},
onDisconnect: () => {
setIsConnected(false);
console.log('WebSocket Disconnected!');
},
onStompError: (frame) => {
console.error('Broker reported error: ' + frame.headers['message']);
console.error('Additional details: ' + frame.body);
},
});
client.activate();
stompClientRef.current = client;
// Cleanup on component unmount or when the token changes.
return () => {
if (stompClientRef.current) {
stompClientRef.current.deactivate();
stompClientRef.current = null;
}
};
}, [token]);
return (
<Box sx={{ flexGrow: 1, padding: 3 }}>
<Grid container spacing={3}>
<Grid item xs={12}>
<Card>
<CardContent>
<Typography variant="h5" component="div">
Real-Time Anomaly Score
</Typography>
<Typography sx={{ mb: 1.5 }} color="text.secondary">
Connection Status: {isConnected ? 'Connected' : 'Disconnected'}
</Typography>
<ResponsiveContainer width="100%" height={400}>
<LineChart data={data}>
<CartesianGrid strokeDasharray="3 3" />
<XAxis dataKey="time" />
<YAxis domain={[0, 'dataMax + 0.1']} />
<Tooltip />
<Legend />
<Line type="monotone" dataKey="score" stroke="#8884d8" isAnimationActive={false} dot={false} />
</LineChart>
</ResponsiveContainer>
</CardContent>
</Card>
</Grid>
</Grid>
</Box>
);
};
export default DashboardPage;
Here is a Mermaid diagram illustrating the complete data flow from the database to the user’s screen.
sequenceDiagram
    participant SRE as SRE Analyst
    participant Gatsby as Gatsby/MUI Frontend
    participant Backend as Spring Boot Service
    participant TF as TensorFlow Model
    participant DB as PostgreSQL DB
    SRE->>Gatsby: Loads dashboard page (initial load is static & fast)
    Gatsby->>SRE: Renders static UI shell
    Note over Gatsby: Hydrates into a full React app
    Gatsby->>Backend: Establishes WebSocket connection with JWT
    Backend->>Gatsby: Connection established
    loop Every 2 seconds
        Backend->>DB: pollForNewTransactions(lastId)
        DB-->>Backend: Returns new transaction_logs
        Backend->>TF: Preprocesses data and runs inference
        TF-->>Backend: Returns reconstruction data
        Note over Backend: Calculates anomaly scores (MSE)
        Backend->>Gatsby: Broadcasts scores via WebSocket
    end
    Gatsby->>SRE: Updates chart with new scores in real-time
The final system successfully identified two minor production issues within its first week of operation—issues that had gone unnoticed by the existing alerting infrastructure. The dashboard’s initial load time was consistently below 400ms, and the end-to-end data latency hovered around the 6-second mark, well within our initial target.
The current MyBatis polling architecture, while pragmatic and effective for its initial purpose, represents a significant piece of technical debt. It places continuous, albeit low-level, strain on a production database and does not scale gracefully with transaction volume. A logical next step is to replace this polling mechanism with a true event-driven pipeline using Change Data Capture (CDC) via Debezium to stream database changes into a message queue like Apache Kafka. This would fully decouple the monitoring system from the primary database, improve real-time accuracy, and provide a more scalable foundation. The client-side JWT management is also simplistic; implementing a robust silent refresh token flow is necessary to avoid forcing users to re-login when the short-lived access token expires. Finally, the TensorFlow model is static; a future iteration must include an MLOps pipeline for automated retraining and deployment to ensure the model’s effectiveness does not degrade over time as data patterns evolve.