The transition to passwordless authentication using WebAuthn eliminated entire classes of credential stuffing and phishing attacks, but it introduced a new monitoring challenge. Our batch-oriented security information and event management (SIEM) system processed logs with a 2-hour delay, which made it inadequate for catching attacks against our WebAuthn registration and authentication flows while they were still in progress. An attacker could probe assertion options or attempt to register numerous malicious security keys over a period of minutes, and we would not know until up to two hours later. We required a detection and response time measured in seconds, not hours.
This necessitated a move to a real-time stream processing architecture. The initial concept was straightforward: ingest every WebAuthn event into a pipeline, maintain state for suspicious indicators (e.g., authentication failures per IP, registration velocity per user agent), and trigger alerts on predefined thresholds.
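The initial concept can be modelled in a few lines of plain Java, independent of any stream processor. The sketch below is illustrative only: the class name VelocityTracker and its parameters are hypothetical, it assumes timestamps arrive in order for each key, and it keeps everything in an in-memory map that grows without bound, which is exactly the problem that managed keyed state with expiry is meant to solve.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Counts events per key over a sliding time window and flags keys that reach a threshold. */
class VelocityTracker {
    private final int threshold;
    private final long windowMillis;
    // One queue of recent event timestamps per key (for example a source IP or a user agent).
    private final Map<String, Deque<Long>> recent = new HashMap<>();

    VelocityTracker(int threshold, long windowMillis) {
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    /** Records one event and returns true if the key has reached the threshold within the window. */
    boolean record(String key, long timestampMillis) {
        Deque<Long> timestamps = recent.computeIfAbsent(key, k -> new ArrayDeque<>());
        timestamps.addLast(timestampMillis);
        // Drop timestamps that have fallen out of the window ending at the current event.
        while (!timestamps.isEmpty() && timestampMillis - timestamps.peekFirst() >= windowMillis) {
            timestamps.removeFirst();
        }
        return timestamps.size() >= threshold;
    }
}
```

A tracker created with new VelocityTracker(3, 60_000L) returns true on the third event for the same key inside one minute, and starts counting afresh once the earlier events have aged out.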
The technology selection process was rigorous, balancing performance requirements with our existing operational ecosystem.
- Stream Processing Engine: Apache Flink. We chose Flink over alternatives for one primary reason: its first-class support for stateful stream processing. The ability to maintain keyed state with exactly-once processing guarantees was non-negotiable for an accurate security analytics system. A ProcessFunction with managed state is far cleaner and more robust for tracking user or IP behavior over time than attempting to manage state externally in Redis or DynamoDB from a stateless processor.
- Runtime Environment: AWS EKS. A Flink cluster is a stateful, long-running application. Running it on a managed Kubernetes service like EKS provides the required resilience, scalability, and resource isolation. EKS handles the control plane, and we can combine native Kubernetes features like Pod affinity with the EKS feature IAM Roles for Service Accounts (IRSA) for secure access to other AWS services.
- Configuration Management: Chef Infra. This was the most contentious decision. The modern cloud-native approach would dictate a GitOps tool like ArgoCD. However, our organization has a decade of investment in Chef for managing a vast fleet of EC2 instances. A core principle was to maintain a single, unified plane for configuration management across both our legacy and new containerized infrastructure. The challenge became: how to leverage Chef’s declarative resource model to manage a Flink application on Kubernetes? The solution was to use Chef to template and apply Kubernetes manifests, treating the Flink job configuration as just another set of node attributes. This provided a consistent workflow for our platform engineering team.
The resulting architecture ingests WebAuthn events from an application-level forwarder into an AWS Kinesis Data Stream. A Flink job running on EKS consumes this stream, performs stateful analysis, and emits findings to a separate Kinesis stream for downstream alerting and automated response systems.
graph TD
    subgraph User Interaction
        A[WebAuthn Client] --> B{Application Backend};
    end
    subgraph AWS Cloud
        B -- Raw Event --> C[AWS Kinesis Data Stream: Ingest];
        subgraph EKS Cluster
            D[Flink JobManager]
            E[Flink TaskManager]
            F[Flink TaskManager]
            D -- Deploys Job --> E;
            D -- Deploys Job --> F;
        end
        C -- FlinkKinesisConsumer --> E;
        E -- Keyed State on RocksDB --> E;
        F -- Keyed State on RocksDB --> F;
        E -- Findings --> G[AWS Kinesis Data Stream: Findings];
        F -- Findings --> G;
    end
    subgraph Alerting
        G --> H[Alerting System / Lambda];
    end
    subgraph Management Plane
        I[Chef Server] --> J[Chef Client on CI/CD Runner]
        J -- chef-client --> K[kubectl apply]
        K -- Manages --> D
    end
The core of the implementation lies in three areas: the Flink job’s logic, the Chef cookbook that manages its deployment, and the secure integration with AWS services from within the EKS cluster.
The Stateful Flink Job
The Flink job is written in Java and is designed to detect two primary patterns: a high rate of failed authentication attempts from a single IP address (indicative of a brute-force probe) and an unusually high rate of new WebAuthn credential registrations from a single IP address (indicative of abuse).
First, we define the data structure for incoming events. A common mistake in stream processing is to work with loosely typed maps; a strongly typed POJO is essential for maintainability and performance.
// src/main/java/com/mycorp/security/flink/events/WebAuthnEvent.java
package com.mycorp.security.flink.events;

import java.io.Serializable;

public class WebAuthnEvent implements Serializable {
    public long eventTimestamp;
    public String eventId;
    public String eventType;    // "REGISTRATION", "AUTHENTICATION"
    public String status;       // "SUCCESS", "FAILURE"
    public String userId;
    public String sourceIp;
    public String userAgent;
    public String credentialId; // Base64 encoded

    // Default constructor for Flink serialization
    public WebAuthnEvent() {}

    // Getters and setters omitted for brevity

    @Override
    public String toString() {
        return "WebAuthnEvent{" +
                "eventTimestamp=" + eventTimestamp +
                ", eventType='" + eventType + '\'' +
                ", status='" + status + '\'' +
                ", sourceIp='" + sourceIp + '\'' +
                ", userId='" + userId + '\'' +
                '}';
    }
}
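To make the contrast with loosely typed maps concrete, here is a minimal sketch of a validation step at the edge of the pipeline. Everything in it is hypothetical (the enums, the TypedWebAuthnEvent class, and the fromMap helper are not part of the job); it only illustrates how malformed input can be rejected once, up front, instead of surfacing later as a string comparison that silently never matches.

```java
import java.util.Map;

enum EventType { REGISTRATION, AUTHENTICATION }

enum EventStatus { SUCCESS, FAILURE }

/** Validated, immutable view of one WebAuthn event (illustrative; not the job's POJO). */
final class TypedWebAuthnEvent {
    final long eventTimestamp;
    final EventType eventType;
    final EventStatus status;
    final String sourceIp;

    private TypedWebAuthnEvent(long eventTimestamp, EventType eventType, EventStatus status, String sourceIp) {
        this.eventTimestamp = eventTimestamp;
        this.eventType = eventType;
        this.status = status;
        this.sourceIp = sourceIp;
    }

    /** Converts a loosely typed map into a typed event, rejecting anything malformed. */
    static TypedWebAuthnEvent fromMap(Map<String, Object> raw) {
        Object timestamp = raw.get("eventTimestamp");
        if (!(timestamp instanceof Number)) {
            throw new IllegalArgumentException("eventTimestamp must be numeric");
        }
        Object ip = raw.get("sourceIp");
        if (!(ip instanceof String) || ((String) ip).isEmpty()) {
            throw new IllegalArgumentException("sourceIp must be a non-empty string");
        }
        // Enum.valueOf throws IllegalArgumentException for unknown or missing values.
        EventType type = EventType.valueOf(String.valueOf(raw.get("eventType")));
        EventStatus status = EventStatus.valueOf(String.valueOf(raw.get("status")));
        return new TypedWebAuthnEvent(((Number) timestamp).longValue(), type, status, (String) ip);
    }

    boolean isAuthenticationFailure() {
        return eventType == EventType.AUTHENTICATION && status == EventStatus.FAILURE;
    }
}
```

Flink's POJO serializer expects a public no-argument constructor and accessible fields, as in the WebAuthnEvent class above, so a class like this would sit in front of that POJO as a validation step, not replace it.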
The main job orchestrates the stream processing logic. We configure the environment, define the Kinesis source, and then apply our stateful detection logic. A critical aspect of production Flink jobs is proper configuration of checkpointing and the state backend. We use RocksDB for its ability to handle state larger than available memory.
// src/main/java/com/mycorp/security/flink/AbuseDetectionJob.java
package com.mycorp.security.flink;

import com.mycorp.security.flink.events.SecurityFinding;
import com.mycorp.security.flink.events.WebAuthnEvent;
import com.mycorp.security.flink.operators.AuthenticationFailureDetector;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Properties;

public class AbuseDetectionJob {
    private static final Logger LOG = LoggerFactory.getLogger(AbuseDetectionJob.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Production-grade configuration
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE); // Checkpoint every 60 seconds
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        env.getCheckpointConfig().setCheckpointTimeout(120000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Use RocksDB for large state. The true flag enables incremental checkpoints.
        // The checkpoint storage path must be a persistent location like S3.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().setCheckpointStorage(params.getRequired("checkpoint.s3.uri"));

        // Kinesis Source Configuration (FlinkKinesisConsumer, as in the architecture diagram)
        String sourceStreamArn = params.getRequired("kinesis.source.stream.arn");
        // FlinkKinesisConsumer addresses streams by name, so take the part after "stream/" in the ARN.
        String sourceStreamName = sourceStreamArn.substring(sourceStreamArn.indexOf("stream/") + "stream/".length());

        Properties sourceProps = new Properties();
        sourceProps.setProperty("aws.region", params.getRequired("aws.region"));
        sourceProps.setProperty("flink.stream.initpos", "LATEST");
        // Read through the pre-registered enhanced fan-out (EFO) consumer instead of polling.
        sourceProps.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
                ConsumerConfigConstants.RecordPublisherType.EFO.name());
        sourceProps.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
                ConsumerConfigConstants.EFORegistrationType.NONE.name());
        sourceProps.setProperty(ConsumerConfigConstants.efoConsumerArn(sourceStreamName),
                params.getRequired("kinesis.source.consumer.arn"));

        // WebAuthnEventDeserializationSchema (not shown) turns a Kinesis record into a WebAuthnEvent.
        DataStream<WebAuthnEvent> webAuthnEvents = env
                .addSource(new FlinkKinesisConsumer<>(
                        sourceStreamName, new WebAuthnEventDeserializationSchema(), sourceProps))
                .name("WebAuthn Kinesis Source")
                .assignTimestampsAndWatermarks(
                        // Tolerate 10s of out-of-order events
                        WatermarkStrategy.<WebAuthnEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)));

        // The core logic is encapsulated in a KeyedProcessFunction
        DataStream<SecurityFinding> findings = webAuthnEvents
                .keyBy(event -> event.sourceIp)
                .process(new AuthenticationFailureDetector(10, 60)) // 10 failures in 60 seconds
                .uid("authentication-failure-detector"); // Set a stable UID for stateful upgrades

        // Kinesis Sink Configuration
        Properties sinkProps = new Properties();
        sinkProps.setProperty("aws.region", params.getRequired("aws.region"));

        KinesisStreamsSink<SecurityFinding> kinesisSink = KinesisStreamsSink.<SecurityFinding>builder()
                .setKinesisClientProperties(sinkProps)
                .setStreamName(params.getRequired("kinesis.sink.stream.name"))
                // Simple JSON serialization for the sink
                .setSerializationSchema(element -> element.toJson().getBytes(StandardCharsets.UTF_8))
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Distribute load
                .build();

        findings.sinkTo(kinesisSink).name("Security Findings Kinesis Sink");

        LOG.info("Starting Flink Abuse Detection Job");
        env.execute("WebAuthn Abuse Detection");
    }
}
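The 10-second bound passed to forBoundedOutOfOrderness is easier to reason about with a small model. The sketch below mirrors the arithmetic of a bounded-out-of-orderness watermark (the largest timestamp seen, minus the bound, minus one millisecond) in plain Java; the class and method names are made up for illustration, and this is a model, not Flink's implementation.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark generator. */
class BoundedWatermarkModel {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedWatermarkModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE instead of underflowing.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Tracks the largest event timestamp observed so far. */
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** The watermark asserts that no event with a timestamp at or below it is still expected. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis - 1;
    }

    /** An event is late if its timestamp is at or below the current watermark. */
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }
}
```

With a bound of 10,000 ms, an event stamped 100,000 moves the watermark to 89,999, so an event stamped 95,000 that arrives afterwards is still on time. In a KeyedProcessFunction such as the detector, late events are still delivered to processElement; the watermark mainly governs event-time timers and windows.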
The AuthenticationFailureDetector is where state is managed. We use Flink’s ValueState to store the failure count and a processing-time timer to clear the state after a period of inactivity, which is crucial to prevent unbounded state growth.
// src/main/java/com/mycorp/security/flink/operators/AuthenticationFailureDetector.java
package com.mycorp.security.flink.operators;

import com.mycorp.security.flink.events.SecurityFinding;
import com.mycorp.security.flink.events.WebAuthnEvent;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuthenticationFailureDetector extends KeyedProcessFunction<String, WebAuthnEvent, SecurityFinding> {
    private static final Logger LOG = LoggerFactory.getLogger(AuthenticationFailureDetector.class);

    private final int maxFailures;
    private final long timeWindowSeconds;

    // Keyed state to store the count of failures for each source IP.
    private transient ValueState<Long> failureCountState;
    // Keyed state to store the timestamp of the first failure in a window.
    private transient ValueState<Long> firstFailureTimestampState;

    public AuthenticationFailureDetector(int maxFailures, long timeWindowSeconds) {
        this.maxFailures = maxFailures;
        this.timeWindowSeconds = timeWindowSeconds;
    }

    @Override
    public void open(Configuration parameters) {
        // A common pitfall is not configuring TTL for state. Without this,
        // state for inactive keys would live forever, leading to unbounded state growth.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(timeWindowSeconds * 2)) // Keep state for twice the window length
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Long> countDescriptor = new ValueStateDescriptor<>("failure-count", Long.class);
        countDescriptor.enableTimeToLive(ttlConfig);
        failureCountState = getRuntimeContext().getState(countDescriptor);

        ValueStateDescriptor<Long> timestampDescriptor = new ValueStateDescriptor<>("first-failure-timestamp", Long.class);
        timestampDescriptor.enableTimeToLive(ttlConfig);
        firstFailureTimestampState = getRuntimeContext().getState(timestampDescriptor);
    }

    @Override
    public void processElement(WebAuthnEvent event, Context ctx, Collector<SecurityFinding> out) throws Exception {
        // Constant-first comparison avoids a NullPointerException on events with missing fields.
        if (!"AUTHENTICATION".equals(event.eventType) || !"FAILURE".equals(event.status)) {
            return; // We only care about authentication failures.
        }

        long now = ctx.timestamp() != null ? ctx.timestamp() : System.currentTimeMillis();
        long windowMillis = timeWindowSeconds * 1000L;

        Long currentCount = failureCountState.value();
        Long firstFailureTimestamp = firstFailureTimestampState.value();

        if (currentCount == null || firstFailureTimestamp == null || now - firstFailureTimestamp > windowMillis) {
            // Either this is the first failure in a potential series, or the previous series
            // has aged out without reaching the threshold. Start a new window in both cases,
            // so a stale count can never keep accumulating.
            currentCount = 0L;
            firstFailureTimestampState.update(now);
            // Set a timer to clean up state if no more failures occur.
            // This is a failsafe in addition to TTL.
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + windowMillis);
        }

        long newCount = currentCount + 1;
        failureCountState.update(newCount);

        // The series always starts within the window at this point, so the count alone decides.
        if (newCount >= maxFailures) {
            LOG.warn("High failure rate detected for IP: {}", ctx.getCurrentKey());
            out.collect(new SecurityFinding(
                    "HIGH_AUTH_FAILURE_RATE",
                    ctx.getCurrentKey(),
                    "Detected " + newCount + " failures within " + timeWindowSeconds + " seconds.",
                    System.currentTimeMillis()
            ));
            // Reset the state to avoid firing continuous alerts for the same event series.
            clearState();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SecurityFinding> out) throws Exception {
        // The timer fires (in processing time) one window after the first failure was seen.
        // The stored first-failure timestamp may be an event timestamp, so this comparison
        // assumes the two clocks are roughly aligned; it also keeps a timer left over from
        // an earlier series from clearing a newer one.
        Long firstFailureTs = firstFailureTimestampState.value();
        if (firstFailureTs != null && timestamp >= firstFailureTs + timeWindowSeconds * 1000L) {
            clearState();
        }
    }

    private void clearState() {
        failureCountState.clear();
        firstFailureTimestampState.clear();
    }
}
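The counting rule itself (N failures within W seconds of the first failure in a series, then reset) can be checked without a Flink test harness. The following is a minimal plain-Java model of that rule, not the operator: FailureWindowModel is a hypothetical name, a HashMap stands in for keyed state, and a series that has aged out is restarted inline, where the job relies on its cleanup timer and state TTL.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the per-IP rule: maxFailures failures within windowMillis of the first one. */
class FailureWindowModel {
    private static final class Series {
        long firstFailureMillis;
        long count;
    }

    private final int maxFailures;
    private final long windowMillis;
    private final Map<String, Series> byIp = new HashMap<>(); // stands in for Flink keyed state

    FailureWindowModel(int maxFailures, long windowMillis) {
        this.maxFailures = maxFailures;
        this.windowMillis = windowMillis;
    }

    /** Records one authentication failure; returns true when it should raise a finding. */
    boolean onFailure(String ip, long timestampMillis) {
        Series series = byIp.get(ip);
        if (series == null || timestampMillis - series.firstFailureMillis > windowMillis) {
            // No series in progress, or the previous one aged out: start a new window.
            series = new Series();
            series.firstFailureMillis = timestampMillis;
            byIp.put(ip, series);
        }
        series.count++;
        if (series.count >= maxFailures) {
            byIp.remove(ip); // reset so that one burst produces one finding
            return true;
        }
        return false;
    }
}
```

A model like this is handy for pinning down edge cases, such as a slow trickle of failures that never fits inside one window, before encoding them as operator tests.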
The Chef Cookbook for Declarative Deployment
Managing this Flink application on EKS via Chef involves a cookbook that templates Kubernetes manifests and applies them. This bridges the gap between Chef’s attribute-driven configuration and Kubernetes’s declarative API.
The cookbook structure is standard:
chef-repo/
└── cookbooks/
    └── flink_abuse_detection/
        ├── attributes/
        │   └── default.rb
        ├── recipes/
        │   └── deploy.rb
        ├── templates/
        │   └── default/
        │       ├── flink-job-deployment.yaml.erb
        │       └── flink-configuration-configmap.yaml.erb
        └── metadata.rb
The attributes/default.rb file defines all configurable parameters. This is the single source of truth for a given environment (dev, staging, prod).
# cookbooks/flink_abuse_detection/attributes/default.rb
# General
default['flink_abuse_detection']['namespace'] = 'flink-operators'
default['flink_abuse_detection']['app_name'] = 'webauthn-abuse-detector'
# Flink Image
default['flink_abuse_detection']['image']['repository'] = 'flink'
default['flink_abuse_detection']['image']['tag'] = '1.17-scala_2.12-java11'
default['flink_abuse_detection']['image']['pull_policy'] = 'IfNotPresent'
# Job Manager Config
default['flink_abuse_detection']['jobmanager']['replicas'] = 1
default['flink_abuse_detection']['jobmanager']['cpu'] = '1000m'
default['flink_abuse_detection']['jobmanager']['memory'] = '2048Mi'
# Task Manager Config
default['flink_abuse_detection']['taskmanager']['replicas'] = 3
default['flink_abuse_detection']['taskmanager']['cpu'] = '2000m'
default['flink_abuse_detection']['taskmanager']['memory'] = '4096Mi'
# Flink Job Arguments - These are passed directly to the main class
default['flink_abuse_detection']['job_args'] = {
  'aws.region' => 'us-east-1',
  'checkpoint.s3.uri' => 's3://my-flink-checkpoints/webauthn-abuse/',
  'kinesis.source.stream.arn' => 'arn:aws:kinesis:us-east-1:123456789012:stream/webauthn-events-prod',
  'kinesis.source.consumer.arn' => 'arn:aws:kinesis:us-east-1:123456789012:stream/webauthn-events-prod/consumer/flink-abuse-detector:1678886400',
  'kinesis.sink.stream.name' => 'security-findings-prod'
}
# JAR location - Assumes JAR is pre-built and available in S3
default['flink_abuse_detection']['jar']['s3_path'] = 's3://my-app-artifacts/flink-jobs/abuse-detection-1.2.0.jar'
default['flink_abuse_detection']['jar']['local_path'] = '/opt/flink/usrlib/abuse-detection.jar'
The core of the Chef magic happens in the recipe and templates. The recipe renders the Kubernetes manifests from .erb templates and uses an execute resource to run kubectl apply. In a real-world project, you’d wrap this in a custom Chef resource for better idempotency and clarity, but execute demonstrates the principle.
The template for the Flink deployment (flink-job-deployment.yaml.erb) injects the attributes into a standard Kubernetes manifest. Note the use of an initContainer to download the job JAR from S3 before the main Flink container starts. Also note the serviceAccountName field: the ServiceAccount it references carries the IAM role annotation, which is critical for IRSA.
# cookbooks/flink_abuse_detection/templates/default/flink-job-deployment.yaml.erb
# This is a Flink Application Cluster deployment manifest.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: <%= @app_name %>-jobmanager
  namespace: <%= @namespace %>
spec:
  replicas: <%= @jobmanager_replicas %>
  selector:
    matchLabels:
      app: flink
      component: <%= @app_name %>-jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: <%= @app_name %>-jobmanager
    spec:
      serviceAccountName: flink-service-account # This SA is annotated with the IAM role ARN
      initContainers:
        - name: jar-downloader
          image: amazon/aws-cli:2.9.19
          command: ["aws", "s3", "cp", "<%= @jar_s3_path %>", "<%= @jar_local_path %>"]
          volumeMounts:
            - name: flink-jar-volume
              mountPath: /opt/flink/usrlib
      containers:
        - name: jobmanager
          image: <%= @image_repository %>:<%= @image_tag %>
          args:
            # "standalone-job" is the application-mode entrypoint of the official Flink image
            - "standalone-job"
            - "--job-classname"
            - "com.mycorp.security.flink.AbuseDetectionJob"
            <% @job_args.each do |key, value| %>
            - "--<%= key %>"
            - "<%= value %>"
            <% end %>
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 8081
              name: webui
          resources:
            requests:
              cpu: <%= @jobmanager_cpu %>
              memory: <%= @jobmanager_memory %>
          volumeMounts:
            - name: flink-jar-volume
              mountPath: /opt/flink/usrlib
            - name: flink-config-volume
              mountPath: /opt/flink/conf
      volumes:
        - name: flink-jar-volume
          emptyDir: {}
        - name: flink-config-volume
          configMap:
            name: <%= @app_name %>-config
---
# TaskManager Deployment (simplified for brevity)
...
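The loop over job_args in the template is what connects the Chef attributes to ParameterTool.fromArgs in the job's main method: every attribute becomes a "--key" argument followed by its value. The sketch below shows that round trip in plain Java. JobArgs and its two methods are hypothetical helpers, and parse is a deliberately simplified stand-in that accepts only "--key value" pairs, whereas the real ParameterTool accepts more forms.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrates how a map of job arguments becomes container args and is read back. */
class JobArgs {
    /** Mirrors the template loop: each entry becomes "--key" followed by its value. */
    static List<String> toArgs(Map<String, String> jobArgs) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> entry : jobArgs.entrySet()) {
            args.add("--" + entry.getKey());
            args.add(entry.getValue());
        }
        return args;
    }

    /** Simplified stand-in for argument parsing: reads strict "--key value" pairs. */
    static Map<String, String> parse(List<String> args) {
        Map<String, String> parsed = new LinkedHashMap<>();
        for (int i = 0; i < args.size(); i += 2) {
            String key = args.get(i);
            if (!key.startsWith("--") || i + 1 >= args.size()) {
                throw new IllegalArgumentException("Expected --key value at position " + i);
            }
            parsed.put(key.substring(2), args.get(i + 1));
        }
        return parsed;
    }
}
```

One practical consequence of this design is that a typo in an attribute key only shows up when the job starts and cannot find the parameter it expects, so it is worth having the job fail fast on missing required parameters.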
The deploy.rb recipe orchestrates the process.
# cookbooks/flink_abuse_detection/recipes/deploy.rb

# Variables from attributes to make template access cleaner
app_name = node['flink_abuse_detection']['app_name']
namespace = node['flink_abuse_detection']['namespace']
# ... and so on for all other attributes

# 1. Render the ConfigMap for flink-conf.yaml
template "/tmp/#{app_name}-configmap.yaml" do
  source 'flink-configuration-configmap.yaml.erb'
  owner 'root'
  group 'root'
  mode '0644'
  variables(
    app_name: app_name,
    namespace: namespace,
    taskmanager_replicas: node['flink_abuse_detection']['taskmanager']['replicas']
    # etc.
  )
end

# 2. Render the Deployment manifest
template "/tmp/#{app_name}-deployment.yaml" do
  source 'flink-job-deployment.yaml.erb'
  owner 'root'
  group 'root'
  mode '0644'
  variables(
    app_name: app_name,
    namespace: namespace,
    # Pass all required variables...
  )
end

# 3. Apply the manifests using kubectl
# This assumes kubectl is configured on the machine running chef-client
# (e.g., a CI/CD runner)
execute "apply_flink_config_map_#{app_name}" do
  command "kubectl apply -f /tmp/#{app_name}-configmap.yaml"
  retries 3
  retry_delay 5
end

execute "apply_flink_deployment_#{app_name}" do
  command "kubectl apply -f /tmp/#{app_name}-deployment.yaml"
  retries 3
  retry_delay 5
end
This Chef-driven approach allowed our existing platform team to manage a complex, modern data processing application using familiar tools and workflows. While not a conventional Kubernetes deployment pattern, its pragmatism was undeniable in our context, providing a unified configuration entrypoint for both infrastructure and application logic parameters.
The system has successfully reduced our mean time to detection for WebAuthn anomalies from over two hours to under 15 seconds. It handles a sustained load of 50,000 events per second, with state size for the IP-based tracking remaining manageable due to the aggressive TTL policies on Flink’s state descriptors.
However, the solution is not without its limitations. The detection logic is based on simple, hard-coded thresholds, making it susceptible to well-disguised, distributed attacks that stay below the limits. A future iteration will involve replacing the ProcessFunction with one that can apply a pre-trained ML model for more nuanced behavioral analysis. Furthermore, the Chef execute 'kubectl apply' pattern is functional but lacks the deep integration of a proper Kubernetes operator. A more robust long-term solution might involve having Chef manage the configuration in a Git repository, allowing a tool like ArgoCD to handle the actual sync loop with the EKS cluster, thereby separating configuration management from deployment execution.