An Immutable Infrastructure Approach to Polyglot Kafka Stream Processing


A data corruption bug that only manifests in the production environment is one of the most challenging problems in distributed systems. We recently spent nearly a week chasing one. Our system, a financial transaction processing pipeline, involved a core stateful stream processor written in Scala using Kafka Streams and a lightweight Node.js/TypeScript-based service for auditing and metrics. Both interacted with the same Kafka topics. Intermittently, under high load, the auditor would flag transactions as invalid, while the Scala service had processed them correctly. The logs were clean, and reproducing the issue in staging was impossible.

The root cause was maddeningly subtle: a discrepancy in the Glibc version between the production Amazon Linux 2 AMI and the Debian-based developer Docker images. This caused a native dependency in one of the Node.js libraries to behave differently in its floating-point arithmetic under specific edge cases. The application code was identical, but the environment it ran in was not. This incident solidified our decision to eradicate environment drift entirely by adopting a strict immutable infrastructure model. Instead of provisioning instances and then configuring them, we would bake a single, versioned “golden” Amazon Machine Image (AMI) with every dependency—from the OS up to our application binaries—and deploy this identical artifact across all environments.

Our chosen tool for this was HashiCorp Packer. It allows us to define our image as code, version it in Git, and integrate it into our CI/CD pipeline. The core idea is to shift the complexity of environment setup from runtime (via configuration management tools like Ansible on a live server) to build time. The resulting AMI is an immutable, self-contained unit of deployment.

The architecture we are building looks like this:

graph TD
    subgraph "CI/CD Pipeline"
        A[Git Commit] --> B{Build Artifacts};
        B -- Scala JAR --> C[Packer Build];
        B -- TS Bundle --> C;
        C -- "Creates Golden AMI" --> D[AMI Registry];
    end

    subgraph "AWS Environment (Dev/Staging/Prod)"
        D --> E{Auto Scaling Group 1};
        E -- "Launches EC2 from AMI" --> F1[EC2: Scala Processor];
        F1 --> G[Apache Kafka];
        F1 --> H[Output Topic];
        I[Input Topic] --> G --> F1;

        D --> J{Auto Scaling Group 2};
        J -- "Launches EC2 from AMI" --> K1[EC2: TypeScript Auditor];
        G --> K1;
        H --> K1;
    end

    style F1 fill:#dff,stroke:#333,stroke-width:2px
    style K1 fill:#fdf,stroke:#333,stroke-width:2px

This diagram shows how both the Scala and TypeScript services are launched from the exact same AMI, ensuring absolute environmental consistency. The only difference at runtime will be the command used to start the respective service.

The Packer Foundation: A Universal Runtime Image

The first step is to create a Packer template that builds an AMI containing everything needed for both our Scala and TypeScript applications. This includes the operating system, the Amazon Corretto JDK for the Scala service, the Node.js runtime for the TypeScript service, and any other shared system-level dependencies. In a real-world project, you’d pull application artifacts from a repository like Artifactory or S3; for this example, we will copy them from the local machine where Packer runs.

Here is the complete Packer build configuration file, stream-processor.pkr.hcl.

// stream-processor.pkr.hcl
packer {
  required_plugins {
    amazon = {
      version = ">= 1.2.1"
      source  = "github.com/hashicorp/amazon"
    }
  }
}

variable "aws_region" {
  type    = string
  default = "us-east-1"
}

variable "app_version" {
  type    = string
  default = "1.0.0"
}

// Define the source AMI to build upon. We start with a standard Amazon Linux 2 image.
source "amazon-ebs" "stream-processor-base" {
  ami_name      = "polyglot-stream-processor-${var.app_version}-${timestamp()}"
  instance_type = "t3.medium"
  region        = var.aws_region
  source_ami_filter {
    filters = {
      name                = "amzn2-ami-hvm-*-x86_64-gp2"
      root-device-type    = "ebs"
      virtualization-type = "hvm"
    }
    most_recent = true
    owners      = ["amazon"]
  }
  ssh_username = "ec2-user"
  tags = {
    Name    = "Polyglot Stream Processor"
    Version = var.app_version
    Source  = "Packer"
  }
}

