Deploying a Verifiable Pandas Data-Analysis Agent with LangChain and Terraform


The core pain point was enabling non-technical stakeholders to query complex financial datasets without giving them direct database access or relying on a perpetually backlogged analytics team. Initial experiments with general-purpose LLM chatbots were a disaster. For a query like “What was the total revenue for the ‘Widgets’ category in Q2, excluding promotional discounts?”, the models would confidently hallucinate incorrect figures, misinterpret column names, or fail at the simple arithmetic. The problem is that LLMs are language guessers, not deterministic calculators. We needed a system that could leverage the natural language understanding of an LLM but delegate the actual computation to a reliable, verifiable tool.

Our concept was an agent-based system where the LLM’s role is not to answer the question directly, but to generate Python code that can answer the question. This generated code would be exclusively pandas operations executed against a pre-loaded DataFrame. This architecture provides the best of both worlds: a natural language interface and deterministic, auditable data processing. However, a real-world project demands more than a Jupyter Notebook proof-of-concept. The entire environment must be secure, reproducible, and manageable as a single unit, especially when dealing with sensitive financial CSVs.

This led to our technology selection. Terraform became non-negotiable for infrastructure. Manually configuring a VPC, subnets, security groups, S3 buckets, and IAM roles for each deployment is a recipe for configuration drift and security holes. With Terraform, we define the entire secure enclave in code, guaranteeing consistency and allowing us to spin up and tear down the environment on demand. LangChain provides the agentic framework, specifically the AgentExecutor loop, saving us from reinventing the core Thought -> Action -> Observation logic. And Pandas, of course, is the execution engine—the ground truth for our data manipulation. We explicitly decided against the default create_pandas_dataframe_agent as it offered too little control over code execution and error handling for a production setting.

The Infrastructure Foundation with Terraform

Before writing a single line of Python, we defined the sandboxed environment. The principle of least privilege was paramount. The EC2 instance running our agent should only be able to access what it absolutely needs and be sealed off from public traffic as much as possible.

Here’s the Terraform configuration that defines our secure environment. In a real-world project, this would be broken into modules, but for clarity, it’s presented here in a single main.tf file.

# main.tf

# --- Provider and Backend Configuration ---
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }

  # In a production setup, use a remote backend like S3 for state locking and team collaboration.
  # backend "s3" {
  #   bucket         = "our-terraform-state-bucket"
  #   key            = "financial-agent/terraform.tfstate"
  #   region         = "us-east-1"
  #   encrypt        = true
  #   dynamodb_table = "terraform-lock-table"
  # }
}

provider "aws" {
  region = "us-east-1"
}

# --- Networking ---
resource "aws_vpc" "agent_vpc" {
  cidr_block = "10.0.0.0/16"
  tags = {
    Name = "FinancialAgentVPC"
  }
}

resource "aws_subnet" "agent_subnet" {
  vpc_id     = aws_vpc.agent_vpc.id
  cidr_block = "10.0.1.0/24"
  tags = {
    Name = "FinancialAgentSubnet"
  }
}

resource "aws_internet_gateway" "gw" {
  vpc_id = aws_vpc.agent_vpc.id
  tags = {
    Name = "FinancialAgentIGW"
  }
}

resource "aws_route_table" "agent_rt" {
  vpc_id = aws_vpc.agent_vpc.id

  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = aws_internet_gateway.gw.id
  }

  tags = {
    Name = "FinancialAgentRouteTable"
  }
}

resource "aws_route_table_association" "a" {
  subnet_id      = aws_subnet.agent_subnet.id
  route_table_id = aws_route_table.agent_rt.id
}


# --- Security ---
resource "aws_security_group" "agent_sg" {
  name        = "financial-agent-sg"
  description = "Allow SSH and App traffic"
  vpc_id      = aws_vpc.agent_vpc.id

  # Ingress: Allow SSH only from a specific Bastion Host or trusted IP range.
  # A common mistake is leaving this open to 0.0.0.0/0.
  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = ["YOUR_TRUSTED_IP/32"] # IMPORTANT: Change this!
  }

  # Ingress: Allow traffic to our application port from a trusted source.
  ingress {
    from_port   = 8000
    to_port     = 8000
    protocol    = "tcp"
    cidr_blocks = ["YOUR_TRUSTED_IP/32"] # IMPORTANT: Change this!
  }

  # Egress: Allow all outbound traffic for package installation and API calls.
  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "FinancialAgentSG"
  }
}

# --- Data Storage (S3) ---
resource "aws_s3_bucket" "data_bucket" {
  bucket = "financial-agent-data-bucket-${random_id.bucket_suffix.hex}"
  tags = {
    Name = "FinancialAgentData"
  }
}

