The initial problem wasn’t a catastrophic failure; it was a slow, morale-draining bleed of engineering time. Our core business process automation is driven by a Kanban board. A card moves from ‘Review’ to ‘Deploy’, and a flurry of serverless functions are meant to kick off a CI/CD pipeline. A card moves to ‘Done’, and another function updates our billing system. The architecture worked, but it was a black box. When a deployment failed to start, the question “why?” triggered a painful scavenger hunt. We’d check the OpenFaaS function logs on our EKS cluster. Then we’d check the logs of the downstream GraphQL service it called. Then we’d check the SQS queue metrics in AWS. Each system had its own log stream, its own timestamp format, and absolutely no shared context. A five-minute fix was preceded by a two-hour investigation, trying to stitch together a story from disconnected fragments.
The concept we settled on was deceptively simple: traceability through a single, shared identifier. If we could generate a unique correlation_id
the moment a Kanban event entered our system and ensure every subsequent action—every function invocation, every API call, every log message—carried this ID, we could transform our chaotic log swamp into a coherent, searchable narrative. This wasn’t about adding more logging; it was about adding context to the logging we already had.
Our technology stack was mostly fixed, a result of prior architectural decisions. The Kanban system sent a webhook. We needed a resilient endpoint to catch it, so AWS API Gateway pushing to an SQS queue was the obvious choice. The queue provides a crucial buffer, protecting our system from webhook storms and allowing for retries. For compute, we were already invested in Kubernetes, so OpenFaaS on EKS gave us serverless semantics without abandoning our existing infrastructure and expertise. The functions themselves interacted with a central business logic service exposed via GraphQL. And for logging, a company-wide ELK stack was the destination. The challenge was not in choosing the components, but in weaving a thread of context through them.
graph TD
    subgraph "External"
        A[Kanban System]
    end
    subgraph "AWS Infrastructure"
        B[API Gateway]
        C[SQS Queue]
        D[EKS Cluster]
    end
    subgraph "EKS Cluster"
        E[OpenFaaS Gateway]
        F[Function Pod: Kanban Processor]
        G[Log Shipper: Filebeat DaemonSet]
    end
    subgraph "Downstream Services"
        H[GraphQL API]
    end
    subgraph "Observability Platform"
        I[ELK Stack]
    end

    A -- Webhook --> B
    B -- Injects correlation_id --> C
    E -- Polls --> C
    E -- Invokes --> F
    F -- Python GraphQL Client w/ correlation_id header --> H
    F -- Writes JSON logs --> G
    G -- Forwards logs --> I

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style I fill:#ccf,stroke:#333,stroke-width:2px
The first point of implementation was the entry point: the API Gateway. This is our one chance to mint the correlation_id
that the rest of the system will inherit. We can’t trust the incoming webhook to provide a unique identifier. A common mistake is to rely on an external system for this; it’s better to own the context generation yourself. We used an AWS VTL (Velocity Template Language) mapping template to transform the incoming request before it hits the SQS integration.
This template does three critical things: it extracts the raw webhook body, adopts $context.requestId (an identifier API Gateway mints uniquely for every request) as the correlation_id, and wraps everything in a structured JSON message for SQS, URL-encoded for the form-style SQS API call.
Here is the core Terraform for this setup. In a real-world project, you never configure this stuff by hand in the console. Infrastructure must be code.
# main.tf

# Data sources referenced by the SQS integration URI further down.
data "aws_region" "current" {}
data "aws_caller_identity" "current" {}

resource "aws_sqs_queue" "kanban_events_queue" {
  name = "kanban-events-queue"
}
resource "aws_api_gateway_rest_api" "webhook_api" {
name = "Webhook Ingestion API"
description = "Receives webhooks and forwards to SQS"
}
resource "aws_api_gateway_resource" "webhook_resource" {
rest_api_id = aws_api_gateway_rest_api.webhook_api.id
parent_id = aws_api_gateway_rest_api.webhook_api.root_resource_id
path_part = "kanban-event"
}
resource "aws_api_gateway_method" "webhook_post_method" {
rest_api_id = aws_api_gateway_rest_api.webhook_api.id
resource_id = aws_api_gateway_resource.webhook_resource.id
http_method = "POST"
authorization = "NONE"
}
resource "aws_iam_role" "api_gateway_sqs_role" {
name = "api-gateway-sqs-integration-role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "apigateway.amazonaws.com"
}
}]
})
}
resource "aws_iam_role_policy" "api_gateway_sqs_policy" {
name = "api-gateway-sqs-send-message-policy"
role = aws_iam_role.api_gateway_sqs_role.id
policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Action = "sqs:SendMessage",
Effect = "Allow",
Resource = aws_sqs_queue.kanban_events_queue.arn
}]
})
}
resource "aws_api_gateway_integration" "webhook_sqs_integration" {
rest_api_id = aws_api_gateway_rest_api.webhook_api.id
resource_id = aws_api_gateway_resource.webhook_resource.id
http_method = aws_api_gateway_method.webhook_post_method.http_method
type = "AWS"
integration_http_method = "POST"
uri = "arn:aws:apigateway:${data.aws_region.current.name}:sqs:path/${data.aws_caller_identity.current.account_id}/${aws_sqs_queue.kanban_events_queue.name}"
credentials = aws_iam_role.api_gateway_sqs_role.arn
request_parameters = {
"integration.request.header.Content-Type" = "'application/x-www-form-urlencoded'"
}
# This is the critical part for injecting context
  request_templates = {
    "application/json" = <<TEMPLATE
## Build the SQS message, minting correlation_id from API Gateway's per-request ID,
## then URL-encode it because the integration posts as application/x-www-form-urlencoded.
#set($message = "{""correlation_id"": ""$context.requestId"", ""source"": ""kanban_webhook"", ""timestamp"": ""$context.requestTimeEpoch"", ""payload"": $input.json('$')}")
Action=SendMessage&MessageBody=$util.urlEncode($message)
TEMPLATE
}
}
With the correlation_id
now safely inside the SQS message, the next step was the OpenFaaS function. The pitfall here is simply extracting the ID and then passing it around as a parameter to every helper function and logging call. That’s brittle and clutters the business logic. A much cleaner approach is to use a mechanism that makes the ID ambiently available throughout the request’s lifecycle within the function. Python’s logging
module provides a perfect hook for this: Filters.
We defined a custom filter that, upon initialization for a given request, stores the correlation ID. Then, for every log record it processes, it injects that ID. We also switched to a structured JSON logger to make the output immediately parsable by our ELK stack.
Here’s the handler for the OpenFaaS function. It’s written in Python, and its dependencies are managed via a requirements.txt
file.
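That file is short; something like the following is enough (the pins are illustrative, not the exact versions we ran):

# kanban-processor/requirements.txt
python-json-logger>=2.0
gql[aiohttp]>=3.4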
# kanban-processor/handler.py
import os
import sys
import json
import logging
from pythonjsonlogger import jsonlogger
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
# --- Logging Setup ---
# This is the core of our contextual logging strategy.
# It should be set up once when the module is loaded.
class CorrelationIdFilter(logging.Filter):
"""
A logging filter that injects a correlation_id into the log record.
The ID is set on a per-request basis.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._correlation_id = "unset"
def filter(self, record):
record.correlation_id = self._correlation_id
return True
def set_correlation_id(self, corr_id):
self._correlation_id = corr_id
# Instantiate the filter and logger globally.
correlation_filter = CorrelationIdFilter()
log = logging.getLogger(__name__)
log.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())
logHandler = logging.StreamHandler(sys.stdout)
# Use our custom filter
logHandler.addFilter(correlation_filter)
# Use JSON formatter for structured logs consumable by ELK
formatter = jsonlogger.JsonFormatter('%(asctime)s %(name)s %(levelname)s %(correlation_id)s %(message)s')
logHandler.setFormatter(formatter)
if not log.handlers:
log.addHandler(logHandler)
# --- GraphQL Client Setup ---
# The client is configured to accept a correlation_id to pass in headers.
def get_graphql_client(correlation_id: str):
"""
Initializes and returns a GraphQL client with the correlation ID in the headers.
In a real-world project, the URL and any auth tokens would come from environment variables.
"""
graphql_api_url = os.environ.get("GRAPHQL_API_URL", "http://my-api/graphql")
headers = {
"Content-Type": "application/json",
"X-Correlation-ID": correlation_id # Propagate the ID downstream
}
transport = AIOHTTPTransport(url=graphql_api_url, headers=headers)
return Client(transport=transport, fetch_schema_from_transport=False)
# --- Main Handler ---
async def handle(event, context):
"""
Handles an SQS event from OpenFaaS.
"""
try:
# SQS events from OpenFaaS might be wrapped. Let's find the actual message.
if "Records" in event:
message_body_str = event["Records"][0]["body"]
else:
# For direct invocation/testing
message_body_str = event
message_body = json.loads(message_body_str)
correlation_id = message_body.get("correlation_id", "not-provided")
# CRITICAL: Set the correlation ID for the logger context for this specific invocation.
correlation_filter.set_correlation_id(correlation_id)
log.info("Processing Kanban event.")
payload = message_body.get("payload", {})
card_id = payload.get("card", {}).get("id")
new_status = payload.get("new_status")
if not card_id or not new_status:
log.error("Missing card_id or new_status in payload.", extra={"payload": payload})
return {"statusCode": 400, "body": "Invalid payload"}
log.info(f"Card {card_id} moved to status {new_status}.")
# Business Logic: Based on status, call a GraphQL mutation.
if new_status == "Deploying":
client = get_graphql_client(correlation_id)
mutation = gql("""
mutation StartDeployment($cardId: String!) {
startDeployment(cardId: $cardId) {
success
deploymentId
message
}
}
""")
params = {"cardId": str(card_id)}
log.info("Executing StartDeployment mutation.", extra={"gql_params": params})
result = await client.execute_async(mutation, variable_values=params)
log.info("GraphQL mutation completed successfully.", extra={"gql_result": result})
else:
log.warning(f"No action defined for status: {new_status}")
return {
"statusCode": 200,
"body": json.dumps({"status": "processed", "correlation_id": correlation_id})
}
except json.JSONDecodeError as e:
# Don't set correlation_id here as we couldn't parse it.
correlation_filter.set_correlation_id("parse-error")
log.exception("Failed to decode JSON from event body.")
return {"statusCode": 400, "body": "JSON decode error"}
except Exception as e:
# The correlation_id should already be set if parsing was successful.
log.exception("An unexpected error occurred during processing.")
return {"statusCode": 500, "body": "Internal server error"}
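It's worth sanity-checking the handler locally before deploying it; a hand-built event in the shape the mapping template produces is enough. The sketch below is purely illustrative (the file name, card ID, and UUID are made up) and assumes nothing beyond the handler module above.

# local_test.py - illustrative harness for running the handler outside OpenFaaS
import asyncio
import json

from handler import handle

# Mimics the Records wrapping plus the message minted by the API Gateway template.
fake_event = {
    "Records": [{
        "body": json.dumps({
            "correlation_id": "11111111-2222-3333-4444-555555555555",
            "source": "kanban_webhook",
            "timestamp": "1700000000000",
            "payload": {"card": {"id": 42}, "new_status": "Done"},
        })
    }]
}

if __name__ == "__main__":
    # "Done" exercises the logging path without needing a live GraphQL endpoint.
    print(asyncio.run(handle(fake_event, context=None)))

Running it prints structured JSON log lines carrying the correlation_id to stdout, which is exactly what Filebeat will pick up later.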
The function code now handles propagation both into the logs and into the downstream GraphQL call via the X-Correlation-ID
header. The downstream service must, in turn, be configured to read this header and continue the chain, but our function has done its part.
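What this looks like on the receiving side depends entirely on how the GraphQL service is built, but the pattern is the same everywhere: read the header, stash it in the request context, and stamp it onto logs and responses. As a purely illustrative sketch, assuming a Python/Flask service (ours may well differ):

# graphql-service/middleware.py - illustrative only; not the actual downstream service
import logging
import uuid

from flask import Flask, g, request

app = Flask(__name__)
log = logging.getLogger("graphql-service")

@app.before_request
def capture_correlation_id():
    # Reuse the caller's ID when present; otherwise mint one so the trace never goes blank.
    g.correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))

@app.after_request
def echo_correlation_id(response):
    # Echo the ID back so logs on both sides can be stitched together.
    response.headers["X-Correlation-ID"] = g.correlation_id
    return response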
The final piece of the puzzle was getting these structured logs from the function’s pod stdout into Elasticsearch. A common mistake is to have the application code try to log directly to a service like Logstash. This creates a tight coupling and is unreliable; if the logging endpoint is down, you lose logs or the application could block. The standard, robust pattern in Kubernetes is to log to stdout/stderr and have a cluster-level agent collect, process, and forward these logs. We used a Filebeat DaemonSet, which runs one Filebeat pod on every node in the cluster.
The configuration for Filebeat is critical. It needs to know how to find the container logs, how to parse the JSON, and how to ship them to Logstash or Elasticsearch.
# filebeat-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: filebeat
namespace: kube-system
labels:
k8s-app: filebeat
spec:
selector:
matchLabels:
k8s-app: filebeat
template:
metadata:
labels:
k8s-app: filebeat
spec:
serviceAccountName: filebeat
terminationGracePeriodSeconds: 30
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: filebeat
image: docker.elastic.co/beats/filebeat:7.17.5 # Use a specific version
args: [
"-c", "/etc/filebeat.yml",
"-e",
]
env:
- name: ELASTICSEARCH_HOST
value: "elasticsearch-host.example.com"
- name: ELASTICSEARCH_PORT
value: "9200"
- name: ELASTICSEARCH_USERNAME
value: "filebeat_writer"
- name: ELASTICSEARCH_PASSWORD
valueFrom:
secretKeyRef:
name: filebeat-credentials
key: password
securityContext:
runAsUser: 0
# privileged: true # Necessary if using autodiscover on Docker
resources:
limits:
memory: 200Mi
requests:
cpu: 100m
memory: 100Mi
volumeMounts:
- name: config
mountPath: /etc/filebeat.yml
readOnly: true
subPath: filebeat.yml
- name: data
mountPath: /usr/share/filebeat/data
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: varlog
mountPath: /var/log/
readOnly: true
volumes:
- name: config
configMap:
defaultMode: 0640
name: filebeat-config
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: varlog
hostPath:
path: /var/log/
- name: data
hostPath:
          # When Filebeat stops or restarts, it persists its registry state under this path so it can resume without re-shipping logs
path: /var/lib/filebeat-data
type: DirectoryOrCreate
---
# filebeat-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: filebeat-config
namespace: kube-system
labels:
k8s-app: filebeat
data:
filebeat.yml: |-
filebeat.inputs:
- type: container
paths:
- /var/log/containers/*.log
processors:
- add_kubernetes_metadata:
in_cluster: true
# The key processor for our structured logs
- decode_json_fields:
fields: ["log", "message"] # 'log' or 'message' is where docker/containerd puts the raw log line
target: "json"
overwrite_keys: true
add_error_key: true
output.elasticsearch:
hosts: ['${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}']
username: '${ELASTICSEARCH_USERNAME}'
password: '${ELASTICSEARCH_PASSWORD}'
index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
    # To reduce noise, keep only logs from our functions namespace. A second
    # autodiscover provider here would collect the same files twice, so we
    # filter on the Kubernetes metadata added above instead.
    processors:
      - drop_event:
          when:
            not:
              equals:
                kubernetes.namespace: "openfaas-fn"

    # Required because we override the index name above
    setup.template.name: "filebeat"
    setup.template.pattern: "filebeat-*"
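One thing the manifests above reference but don't define is the filebeat ServiceAccount; without RBAC to read pod metadata, the add_kubernetes_metadata processor can't enrich events. A minimal sketch, with names assumed to match the DaemonSet:

# filebeat-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: filebeat
rules:
  - apiGroups: [""]
    resources: ["namespaces", "pods", "nodes"]
    verbs: ["get", "watch", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: filebeat
subjects:
  - kind: ServiceAccount
    name: filebeat
    namespace: kube-system
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io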
The decode_json_fields
processor is the linchpin. It tells Filebeat to inspect the raw log line and, if it is a valid JSON string, to parse it and expand its fields under the json prefix (the target we configured) in the resulting Elasticsearch document. Our custom correlation_id
field, added by the Python logger, becomes a first-class, searchable json.correlation_id field.
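Concretely, a single log event from the function lands in Elasticsearch shaped roughly like this (heavily trimmed, and every value here is illustrative):

{
  "@timestamp": "2023-11-14T10:02:31.412Z",
  "kubernetes": {
    "namespace": "openfaas-fn",
    "pod": { "name": "kanban-processor-6d9f7c9b8-xk2lp" }
  },
  "json": {
    "asctime": "2023-11-14 10:02:31,412",
    "name": "handler",
    "levelname": "INFO",
    "correlation_id": "11111111-2222-3333-4444-555555555555",
    "message": "Processing Kanban event."
  }
}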
The result was transformative. When an alert fired for a failed deployment, the process was no longer a frantic search. We’d grab the card_id
from the alert, find the initial SQS message for it in our logs, and extract the correlation_id
. A single query in Kibana, json.correlation_id: "the-uuid-we-found"
, would then instantly paint the entire picture. We could see the event arriving at the API Gateway, the SQS message details, the FaaS function spinning up, its log messages showing the payload it processed, the exact GraphQL mutation it sent, and finally, the error it received. What once took hours now took seconds.
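Kibana was the day-to-day interface, but nothing stops you from pulling the same trace programmatically. A sketch with the Elasticsearch Python client (8.x-style API; the host and read-only credentials are assumptions, and the index pattern matches the Filebeat output above):

# trace_lookup.py - illustrative script using the elasticsearch 8.x Python client
from elasticsearch import Elasticsearch

es = Elasticsearch(
    "https://elasticsearch-host.example.com:9200",  # same host the Filebeat output targets
    basic_auth=("trace_reader", "change-me"),       # hypothetical read-only credentials
)

resp = es.search(
    index="filebeat-*",
    query={"match": {"json.correlation_id": "the-uuid-we-found"}},
    sort=[{"@timestamp": {"order": "asc"}}],
    size=500,
)

# Print the whole trace as one chronological story.
for hit in resp["hits"]["hits"]:
    src = hit["_source"]
    entry = src.get("json", {})
    print(src.get("@timestamp"), entry.get("levelname"), entry.get("message"))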
This system isn’t without its limitations. Its effectiveness hinges on the discipline of every developer to propagate the context. If a new downstream service is added and its client doesn’t pass the X-Correlation-ID
header, the trace is broken. This points towards a future iteration where context propagation is handled more automatically, perhaps by a service mesh or by adopting a standardized framework like OpenTelemetry, which treats traces as a primary concern rather than something bolted onto logging. Furthermore, this only traces the “happy path” and expected errors within our own systems. We still lack deep visibility into the managed services themselves, like the internals of SQS or API Gateway, but for debugging our own application logic, the leap forward was immense.