// The 'build' block orchestrates the image creation process.
build {
  name    = "polyglot-stream-processor"
  sources = ["source.amazon-ebs.stream-processor-base"]

  // Provisioners are the heart of the build. They install and configure software inside the temporary EC2 instance.
  provisioner "shell" {
    inline = [
      "sudo yum update -y",
      "sudo yum install -y java-11-amazon-corretto-devel", // Install Corretto 11
      "curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.1/install.sh | bash", // Install NVM for Node.js
      ". ~/.nvm/nvm.sh && nvm install 18 && nvm alias default 18" // Install and set default Node.js version
    ]
  }

  // Create directories for our applications
  provisioner "shell" {
    inline = [
      "sudo mkdir -p /app/scala-processor",
      "sudo chown -R ec2-user:ec2-user /app",
      "sudo mkdir -p /app/ts-auditor",
      "sudo chown -R ec2-user:ec2-user /app"
    ]
  }

  // A common mistake is not specifying the destination path correctly.
  // Packer runs as a temporary user, but our app needs a permanent home.
  // We'll upload our pre-built artifacts to these directories.
  // In a real CI/CD pipeline, these would be downloaded from an artifact repository.
  provisioner "file" {
    source      = "../scala-processor/target/scala-2.13/scala-processor-assembly-0.1.0-SNAPSHOT.jar"
    destination = "/tmp/scala-processor.jar"
  }

  provisioner "file" {
    source      = "../ts-auditor/dist/"
    destination = "/tmp/ts-auditor-dist"
  }
  
  provisioner "file" {
    source      = "../ts-auditor/node_modules/"
    destination = "/tmp/ts-auditor-node_modules"
  }

  provisioner "file" {
    source      = "../ts-auditor/package.json"
    destination = "/tmp/package.json"
  }

  // Move the artifacts from the temporary location to the final application directory.
  provisioner "shell" {
    inline = [
      "sudo mv /tmp/scala-processor.jar /app/scala-processor/app.jar",
      "sudo mv /tmp/ts-auditor-dist/* /app/ts-auditor/",
      "sudo mv /tmp/ts-auditor-node_modules /app/ts-auditor/",
      "sudo mv /tmp/package.json /app/ts-auditor/"
    ]
  }
}

This Packer configuration defines a repeatable process. It starts a base Amazon Linux 2 instance, installs a specific JDK and Node.js version, creates application directories, and copies our pre-compiled application artifacts into the image. The pitfall here is managing permissions and temporary file locations; it’s critical to ensure the final artifacts land in the correct place with the right ownership. Running packer build . will now produce a single, versioned AMI that serves as a consistent foundation for all our deployments.

The Scala Core: Stateful Fraud Detection

The Scala service is the heart of our pipeline. It uses Kafka Streams to perform stateful analysis of financial transactions to calculate a real-time fraud score. We’re looking for patterns like a high number of transactions from the same card in a short time window.

First, the build.sbt dependencies:

// build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.10"

lazy val root = (project in file("."))
  .settings(
    name := "scala-processor",
    libraryDependencies ++= Seq(
      "org.apache.kafka" %% "kafka-streams-scala" % "3.4.0",
      "org.slf4j" % "slf4j-api" % "2.0.7",
      "ch.qos.logback" % "logback-classic" % "1.4.7",
      "io.circe" %% "circe-core" % "0.14.5",
      "io.circe" %% "circe-generic" % "0.14.5",
      "io.circe" %% "circe-parser" % "0.14.5"
    ),
    assembly / assemblyJarName := s"${name.value}-assembly-${version.value}.jar"
  )

Now, the core application logic. We will define case classes for our data model, create custom Serdes (Serializer/Deserializer) using Circe for JSON handling, and implement the stream processing topology.

// src/main/scala/com/example/processor/FraudDetector.scala
package com.example.processor

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
import org.apache.kafka.common.serialization.{Serde, Serdes => JSerdes}

import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._

import java.time.Duration
import java.util.Properties
import java.nio.charset.StandardCharsets

object JsonSerde {
  // A generic JSON Serde for any case class with an implicit Circe encoder/decoder
  def apply[T >: Null : Encoder : Decoder]: Serde[T] = {
    val serializer = (data: T) => data.asJson.noSpaces.getBytes(StandardCharsets.UTF_8)
    val deserializer = (bytes: Array[Byte]) => {
      val string = new String(bytes, StandardCharsets.UTF_8)
      decode[T](string).toOption
    }
    JSerdes.fromFn(serializer, deserializer)
  }
}

// --- Data Models ---
case class Transaction(transactionId: String, cardId: String, amount: Double, timestamp: Long)
case class EnrichedTransaction(transactionId: String, cardId: String, amount: Double, isFlagged: Boolean, reason: String)

object FraudDetector {