resource "aws_s3_bucket_acl" "data_bucket_acl" {
  bucket = aws_s3_bucket.data_bucket.id
  acl    = "private"
}

resource "random_id" "bucket_suffix" {
  byte_length = 8
}

# --- IAM Role for EC2 ---
resource "aws_iam_role" "ec2_role" {
  name = "FinancialAgentEC2Role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "ec2.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_policy" "s3_read_policy" {
  name        = "FinancialAgentS3ReadPolicy"
  description = "Allow read-only access to the specific data bucket"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ],
        Effect   = "Allow",
        Resource = [
          aws_s3_bucket.data_bucket.arn,
          "${aws_s3_bucket.data_bucket.arn}/*"
        ]
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "s3_attach" {
  role       = aws_iam_role.ec2_role.name
  policy_arn = aws_iam_policy.s3_read_policy.arn
}

resource "aws_iam_instance_profile" "ec2_profile" {
  name = "FinancialAgentEC2InstanceProfile"
  role = aws_iam_role.ec2_role.name
}


# --- Compute (EC2) ---
data "aws_ami" "ubuntu" {
  most_recent = true
  filter {
    name   = "name"
    values = ["ubuntu/images/hvm-ssd/ubuntu-focal-20.04-amd64-server-*"]
  }
  filter {
    name   = "virtualization-type"
    values = ["hvm"]
  }
  owners = ["099720109477"] # Canonical
}

resource "aws_instance" "agent_server" {
  ami           = data.aws_ami.ubuntu.id
  instance_type = "t2.micro" # Choose instance type based on dataset size
  subnet_id     = aws_subnet.agent_subnet.id
  vpc_security_group_ids = [aws_security_group.agent_sg.id]
  iam_instance_profile = aws_iam_instance_profile.ec2_profile.name
  associate_public_ip_address = true # For initial setup and access
  key_name = "your-key-pair-name" # IMPORTANT: Use your own key pair

  # User data script to bootstrap the instance
  user_data = <<-EOF
              #!/bin/bash
              apt-get update
              apt-get install -y python3-pip python3-venv git
              
              # Setup application directory and user
              mkdir -p /app
              cd /app
              
              # NOTE: In a real CI/CD pipeline, you would pull from a specific commit hash.
              # This example uses main for simplicity.
              git clone https://github.com/your-repo/financial-agent.git .
              
              python3 -m venv venv
              source venv/bin/activate
              pip install -r requirements.txt
              
              # Set environment variables from a secure source like SSM Parameter Store or Vault
              # For this example, we'll create a .env file.
              echo 'OPENAI_API_KEY="your_openai_api_key"' > .env
              echo 'DATA_BUCKET_NAME="${aws_s3_bucket.data_bucket.bucket}"' >> .env
              echo 'DATA_FILE_KEY="transactions.csv"' >> .env
              
              # Run the application
              # A production setup would use a process manager like systemd or gunicorn.
              # uvicorn main:app --host 0.0.0.0 --port 8000
              EOF

  tags = {
    Name = "FinancialAgentServer"
  }
}

# --- Outputs ---
output "instance_public_ip" {
  value = aws_instance.agent_server.public_ip
}

output "s3_bucket_name" {
  value = aws_s3_bucket.data_bucket.bucket
}

Running terraform apply with this configuration creates a fully isolated environment. The EC2 instance can’t access anything on AWS except for reading from the designated S3 bucket, and network access is tightly controlled. This infrastructure-level security is the bedrock of the entire solution.

Building a Production-Ready LangChain Agent

With the infrastructure defined, we moved to the application logic. The core is a FastAPI service that exposes a single endpoint for queries.

graph TD
    A[User with curl] --> B{EC2 Public IP:8000};
    B --> C[FastAPI Endpoint: /query];
    C --> D[LangChain AgentExecutor];
    D -- Thought --> E{LLM API};
    E -- Action (Python Code) --> D;
    D -- Action --> F[Custom PandasTool];
    F -- Executes Code --> G[(DataFrame in Memory)];
    G -- Result/Error --> F;
    F -- Observation --> D;
    D -- Final Answer --> C;
    C --> A;

    subgraph "Terraform-Managed AWS Environment"
        B
        C
        D
        F
        G
        H[S3 Bucket]
    end

    style G fill:#f9f,stroke:#333,stroke-width:2px
    style F fill:#ccf,stroke:#333,stroke-width:2px

The crucial design decision was to build a custom tool instead of using the default agent. A custom tool gives us three critical capabilities: robust error handling, stdout capturing, and execution logging.

Here is the complete Python application (main.py):

# main.py

import os
import sys
import io
import logging
from contextlib import redirect_stdout
from typing import Dict, Any

import pandas as pd
import boto3
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv

from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.pydantic_v1 import Field
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI

