Implementing a Verifiable Delta Lake Commit Hook with Vault's Transit Engine and AWS SNS


The central problem was decoupling and security. Our core data platform revolves around a set of critical Delta Lake tables, updated by dozens of disparate Spark jobs. Downstream consumers—ranging from cache invalidation services to machine learning model retraining pipelines and regulatory audit loggers—needed to react to these changes. The initial approach of having consumers poll the Delta transaction log (_delta_log) was brittle and inefficient. It placed a significant load on our storage layer and created a tight coupling between the data producers and consumers.

A direct push model, where the Spark job itself would notify downstream systems, seemed like a logical next step. However, this introduced a severe security challenge. Giving every Spark job credentials to access various downstream systems (message queues, APIs, etc.) was a non-starter. Our security policy mandates zero long-lived credentials within the compute environment. Ephemeral instance profiles were a partial solution but lacked the granular control required, especially in a multi-tenant cluster where jobs with different privilege levels run side-by-side. We needed a mechanism that was secure, auditable, and truly decoupled.

The concept that emerged was a transactional outbox pattern, implemented as a CommitHook within Delta Lake itself. The hook would fire after a transaction successfully committed to the Delta log. Its sole responsibility would be to broadcast a notification of the commit. But how could we do this without embedding credentials? And how could consumers trust that a notification was legitimate and not spoofed? This led us to a tripartite architecture: Delta Lake for the transactional context, AWS SNS for scalable, asynchronous message broadcasting, and HashiCorp Vault as the linchpin for security and trust. Vault’s Transit Secrets Engine became the core of the solution, allowing us to sign commit metadata without the Spark job ever touching a private key.

The Architectural Blueprint: Trust Through Delegation

Before diving into the implementation, it’s crucial to understand the flow of control and trust. The goal is to have a Spark job, which is considered untrusted from a credential-management perspective, trigger a verifiable notification.

sequenceDiagram
    participant Spark Job
    participant Delta Lake
    participant Custom CommitHook
    participant HashiCorp Vault
    participant AWS SNS
    participant Downstream Consumer

    Spark Job ->> Delta Lake: Commit Transaction (df.write.save())
    Delta Lake ->> Custom CommitHook: invoke(commitInfo)
    Custom CommitHook ->> HashiCorp Vault: 1. Authenticate (AppRole)
    Note over Custom CommitHook, HashiCorp Vault: Receives Vault Token
    Custom CommitHook ->> HashiCorp Vault: 2. Sign commitInfo JSON (Transit Engine)
    HashiCorp Vault -->> Custom CommitHook: Returns Signature
    Custom CommitHook ->> HashiCorp Vault: 3. Request temporary AWS Creds (AWS Engine)
    HashiCorp Vault -->> Custom CommitHook: Returns temp AccessKey, SecretKey, SessionToken
    Custom CommitHook ->> AWS SNS: 4. Publish (signed commitInfo) using temp creds
    AWS SNS -->> Custom CommitHook: Acknowledge
    Custom CommitHook -->> Delta Lake: Return (Success)
    Delta Lake -->> Spark Job: Transaction Complete

    Note right of AWS SNS: Message delivered to SQS queue
    Downstream Consumer ->> AWS SNS: Polls message from SQS
    Downstream Consumer ->> HashiCorp Vault: 5. Verify Signature (Transit Engine)
    HashiCorp Vault -->> Downstream Consumer: Verification Result (valid/invalid)
    alt Signature is Valid
        Downstream Consumer ->> Downstream Consumer: Process Event
    else Signature is Invalid
        Downstream Consumer ->> Downstream Consumer: Discard/Alert
    end

This architecture achieves our goals:

  1. Zero Credentials in Spark: The Spark job only needs a Vault AppRole ID and Secret ID to authenticate. It never sees AWS credentials or signing keys.
  2. Verifiability: The downstream consumer can cryptographically verify that the notification originated from an authorized process (one that could talk to Vault) and that the payload hasn’t been tampered with.
  3. Decoupling: The Spark job knows nothing about the consumers. It just fires a notification into the void via SNS. We can add or remove consumers without any changes to the data-producing jobs.

Phase 1: Configuring the Infrastructure with Terraform

The foundation of this system is correctly configured infrastructure. Using Terraform ensures it’s repeatable and auditable.

HashiCorp Vault Configuration

First, we enable the Vault Transit and AWS secrets engines and configure the necessary roles and policies.

# main.tf

# Enable the Transit Secrets Engine for cryptographic operations
resource "vault_mount" "transit" {
  path = "transit"
  type = "transit"
  description = "Handles cryptographic signing of Delta commit metadata"
}

# Create a key specifically for signing Delta commits
resource "vault_transit_secret_backend_key" "delta_commit_key" {
  backend = vault_mount.transit.path
  name    = "delta-commit-signer"
  type    = "rsa-2048" # A suitable signing algorithm
}

# Enable the AWS Secrets Engine to generate temporary credentials
resource "vault_mount" "aws" {
  path = "aws"
  type = "aws"
  description = "Generates dynamic AWS credentials for Spark jobs"
}

# Configure Vault's access to AWS. Static credentials are supplied via variables here;
# these belong to Vault itself and never reach the Spark jobs.
resource "vault_aws_secret_backend" "config" {
  backend     = vault_mount.aws.path
  access_key  = var.aws_access_key # Vault's own AWS credentials
  secret_key  = var.aws_secret_key
  region      = var.aws_region
}

# Define an IAM policy in JSON that will be attached to the temporary credentials.
# This policy grants the minimum required permission: publishing to a specific SNS topic.
data "aws_iam_policy_document" "sns_publish_policy" {
  statement {
    effect    = "Allow"
    actions   = ["sns:Publish"]
    resources = [var.sns_topic_arn]
  }
}

# Create a role in the Vault AWS secrets engine. When Spark requests credentials
# for this role, Vault calls STS and scopes the temporary credentials with the policy above.
resource "vault_aws_secret_backend_role" "sns_publisher_role" {
  backend         = vault_mount.aws.path
  name            = "delta-sns-publisher"
  credential_type = "federation_token" # issued via aws/sts/..., includes a session token
  policy_document = data.aws_iam_policy_document.sns_publish_policy.json
  default_sts_ttl = 300 # Credentials are valid for only 5 minutes (value in seconds)
  max_sts_ttl     = 600
}

# --- AppRole Authentication for Spark ---

resource "vault_auth_backend" "approle" {
  type = "approle"
}

# Create a Vault policy that grants access to the transit sign and AWS creds endpoints
resource "vault_policy" "spark_commit_hook_policy" {
  name = "spark-commit-hook-policy"

  policy = <<EOT
# Allow signing operations using the specific key
path "transit/sign/delta-commit-signer" {
  capabilities = ["update"]
}

# Allow reading the public part of the key for verification purposes (useful for consumers)
path "transit/keys/delta-commit-signer" {
  capabilities = ["read"]
}

# Allow retrieving temporary AWS credentials from the specific role
path "aws/sts/delta-sns-publisher" {
  capabilities = ["read"]
}
EOT
}

# Create an AppRole for Spark jobs to use for authentication
resource "vault_approle_auth_backend_role" "spark_approle" {
  backend        = vault_auth_backend.approle.path
  role_name      = "spark-job-role"
  token_policies = [vault_policy.spark_commit_hook_policy.name]
  token_ttl      = "20m"
  token_max_ttl  = "30m"
}

# Output the RoleID and generate a SecretID for the application
resource "vault_approle_auth_backend_role_secret_id" "id" {
  backend   = vault_auth_backend.approle.path
  role_name = vault_approle_auth_backend_role.spark_approle.role_name
}

output "spark_approle_role_id" {
  value = vault_approle_auth_backend_role.spark_approle.role_id
}

output "spark_approle_secret_id" {
  value     = vault_approle_auth_backend_role_secret_id.id.secret_id
  sensitive = true
}

A critical point here is the principle of least privilege. The Vault policy spark-commit-hook-policy only grants the permissions to sign with one specific key and generate credentials from one specific AWS role. The IAM policy attached to that AWS role only allows publishing to one specific SNS topic.
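
Before wiring anything into Spark, it can be worth exercising exactly these two permissions with the AppRole that Terraform produced. The following is a minimal sketch (not part of the pipeline) using the hvac client; the environment variable names are assumptions, while the mount paths and role names match the Terraform above.

# policy_smoke_test.py -- illustrative sketch, not part of the production pipeline
import base64
import os

import hvac

client = hvac.Client(url=os.environ["VAULT_ADDR"])

# 1. Authenticate with the AppRole created by Terraform
client.auth.approle.login(
    role_id=os.environ["SPARK_APPROLE_ROLE_ID"],
    secret_id=os.environ["SPARK_APPROLE_SECRET_ID"],
)

# 2. Sign a dummy payload with the delta-commit-signer key (allowed by the policy)
dummy = base64.b64encode(b'{"hello": "delta"}').decode()
signed = client.secrets.transit.sign_data(name="delta-commit-signer", hash_input=dummy)
print("signature:", signed["data"]["signature"])  # e.g. vault:v1:...

# 3. Fetch short-lived AWS credentials from the delta-sns-publisher role (allowed)
creds = client.read("aws/sts/delta-sns-publisher")
print("access_key:", creds["data"]["access_key"])  # secret_key and security_token are also returned

# Any other path -- signing with a different key, reading another AWS role --
# should fail with a 403 "permission denied".

If the last two calls succeed and anything else returns a permission error, the policy is scoped the way the paragraph above describes.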

AWS SNS Topic Configuration

The AWS side is simpler. We just need an SNS topic.

# aws.tf

variable "sns_topic_name" {
  description = "Name of the SNS topic for Delta commit notifications"
  type        = string
  default     = "delta-commit-notifications"
}

resource "aws_sns_topic" "delta_commits" {
  name = var.sns_topic_name
}

# For testing, we can create an SQS queue and subscribe it to the topic
resource "aws_sqs_queue" "commit_consumer_queue" {
  name = "delta-commit-consumer-queue"
}

resource "aws_sns_topic_subscription" "queue_subscription" {
  topic_arn = aws_sns_topic.delta_commits.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.commit_consumer_queue.arn
}

# The queue needs a policy to allow SNS to send messages to it.
data "aws_iam_policy_document" "sqs_policy" {
  statement {
    effect  = "Allow"
    actions = ["sqs:SendMessage"]
    resources = [aws_sqs_queue.commit_consumer_queue.arn]
    principals {
      type        = "Service"
      identifiers = ["sns.amazonaws.com"]
    }
    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"
      values   = [aws_sns_topic.delta_commits.arn]
    }
  }
}

resource "aws_sqs_queue_policy" "default" {
  queue_url = aws_sqs_queue.commit_consumer_queue.id
  policy    = data.aws_iam_policy_document.sqs_policy.json
}

output "sns_topic_arn" {
    value = aws_sns_topic.delta_commits.arn
}

With the infrastructure defined, we can now build the application logic that runs inside Spark.

Phase 2: The Scala Commit Hook Implementation

This is the heart of the solution. We’ll create a Scala class that implements Delta Lake’s io.delta.hooks.CommitHook trait. This requires packaging it into a JAR and making it available on the Spark classpath.

Let’s break down the build.sbt dependencies first.

// build.sbt
name := "delta-secure-commithook"
version := "0.1.0"
scalaVersion := "2.12.15" // Match your Spark version's Scala version

libraryDependencies ++= Seq(
  "io.delta" %% "delta-core" % "2.4.0" % "provided", // Delta Core is provided by Spark
  "org.apache.spark" %% "spark-core" % "3.4.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.4.1" % "provided",
  
  // Vault Java Client for communication with HashiCorp Vault
  "com.bettercloud" % "vault-java-driver" % "5.1.0",

  // AWS SDK v2 for SNS and STS. Plain sbt has no Maven-style BOM import,
  // so the versions are pinned explicitly.
  "software.amazon.awssdk" % "sns" % "2.20.28",
  "software.amazon.awssdk" % "sts" % "2.20.28",

  // JSON library for creating payloads
  "com.lihaoyi" %% "upickle" % "3.1.0",
  
  // Logging framework
  "org.slf4j" % "slf4j-api" % "1.7.32",
  "ch.qos.logback" % "logback-classic" % "1.2.9" % "runtime"
)

Now, for the core implementation of the hook. A real-world project would use a proper dependency injection framework, but for clarity, we’ll instantiate clients directly. Error handling is paramount here; a failure in the hook should not crash the Spark job, but it must be logged aggressively.

// src/main/scala/com/example/delta/hooks/VaultSnsCommitHook.scala
package com.example.delta.hooks

import com.bettercloud.vault.Vault
import com.bettercloud.vault.api.Auth
import com.bettercloud.vault.VaultConfig
import com.bettercloud.vault.response.LogicalResponse
import io.delta.hooks.CommitHook
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.CommitInfo
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsSessionCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.PublishRequest

import java.util.Base64
import scala.util.{Failure, Success, Try}

case class CommitNotification(
  tableName: String,
  tableVersion: Long,
  operation: String,
  operationParameters: Map[String, String],
  commitTimestamp: Long,
  operationMetrics: Map[String, String],
  isBlindAppend: Option[Boolean]
)

// Define implicit readers and writers for JSON serialization
object CommitNotification {
  implicit val rw: upickle.default.ReadWriter[CommitNotification] = upickle.default.macroRW
}

case class SignedCommitNotification(
  payload: String, // Base64 encoded CommitNotification JSON
  signature: String // Vault signature
)

object SignedCommitNotification {
  implicit val rw: upickle.default.ReadWriter[SignedCommitNotification] = upickle.default.macroRW
}


class VaultSnsCommitHook extends CommitHook {
  
  private val logger = LoggerFactory.getLogger(classOf[VaultSnsCommitHook])

  @transient private var spark: SparkSession = _
  @transient private var vaultClient: Vault = _

  // All configuration is read from SparkConf for maximum flexibility and to avoid
  // hardcoding values. These lazy vals are only forced inside execute(), after the
  // active SparkSession has been assigned to `spark`.
  private lazy val vaultAddr = spark.conf.get("spark.delta.commitHook.vault.addr")
  private lazy val vaultRoleId = spark.conf.get("spark.delta.commitHook.vault.approle.roleId")
  private lazy val vaultSecretId = spark.conf.get("spark.delta.commitHook.vault.approle.secretId")
  private lazy val vaultTransitKey = spark.conf.get("spark.delta.commitHook.vault.transitKeyName")
  private lazy val vaultAwsRole = spark.conf.get("spark.delta.commitHook.vault.awsRoleName")
  private lazy val awsRegion = spark.conf.get("spark.delta.commitHook.aws.region")
  private lazy val snsTopicArn = spark.conf.get("spark.delta.commitHook.aws.snsTopicArn")

  override def name(): String = "VaultSnsCommitHook"

  /**
   * This method is called once per transaction on the driver.
   * @param name The name of the hook specified in the configuration.
   * @param commitInfo Information about the commit.
   */
  override def execute(name: String, commitInfo: CommitInfo): Unit = {
    logger.info(s"VaultSnsCommitHook triggered for table version ${commitInfo.getVersion}")
    
    // Lazily initialize Spark session and Vault client on first execution
    if (spark == null) {
      spark = SparkSession.active
    }
    if (vaultClient == null) {
      initializeVaultClient()
    }

    Try {
      // Step 1: Construct the notification payload
      val payload = createPayload(commitInfo)
      val payloadJson = upickle.default.write(payload)
      val payloadBase64 = Base64.getEncoder.encodeToString(payloadJson.getBytes("UTF-8"))

      // Step 2: Sign the payload using Vault's Transit Engine
      val signature = signPayload(payloadBase64)
      logger.debug("Payload successfully signed by Vault Transit engine.")

      // Step 3: Create the final signed message
      val signedMessage = SignedCommitNotification(payloadBase64, signature)
      val messageBody = upickle.default.write(signedMessage)

      // Step 4: Get temporary AWS credentials from Vault
      val awsCredentials = getTemporaryAwsCredentials()
      logger.debug("Successfully retrieved temporary AWS credentials from Vault.")
      
      // Step 5: Publish to SNS using the temporary credentials
      publishToSns(messageBody, awsCredentials)
      logger.info(s"Successfully published signed notification for table version ${commitInfo.getVersion} to SNS topic ${snsTopicArn}")

    } match {
      case Success(_) => // All good
      case Failure(e) => 
        // A common mistake is to let exceptions from the hook propagate.
        // This could fail the entire Spark job. We must catch and log.
        logger.error("Failed to execute VaultSnsCommitHook. The Delta transaction has already committed, but the notification failed.", e)
        // In a production system, this error should be sent to a dead-letter queue or an alerting system.
    }
  }
  
  private def initializeVaultClient(): Unit = {
    logger.info(s"Initializing Vault client for address: $vaultAddr")
    val loginConfig = new VaultConfig()
      .address(vaultAddr)
      .build()

    // Authenticate using AppRole ("approle" is the default mount path for the auth method)
    val authResponse = new Vault(loginConfig).auth().loginByAppRole("approle", vaultRoleId, vaultSecretId)

    // Build the long-lived client with the token returned by the AppRole login
    val authedConfig = new VaultConfig()
      .address(vaultAddr)
      .token(authResponse.getAuthClientToken)
      .build()

    this.vaultClient = new Vault(authedConfig)
    logger.info("Vault client initialized and authenticated via AppRole.")
  }

  private def createPayload(commitInfo: CommitInfo): CommitNotification = {
    CommitNotification(
      tableName = commitInfo.getTableName.getOrElse("UNKNOWN"),
      tableVersion = commitInfo.getVersion.getOrElse(-1L),
      operation = commitInfo.getOperation,
      operationParameters = commitInfo.getOperationParameters.getOrElse(Map.empty),
      commitTimestamp = commitInfo.getTimestamp,
      operationMetrics = commitInfo.getOperationMetrics.getOrElse(Map.empty),
      isBlindAppend = commitInfo.getIsBlindAppend
    )
  }

  private def signPayload(payloadBase64: String): String = {
    // The Java driver expects a java.util.Map of request parameters
    val params = java.util.Collections.singletonMap[String, AnyRef]("input", payloadBase64)
    Try(vaultClient.logical().write(s"transit/sign/$vaultTransitKey", params))
      .map(_.getData.get("signature"))
      .filter(_ != null)
      .getOrElse(throw new RuntimeException("Failed to sign payload with Vault"))
  }

  private def getTemporaryAwsCredentials(): AwsSessionCredentials = {
    val response: LogicalResponse = Try(vaultClient.logical().read(s"aws/sts/$vaultAwsRole"))
      .getOrElse(throw new RuntimeException(s"Failed to get temporary AWS credentials for role $vaultAwsRole from Vault"))
    
    val accessKey = response.getData.get("access_key")
    val secretKey = response.getData.get("secret_key")
    val sessionToken = response.getData.get("security_token")

    if (accessKey == null || secretKey == null || sessionToken == null) {
      throw new RuntimeException("Vault response for AWS credentials did not contain all required keys.")
    }
    AwsSessionCredentials.create(accessKey, secretKey, sessionToken)
  }

  private def publishToSns(messageBody: String, creds: AwsSessionCredentials): Unit = {
    val snsClient: SnsClient = SnsClient.builder()
      .region(Region.of(awsRegion))
      .credentialsProvider(StaticCredentialsProvider.create(creds))
      .build()
      
    try {
      val request = PublishRequest.builder()
        .topicArn(snsTopicArn)
        .message(messageBody)
        .build()
      snsClient.publish(request)
    } finally {
      // It's crucial to close the client to release resources.
      snsClient.close()
    }
  }
}

Phase 3: Deployment and Execution

To use this hook, we need to package it into a JAR and configure our Spark job to use it.

  1. Package the JAR: Run sbt package in the project directory. This will create delta-secure-commithook_2.12-0.1.0.jar in the target/scala-2.12/ folder. Note that this thin JAR does not bundle the Vault driver, upickle, or the AWS SDK; either ship those alongside it (e.g. via additional --jars or --packages entries) or build a fat JAR with the sbt-assembly plugin.

  2. Configure the Spark Job: When submitting the Spark application, you need to provide the JAR and the necessary configuration properties.

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars /path/to/delta-secure-commithook_2.12-0.1.0.jar \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.databricks.delta.commitHook.enabled=true" \
  --conf "spark.databricks.delta.commitHook.vault.class=com.example.delta.hooks.VaultSnsCommitHook" \
  --conf "spark.delta.commitHook.vault.addr=https://vault.example.com" \
  --conf "spark.delta.commitHook.vault.approle.roleId=..." \
  --conf "spark.delta.commitHook.vault.approle.secretId=..." \
  --conf "spark.delta.commitHook.vault.transitKeyName=delta-commit-signer" \
  --conf "spark.delta.commitHook.vault.awsRoleName=delta-sns-publisher" \
  --conf "spark.delta.commitHook.aws.region=us-east-1" \
  --conf "spark.delta.commitHook.aws.snsTopicArn=arn:aws:sns:us-east-1:123456789012:delta-commit-notifications" \
  my_spark_application.py

Two common mistakes here are misspelling the class name and forgetting to enable the hook globally with spark.databricks.delta.commitHook.enabled=true. The spark.databricks.delta.commitHook.vault.class property tells Delta which hook implementation to load, with the vault segment of the key serving as the hook’s name.
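
For interactive development the same properties can be set when the session is constructed rather than on spark-submit. This is a rough PySpark sketch under the assumption that the hook JAR is already on the driver classpath; the property names mirror the submit command above.

# pyspark_session_config.py -- illustrative sketch; assumes the hook JAR is on the classpath
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-writer-with-commit-hook")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.commitHook.enabled", "true")
    .config("spark.databricks.delta.commitHook.vault.class", "com.example.delta.hooks.VaultSnsCommitHook")
    .config("spark.delta.commitHook.vault.addr", "https://vault.example.com")
    .config("spark.delta.commitHook.vault.transitKeyName", "delta-commit-signer")
    .config("spark.delta.commitHook.vault.awsRoleName", "delta-sns-publisher")
    .config("spark.delta.commitHook.aws.region", "us-east-1")
    .config("spark.delta.commitHook.aws.snsTopicArn", "arn:aws:sns:us-east-1:123456789012:delta-commit-notifications")
    .getOrCreate()
)
# The AppRole roleId/secretId should still be injected from a secret store or environment
# variables at launch time, never committed to source control.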

Phase 4: The Verifying Consumer

The final piece is the downstream consumer. This service subscribes to the SNS topic (likely via an SQS queue for durability) and processes the notifications. Its most important job is to verify the signature before acting on the message.
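
It helps to picture what actually arrives on the queue. The SQS body is the standard SNS envelope, whose Message field carries the SignedCommitNotification produced by the hook. The values in this sketch are constructed for illustration; only the shape matters.

# envelope_shape.py -- a minimal sketch of what lands on the SQS queue (values illustrative)
import base64
import json

# The inner payload is the CommitNotification JSON produced by the hook, base64-encoded
inner_payload = base64.b64encode(
    json.dumps({"tableName": "sales", "tableVersion": 42, "operation": "WRITE"}).encode()
).decode()

# The hook publishes this SignedCommitNotification as the SNS message body
signed_notification = {
    "payload": inner_payload,
    "signature": "vault:v1:MEUCIQDx...",  # opaque signature string returned by Transit
}

# SNS wraps it in its standard envelope before delivering to SQS
sqs_body = {
    "Type": "Notification",
    "TopicArn": "arn:aws:sns:us-east-1:123456789012:delta-commit-notifications",
    "Message": json.dumps(signed_notification),
}

# The consumer therefore unwraps twice: SQS body -> SNS Message -> base64 payload
print(json.loads(base64.b64decode(json.loads(sqs_body["Message"])["payload"])))

The consumer below, written in Python with the hvac library, performs exactly this unwrapping before handing the payload and signature to Vault for verification.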

# consumer.py
import boto3
import hvac
import json
import base64
import os
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration from environment variables ---
VAULT_ADDR = os.environ.get("VAULT_ADDR")
VAULT_TOKEN = os.environ.get("VAULT_TOKEN") # For simplicity; use AppRole in production
VAULT_TRANSIT_KEY_NAME = "delta-commit-signer"
SQS_QUEUE_URL = os.environ.get("SQS_QUEUE_URL")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")

class CommitVerifier:
    def __init__(self, vault_addr, vault_token):
        self.vault_client = hvac.Client(url=vault_addr, token=vault_token)
        if not self.vault_client.is_authenticated():
            raise Exception("Vault authentication failed.")
        logging.info("Successfully authenticated with Vault.")

    def verify_signature(self, payload_b64, signature):
        """
        Verifies the signature of the payload using Vault's Transit engine.
        """
        try:
            verify_response = self.vault_client.secrets.transit.verify_signed_data(
                name=VAULT_TRANSIT_KEY_NAME,
                hash_input=payload_b64,
                signature=signature,
            )
            return verify_response['data']['valid']
        except hvac.exceptions.InvalidRequest as e:
            logging.error(f"Vault verification request failed: {e}")
            return False
        except Exception as e:
            logging.error(f"An unexpected error occurred during verification: {e}")
            return False

def process_message(message_body, verifier):
    """
    Processes a single message from SQS.
    """
    try:
        signed_notification = json.loads(message_body)
        payload_b64 = signed_notification['payload']
        signature = signed_notification['signature']

        logging.info("Verifying signature for incoming message...")
        is_valid = verifier.verify_signature(payload_b64, signature)

        if is_valid:
            logging.info("Signature is VALID. Processing payload.")
            # The payload itself is base64 encoded JSON
            payload_json = base64.b64decode(payload_b64).decode('utf-8')
            commit_info = json.loads(payload_json)
            logging.info(f"Processed commit for table '{commit_info['tableName']}' version {commit_info['tableVersion']}")
            # --- ADD YOUR BUSINESS LOGIC HERE ---
        else:
            logging.warning("Signature is INVALID. Discarding message.")
            # This should trigger a high-priority alert.
            
    except (json.JSONDecodeError, KeyError) as e:
        logging.error(f"Failed to parse message body: {e}. Message: {message_body}")
    except Exception as e:
        logging.error(f"An error occurred processing message: {e}")

def main():
    sqs = boto3.client('sqs', region_name=AWS_REGION)
    verifier = CommitVerifier(VAULT_ADDR, VAULT_TOKEN)
    
    logging.info(f"Starting consumer, polling SQS queue: {SQS_QUEUE_URL}")
    while True:
        response = sqs.receive_message(
            QueueUrl=SQS_QUEUE_URL,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=20 # Use long polling
        )
        
        messages = response.get('Messages', [])
        if not messages:
            continue
            
        for message in messages:
            # The actual message from SNS is nested inside the SQS message body
            sns_message = json.loads(message['Body'])
            process_message(sns_message['Message'], verifier)
            
            # Delete the message from the queue after processing
            sqs.delete_message(
                QueueUrl=SQS_QUEUE_URL,
                ReceiptHandle=message['ReceiptHandle']
            )

if __name__ == "__main__":
    main()

This consumer completes the circle. It refuses to process any notification that cannot be cryptographically verified, ensuring the integrity of the entire event-driven pipeline.
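
A quick way to convince yourself of this, and to exercise the rejection path, is a local round trip against a dev-mode Vault: sign a payload the same way the hook does, verify it, then tamper with the payload and watch the verification flip. A minimal sketch, assuming VAULT_ADDR and VAULT_TOKEN point at a Vault that has the delta-commit-signer key:

# sign_verify_roundtrip.py -- minimal sketch against a dev Vault with the delta-commit-signer key
import base64
import json
import os

import hvac

client = hvac.Client(url=os.environ["VAULT_ADDR"], token=os.environ["VAULT_TOKEN"])

payload_b64 = base64.b64encode(
    json.dumps({"tableName": "sales", "tableVersion": 42}).encode()
).decode()

# Sign exactly as the commit hook does
signature = client.secrets.transit.sign_data(
    name="delta-commit-signer", hash_input=payload_b64
)["data"]["signature"]

# The genuine payload verifies...
ok = client.secrets.transit.verify_signed_data(
    name="delta-commit-signer", hash_input=payload_b64, signature=signature
)["data"]["valid"]

# ...a tampered payload does not
tampered_b64 = base64.b64encode(
    json.dumps({"tableName": "sales", "tableVersion": 43}).encode()
).decode()
bad = client.secrets.transit.verify_signed_data(
    name="delta-commit-signer", hash_input=tampered_b64, signature=signature
)["data"]["valid"]

print(f"genuine valid={ok}, tampered valid={bad}")  # expect True / False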

Limitations and Future Paths

While this architecture provides a robust solution for secure, asynchronous notifications, it’s not without its trade-offs. The primary drawback is the introduction of latency into the commit path of every Delta transaction. Each commit now incurs the overhead of at least three network round-trips: Vault authentication (if the token expires), payload signing, and fetching AWS credentials, followed by the call to SNS. For high-throughput streaming jobs with sub-second micro-batches, this added latency could be prohibitive.

Furthermore, the hook’s failure model is a critical consideration. If the hook fails after the Delta transaction has committed, the data is updated, but the notification is lost. The current implementation logs this failure, but this results in a state inconsistency between the data and the event stream. A more resilient design would implement the transactional outbox pattern more faithfully: the commit hook would write the notification payload to a separate, highly-available “outbox” Delta table within the same transaction. A separate, independent process would then tail this outbox table, send the notifications, and mark them as sent. This would fully decouple the commit’s success from the notification’s delivery, but at the cost of significantly increased complexity.
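
As a sketch of that direction, the relay process could look something like the following in PySpark. The outbox path, schema (sent, notification_id, payload_json), and the sign_and_publish helper are all hypothetical; the helper would reuse the same Vault Transit signing and SNS publishing steps the hook performs today.

# outbox_relay.py -- hypothetical sketch of an outbox-tailing relay; path, schema, and helper are invented
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("delta-commit-outbox-relay").getOrCreate()

OUTBOX_PATH = "s3://my-bucket/_outbox/delta_commit_notifications"  # hypothetical


def sign_and_publish(payload_json: str) -> None:
    # Placeholder: would contain the same Vault Transit signing + SNS publish logic
    # that the commit hook performs today.
    print(f"would sign and publish: {payload_json}")


outbox = DeltaTable.forPath(spark, OUTBOX_PATH)

# Pick up notifications that the producing transaction wrote but that have not been sent
pending = outbox.toDF().filter(~F.col("sent")).collect()

for row in pending:
    sign_and_publish(row["payload_json"])

    # Mark the row as sent. A crash between publish and update causes a re-send,
    # so downstream consumers must treat delivery as at-least-once and be idempotent.
    outbox.update(
        condition=F.col("notification_id") == F.lit(row["notification_id"]),
        set={"sent": F.lit(True)},
    )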