  val INPUT_TOPIC = "financial-transactions"
  val OUTPUT_TOPIC = "enriched-transactions"
  val BOOTSTRAP_SERVERS = sys.env.getOrElse("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-detector-processor")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    // For production, this should be higher for resilience
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1) 
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)

    val topology = createTopology()
    val streams = new KafkaStreams(topology, props)

    // A critical piece for production-grade apps: handle uncaught exceptions.
    streams.setUncaughtExceptionHandler { (thread: Thread, throwable: Throwable) =>
      println(s"FATAL: Uncaught exception in stream thread ${thread.getName}. Shutting down. Error: ${throwable.getMessage}")
      // This should trigger an alert and a graceful shutdown/restart policy (e.g., via systemd or a container orchestrator)
      System.exit(1)
    }

    // Add a shutdown hook for graceful exit
    sys.addShutdownHook {
      println("Shutdown hook called. Closing Kafka Streams.")
      streams.close(Duration.ofSeconds(10))
    }

    try {
      streams.start()
      println("Fraud Detector stream processor started.")
    } catch {
      case e: Throwable =>
        println(s"Error starting streams: ${e.getMessage}")
        System.exit(1)
    }
  }

  def createTopology(): Topology = {
    implicit val transactionSerde: Serde[Transaction] = JsonSerde[Transaction]
    implicit val enrichedTransactionSerde: Serde[EnrichedTransaction] = JsonSerde[EnrichedTransaction]

    val builder = new StreamsBuilder()

    val transactionStream = builder.stream[String, Transaction](INPUT_TOPIC)

    // The core logic: flag transactions if a card is used more than 3 times in a 1-minute window.
    // This is a stateful operation. Kafka Streams manages the state store internally.
    val flaggedStream = transactionStream
      .groupBy((_, transaction) => transaction.cardId) // Group by credit card ID
      .windowedBy(org.apache.kafka.streams.scala.kstream.TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
      .count() // Count occurrences within the window
      .toStream
      .filter((_, count) => count > 3)
      .map { (windowedCardId, count) =>
        // When a card is flagged, we create a simple key-value pair to join against the original stream.
        // A more complex implementation might use a KTable for this state.
        (windowedCardId.key(), s"Flagged: $count transactions in 1 minute")
      }

    // Now, we join the original stream with our stream of flagged card IDs.
    // This is a stream-stream join with a time window to bound the state required.
    transactionStream
      .join(flaggedStream)(
        (transaction, reason) => EnrichedTransaction(transaction.transactionId, transaction.cardId, transaction.amount, isFlagged = true, reason),
        org.apache.kafka.streams.scala.kstream.JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30))
      )
      .to(OUTPUT_TOPIC)

    builder.build()
  }
}

This application demonstrates a non-trivial, stateful stream processing job. Key production considerations are included:

  1. Configuration via Environment Variables: KAFKA_BOOTSTRAP_SERVERS is read from the environment, which is standard practice for containerized/cloud deployments.
  2. Exactly-Once Semantics: EXACTLY_ONCE_V2 is enabled for data integrity, a critical requirement for financial systems.
  3. Error Handling: A robust uncaught exception handler and a shutdown hook are implemented to ensure the application behaves predictably on failure and can shut down gracefully.
  4. Custom Serdes: We use Circe for reliable JSON serialization, encapsulated in a generic Serde implementation.

The TypeScript Auditor: A Lightweight Polyglot Consumer

The auditor service is built with Node.js and TypeScript. Its purpose is to consume from both the input and output topics to perform validation or generate metrics. It’s much lighter than the Scala service and represents a different kind of component in our polyglot system. We use the popular kafkajs library.

First, package.json:

{
  "name": "ts-auditor",
  "version": "1.0.0",
  "description": "TypeScript Kafka Auditor Service",
  "main": "dist/index.js",
  "scripts": {
    "build": "tsc",
    "start": "node dist/index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4"
  },
  "devDependencies": {
    "@types/node": "^18.15.11",
    "typescript": "^5.0.4"
  }
}

The application logic establishes a connection to Kafka and runs two consumers in the same consumer group.

// src/index.ts
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';

// --- Configuration ---
const KAFKA_BROKERS = process.env.KAFKA_BOOTSTRAP_SERVERS?.split(',') || ['localhost:9092'];
const INPUT_TOPIC = 'financial-transactions';
const OUTPUT_TOPIC = 'enriched-transactions';
const GROUP_ID = 'ts-auditor-group';