# --- Configuration and Logging ---
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Global Variables ---
# In a larger application, this would be managed more elegantly,
# but for a single-purpose agent, loading at startup is fine.
app = FastAPI()
df: pd.DataFrame = None
agent_executor: AgentExecutor = None

# --- Pydantic Models for API ---
class QueryRequest(BaseModel):
    query: str

class QueryResponse(BaseModel):
    answer: str
    generated_code: str
    intermediate_steps: list

# --- Custom LangChain Tool for Safe Pandas Execution ---
class PandasExecutorTool(BaseTool):
    """
    A tool that safely executes pandas code against a pre-loaded DataFrame.
    It captures stdout and handles exceptions to provide feedback to the LLM.
    """
    name: str = "python_pandas_executor"
    description: str = (
        "Executes python code using a pre-loaded pandas DataFrame named 'df'. "
        "The code should be a valid, single-line or multi-line script that performs "
        "read-only analysis. Use 'print()' to output results. "
        "Only pandas and standard python libraries are available."
    )
    df: pd.DataFrame = Field(default=None, exclude=True)

    def _run(self, code_snippet: str) -> str:
        """Execute the python code."""
        local_namespace = {'df': self.df}
        
        # A common pitfall is not handling stdout. The LLM relies on the output of print()
        # statements to see the result of its code. We must capture it.
        output_buffer = io.StringIO()
        
        try:
            logging.info(f"Executing code snippet:\n---\n{code_snippet}\n---")
            
            # The use of exec is a calculated risk. The prompt engineering and
            # the limited environment (no fs/network libraries imported by default)
            # are our primary layers of defense.
            with redirect_stdout(output_buffer):
                exec(code_snippet, {'pd': pd}, local_namespace)
            
            output = output_buffer.getvalue()
            
            # If there's no explicit print, the output might be empty.
            # We can provide a default success message.
            if not output:
                output = "Code executed successfully with no direct output. If you expected a result, use the print() function."
            
            logging.info(f"Execution successful. Output:\n{output}")
            return output

        except Exception as e:
            # Providing detailed error messages back to the LLM is critical for self-correction.
            error_message = f"Error executing code: {type(e).__name__} - {e}"
            logging.error(error_message)
            return error_message

    async def _arun(self, code_snippet: str) -> str:
        # This is a synchronous tool, so we can just wrap the sync version.
        return self._run(code_snippet)