// In a real-world project, interfaces would be shared in a common library/schema registry
interface Transaction {
    transactionId: string;
    cardId: string;
    amount: number;
    timestamp: number;
}

interface EnrichedTransaction {
    transactionId: string;
    cardId: string;
    amount: number;
    isFlagged: boolean;
    reason: string;
}

class AuditorService {
    private kafka: Kafka;
    private consumer: Consumer;

    constructor() {
        this.kafka = new Kafka({
            clientId: 'ts-auditor',
            brokers: KAFKA_BROKERS,
            // Production config would include SSL, SASL, and retry mechanisms
            retry: {
                initialRetryTime: 300,
                retries: 10
            }
        });

        this.consumer = this.kafka.consumer({ groupId: GROUP_ID });
    }

    public async start(): Promise<void> {
        try {
            await this.consumer.connect();
            console.log('Auditor service connected to Kafka.');

            await this.consumer.subscribe({ topics: [INPUT_TOPIC, OUTPUT_TOPIC], fromBeginning: true });
            console.log(`Subscribed to topics: ${INPUT_TOPIC}, ${OUTPUT_TOPIC}`);

            await this.consumer.run({
                eachMessage: this.handleMessage.bind(this),
            });
        } catch (error) {
            console.error('Failed to start auditor service:', error);
            // Implement a restart strategy or exit
            process.exit(1);
        }
    }

    private async handleMessage({ topic, partition, message }: EachMessagePayload): Promise<void> {
        if (!message.value) {
            console.warn(`Received null message on topic ${topic}`);
            return;
        }

        const messageValue = message.value.toString();

        try {
            if (topic === INPUT_TOPIC) {
                const transaction = JSON.parse(messageValue) as Transaction;
                // Simple audit logic: log high-value transactions for review
                if (transaction.amount > 10000) {
                    console.log(`[AUDIT-INPUT] High-value transaction detected: ${transaction.transactionId}, Amount: ${transaction.amount}`);
                }
            } else if (topic === OUTPUT_TOPIC) {
                const enriched = JSON.parse(messageValue) as EnrichedTransaction;
                // Alert on any flagged transaction
                if (enriched.isFlagged) {
                    console.log(`[AUDIT-OUTPUT] FRAUD FLAGGED: Transaction ${enriched.transactionId} for card ${enriched.cardId}. Reason: ${enriched.reason}`);
                }
            }
        } catch (error) {
            console.error(`Error processing message from topic ${topic}:`, error);
            // A common pitfall is to let deserialization errors crash the consumer.
            // Here we just log it, but a production system should move it to a dead-letter queue.
        }
    }

    public async shutdown(): Promise<void> {
        console.log('Shutting down auditor service...');
        await this.consumer.disconnect();
    }
}

const service = new AuditorService();
service.start();

// Graceful shutdown handling
const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];

errorTypes.forEach(type => {
  process.on(type, async e => {
    try {
      console.log(`process.on ${type}`);
      console.error(e);
      await service.shutdown();
      process.exit(1);
    } catch (_) {
      process.exit(1);
    }
  })
});

signalTraps.forEach(type => {
  process.once(type, async () => {
    try {
      await service.shutdown();
    } finally {
      process.kill(process.pid, type);
    }
  })
});

This TypeScript service also incorporates production-readiness patterns:

  1. Robust Connection and Retry: kafkajs is configured with a retry mechanism for broker connections.
  2. Graceful Shutdown: It properly handles POSIX signals (SIGINT, SIGTERM) to ensure the consumer can commit its offsets and disconnect cleanly.
  3. Error Isolation: Deserialization errors in the message handler are caught to prevent the entire consumer process from crashing.

By baking both this service and the Scala processor into the same Packer-built AMI, we guarantee that the Node.js runtime, its native dependencies, and all system libraries are identical, completely eliminating the class of bug that initiated this entire effort.

This immutable infrastructure approach is not without its trade-offs. The primary drawback is that the feedback loop during development can be slower. Creating a new AMI can take several minutes, compared to seconds for deploying a JAR or a container image. Furthermore, the resulting AMIs can be larger, and managing a registry of versioned AMIs requires disciplined lifecycle policies to clean up old images. For our use case, however, the guarantee of absolute environmental consistency and the resulting stability in our data processing pipeline far outweigh these costs. The system no longer suffers from “ghost” bugs, and our operational confidence has increased dramatically. Future work could involve optimizing the Packer build time with caching or exploring containerization on top of a base AMI for faster artifact deployment, but the core principle of an immutable, versioned base has proven its value.


  TOC