# --- Application Startup Event ---
@app.on_event("startup")
def startup_event():
    global df, agent_executor
    
    bucket_name = os.getenv("DATA_BUCKET_NAME")
    file_key = os.getenv("DATA_FILE_KEY")
    
    if not bucket_name or not file_key:
        raise ValueError("S3 bucket name and file key must be set in environment.")

    try:
        logging.info(f"Loading data from S3 bucket '{bucket_name}' with key '{file_key}'...")
        s3 = boto3.client('s3')
        obj = s3.get_object(Bucket=bucket_name, Key=file_key)
        df = pd.read_csv(io.BytesIO(obj['Body'].read()))
        logging.info(f"DataFrame loaded successfully. Shape: {df.shape}")
        logging.info(f"Columns: {df.columns.tolist()}")
    except Exception as e:
        logging.error(f"Failed to load data from S3: {e}")
        # In a real app, you might want to exit or enter a degraded state.
        raise

    # --- Agent and Executor Initialization ---
    llm = ChatOpenAI(temperature=0, model="gpt-4-turbo-preview")
    tools = [PandasExecutorTool(df=df)]
    
    # This prompt is the most critical part for security and correctness.
    # It strictly defines the LLM's capabilities and constraints.
    prompt = ChatPromptTemplate.from_messages([
        (
            "system",
            "You are a highly skilled data analyst. You have access to a pandas DataFrame named 'df'. "
            "Your task is to answer user questions about this DataFrame by generating and executing Python code. "
            "You must use the 'python_pandas_executor' tool to run your code. "
            "Constraints:\n"
            "1. Only generate code for the provided tool. Do not try to call any other functions.\n"
            "2. The code must be read-only. Do not attempt to modify the DataFrame, write files, or make network requests.\n"
            "3. Use the 'print()' function to display the final result or any intermediate data you wish to inspect.\n"
            "4. The DataFrame columns are: " + ", ".join(df.columns) + ".\n"
            "5. Respond directly with the tool call. Do not add conversational fluff."
        ),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    llm_with_tools = llm.bind_functions(tools)

    agent = (
        {
            "input": lambda x: x["input"],
            "agent_scratchpad": lambda x: format_to_openai_function_messages(x["intermediate_steps"]),
        }
        | prompt
        | llm_with_tools
        | OpenAIFunctionsAgentOutputParser()
    )

    agent_executor = AgentExecutor(
        agent=agent, 
        tools=tools, 
        verbose=True, 
        return_intermediate_steps=True,
        handle_parsing_errors=True # Crucial for production stability
    )

# --- API Endpoint ---
@app.post("/query", response_model=QueryResponse)
async def query_data(request: QueryRequest):
    if not agent_executor:
        raise HTTPException(status_code=503, detail="Agent not initialized. Check logs for errors.")

    try:
        response = await agent_executor.ainvoke({"input": request.query})
        
        # Extract the executed code from the intermediate steps for auditability
        executed_code = "No code executed."
        for step in response.get("intermediate_steps", []):
            tool_input = step[0].tool_input
            if isinstance(tool_input, dict) and 'code_snippet' in tool_input:
                executed_code = tool_input['code_snippet']
                break # Assuming one code execution step for simplicity

        return QueryResponse(
            answer=response.get("output", "No answer found."),
            generated_code=executed_code,
            intermediate_steps=response.get("intermediate_steps", [])
        )
    except Exception as e:
        logging.error(f"Error during agent invocation: {e}")
        raise HTTPException(status_code=500, detail=str(e))

The requirements.txt for this project is straightforward:

# requirements.txt
fastapi
uvicorn[standard]
python-dotenv
pandas
boto3
langchain
langchain-openai

Testing and Verification

A critical part of any production system is testing. We use pytest to write a simple integration test that verifies the endpoint and mocks the LLM to ensure our logic is sound, independent of the model’s performance.

# test_app.py

import pytest
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
import pandas as pd

# This is a trick to load the app object for testing.
# It assumes the test is run from the project root.
from main import app, startup_event

# Mock the startup event to prevent actual S3 calls during testing
@pytest.fixture(scope="module", autouse=True)
def mock_startup():
    with patch('main.boto3.client') as mock_s3_client:
        # Create a mock DataFrame
        mock_df = pd.DataFrame({
            'region': ['North', 'North', 'South'],
            'revenue': [100, 150, 200],
            'cost': [80, 120, 150]
        })
        
        # Mock the S3 get_object call
        mock_s3_object = {
            'Body': MagicMock(read=lambda: mock_df.to_csv(index=False).encode())
        }
        mock_s3_client.return_value.get_object.return_value = mock_s3_object
        
        # Manually trigger the startup logic with mocks in place
        startup_event()
        yield

client = TestClient(app)

@patch('main.agent_executor.ainvoke')
def test_query_endpoint_success(mock_ainvoke):
    # Arrange: Define the mock response from the agent executor
    mock_ainvoke.return_value = {
        "output": "The total revenue for the North region is 250.",
        "intermediate_steps": [
            (
                MagicMock(tool_input={'code_snippet': "print(df[df['region'] == 'North']['revenue'].sum())"}),
                "250"
            )
        ]
    }
    
    # Act: Call the API endpoint
    response = client.post("/query", json={"query": "total revenue for North"})
    
    # Assert: Check the HTTP status and response body
    assert response.status_code == 200
    data = response.json()
    assert data["answer"] == "The total revenue for the North region is 250."
    assert "print(df[df['region'] == 'North']['revenue'].sum())" in data["generated_code"]
    mock_ainvoke.assert_called_once_with({"input": "total revenue for North"})

def test_query_endpoint_agent_failure():
    # Arrange: Temporarily break the agent executor
    original_agent = app.dependency_overrides.get('agent_executor')
    app.dependency_overrides['agent_executor'] = None

    response = client.post("/query", json={"query": "test"})
    
    assert response.status_code == 503
    assert "Agent not initialized" in response.json()['detail']
    
    # Clean up
    app.dependency_overrides['agent_executor'] = original_agent

After deploying with terraform apply and placing a transactions.csv in the newly created S3 bucket, we can query the running service:

curl -X POST http://<EC2_PUBLIC_IP>:8000/query \
-H "Content-Type: application/json" \
-d '{
  "query": "What is the total revenue for each region? Show the result."
}'

The response is a JSON object containing not just the answer, but the exact code that was run to generate it. This provides the audit trail that was completely missing from our initial chatbot experiments. Stakeholders can see the logic, and we can deterministically verify its correctness.

The current architecture, while robust, still has limitations. The exec call, though constrained, remains a potential security concern. A more advanced implementation would parse the LLM-generated code into an Abstract Syntax Tree (AST) and validate it against an allow-list of safe nodes and function calls before execution, effectively creating a domain-specific language for pandas operations. Furthermore, the system is stateless and synchronous; it cannot handle conversational follow-ups, and concurrent requests will be queued. A logical next step would be to introduce Redis (also managed by Terraform) for caching DataFrames and managing conversation state, and to offload agent execution to a Celery worker queue for asynchronous processing, making the API endpoint itself lightweight and highly responsive.


  TOC