Implementing a Scalable Stateful Computer Vision Pipeline with Flutter, OpenCV, and Immutable Terraform Infrastructure


The initial proof-of-concept was a disaster. We had a single, powerful EC2 instance running a Python Flask server. A Flutter application on Android would stream video, and the server would use OpenCV to perform stateful object tracking. It worked perfectly with one, maybe two, concurrent clients. At ten, the server’s CPU was pinned, memory swapping became audible, and tracking states for different streams started bleeding into each other due to subtle race conditions in the state management logic. The entire system collapsed. It became clear that a monolithic architecture for stateful, long-running computational tasks is a direct path to operational failure. Each video stream required its own isolated CPU, memory, and state. The problem wasn’t the OpenCV logic; it was the entire architectural paradigm.

Our second concept was simple in theory: one stream, one dedicated processing environment. This isolation would solve the resource contention and state corruption issues. The challenge shifted from application logic to infrastructure orchestration. How do we provision, manage, and decommission thousands of these isolated environments on demand, without an army of operations engineers? The answer had to be automation, and not just scripting. We needed a declarative, reproducible, and immutable approach. This led us down the path of Infrastructure as Code with Terraform, managing a fleet of ephemeral workers that our Flutter client would communicate with indirectly.

The core technology selection was a negotiation between business requirements and production realities.

  • Flutter on Android: The requirement was a cross-platform mobile client. Flutter provided excellent performance for camera access via platform channels and a UI toolkit that allowed for rapid iteration. We would focus on the Android implementation first, but the groundwork for iOS would be there from day one.
  • OpenCV in a Python Container: Python remains the lingua franca of computer vision. The wealth of libraries and community support for OpenCV makes it the pragmatic choice. Encapsulating the processing logic within a Docker container was non-negotiable. It decouples the application from the underlying machine image, which is a cornerstone of immutable infrastructure.
  • Terraform: We explicitly chose Terraform over direct cloud provider SDKs or configuration management tools like Ansible. A common mistake is to use configuration management tools for provisioning. Terraform’s strength is in declaring the desired state of the infrastructure itself—VPCs, subnets, security groups, compute instances. It manages the lifecycle of these resources, which is precisely what we needed for our ephemeral workers. We would build our system on AWS, leveraging its mature ecosystem of services.

The architecture we settled on is an event-driven, edge-to-cloud pipeline. It’s not a simple request-response model. The mobile client is the edge, initiating a processing job. The cloud is a factory floor, spinning up a dedicated assembly line (a worker) for each job and tearing it down when finished.

graph TD
    subgraph Client [Android Device]
        A[Flutter App] -- 1. Start Stream Request --> B{API Gateway};
        A -- 3. Upload Video Chunks --> D[S3 Bucket: video-chunks];
    end

    subgraph AWS Control Plane
        B -- 2. Triggers --> C[Lambda: Stream Dispatcher];
        C -- Publishes Job --> E[SQS: Job Queue];
    end

    subgraph AWS Data Plane
        F[Worker Fleet Manager] -- Polls --> E;
        F -- Launches Worker --> G((EC2/ECS Worker));
        G -- Reads Chunks --> D;
        G -- R/W State --> H[S3 Bucket: processing-state];
        G -- Writes Results --> I[S3 Bucket: analysis-results];
    end

    A -- Polls for Results --> I;

    style Client fill:#dae8fc,stroke:#6c8ebf,stroke-width:2px
    style G fill:#f8cecc,stroke:#b85450,stroke-width:2px

This post-mortem documents the build-out of this architecture, focusing on the production-grade code for each component, from the Flutter client’s streaming logic to the Terraform modules that define the immutable infrastructure.

Part 1: The Resilient Flutter Client

The client’s responsibility is more than just capturing video. It must be a reliable data producer in an unreliable environment (mobile networks). A common pitfall is to treat the video stream as a single, atomic upload; that is fragile. We implemented a chunking mechanism instead: the client breaks the video into 5-second segments and uploads each one independently, so a failed upload costs a single chunk rather than the entire session.

Here are the key dependencies in pubspec.yaml:

# pubspec.yaml
dependencies:
  flutter:
    sdk: flutter

  camera: ^0.10.5+5 # Core camera access
  path_provider: ^2.1.1 # For temporary file storage
  http: ^1.1.0 # For network requests
  uuid: ^4.2.1 # To generate unique stream IDs

The core logic resides in a StreamingService. It initializes the camera, starts recording, and manages the chunking and upload process in a loop.

// lib/streaming_service.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:camera/camera.dart';
import 'package:flutter/foundation.dart';
import 'package:http/http.dart' as http;
import 'package:path_provider/path_provider.dart';
import 'package:uuid/uuid.dart';

class StreamingService {
  CameraController? _controller;
  String? _streamId;
  bool _isStreaming = false;
  final int _chunkDurationSeconds = 5;

  // Endpoint for our dispatcher API
  final String _startStreamEndpoint = "https://api.example.com/start_stream";

  Future<void> initialize() async {
    final cameras = await availableCameras();
    final firstCamera = cameras.first;
    _controller = CameraController(firstCamera, ResolutionPreset.medium);
    await _controller!.initialize();
  }

  Future<void> startStreaming() async {
    if (_controller == null || !_controller!.value.isInitialized) {
      throw Exception("Camera not initialized");
    }
    if (_isStreaming) {
      debugPrint("Streaming already in progress.");
      return;
    }

    _isStreaming = true;
    _streamId = const Uuid().v4();

    try {
      // 1. Notify the backend to provision a worker for this streamId
      // The dispatcher Lambda expects a JSON body, so encode explicitly.
      final response = await http.post(
        Uri.parse(_startStreamEndpoint),
        headers: {'Content-Type': 'application/json'},
        body: jsonEncode({'streamId': _streamId}),
      );

      // The dispatcher returns 202 Accepted once the job is queued.
      if (response.statusCode != 202 && response.statusCode != 200) {
        throw Exception("Failed to initiate stream with backend. Status: ${response.statusCode}");
      }

      final responseBody = response.body; // Assume it returns presigned URL info
      debugPrint("Backend acknowledged stream $_streamId. Response: $responseBody");

      // 2. Start the chunking and recording loop. Chunks are recorded back to
      //    back, and uploads are fire-and-forget so they never block the camera.
      unawaited(_captureLoop());

    } catch (e) {
      _isStreaming = false;
      debugPrint("Error starting stream: $e");
      // Propagate error to the UI
      rethrow;
    }
  }

  Future<void> _captureLoop() async {
    while (_isStreaming && _controller != null) {
      try {
        await _controller!.startVideoRecording();
        // Record for one chunk duration, then cut the segment.
        await Future.delayed(Duration(seconds: _chunkDurationSeconds));
        if (!_isStreaming) break; // stopStreaming() already discarded the chunk

        final file = await _controller!.stopVideoRecording();
        final streamId = _streamId;

        // A common mistake is to await the upload here, which would block the
        // next recording cycle. We fire and forget; a production app would
        // hand this work to an isolate (compute) and track failures for retry.
        if (streamId != null) {
          unawaited(_uploadFile(file, streamId));
        }
      } on CameraException catch (e) {
        debugPrint("Camera Error during chunk capture: $e");
        // Implement retry logic or stop streaming
        stopStreaming();
      }
    }
  }
  
  // In a real app, this would use the AWS SDK for Flutter to get presigned URLs.
  // Using a simple HTTP PUT for demonstration.
  Future<void> _uploadFile(XFile file, String streamId) async {
    final chunkName = "${DateTime.now().millisecondsSinceEpoch}.mp4";
    final uploadUrl = "https://api.example.com/upload/$streamId/$chunkName"; // This would be a pre-signed S3 URL

    try {
      final request = http.StreamedRequest('PUT', Uri.parse(uploadUrl));
      request.headers['Content-Type'] = 'video/mp4';
      request.contentLength = await file.length();

      // Pipe the file into the request body and close the sink when the file
      // ends; without the close, send() never sees the end of the body and hangs.
      unawaited(request.sink
          .addStream(http.ByteStream(file.openRead()))
          .whenComplete(() => request.sink.close()));

      final response = await request.send();

      if (response.statusCode == 200) {
        debugPrint("Successfully uploaded chunk: $chunkName");
      } else {
        debugPrint("Failed to upload chunk. Status: ${response.statusCode}");
        // Implement retry logic with exponential backoff.
      }
    } catch (e) {
        debugPrint("Network error during upload: $e");
        // Handle network failure, potentially cache chunk locally for later upload.
    } finally {
        // Clean up the temporary file
        try {
          final localFile = File(file.path);
          if (await localFile.exists()) {
            await localFile.delete();
          }
        } catch (e) {
          debugPrint("Failed to delete temporary file: ${e.path}");
        }
    }
  }

  void stopStreaming() {
    _isStreaming = false;
    if (_controller?.value.isRecordingVideo ?? false) {
      // Discard whatever partial chunk was still being recorded.
      _controller?.stopVideoRecording();
    }
    _streamId = null;
    debugPrint("Streaming stopped.");
  }

  void dispose() {
    _isStreaming = false;
    _controller?.dispose();
  }
}

This client-side logic is defensive. It assumes the network will fail and that the backend might be slow. The use of a unique streamId generated client-side is crucial for idempotency. If the start_stream call fails and is retried, the backend can recognize it’s for the same session.
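
The Dart code above hand-waves the upload endpoint; in practice the start_stream response would carry pre-signed S3 URLs so chunks go straight to the video-chunks bucket without passing through our API. As a reference point, here is a minimal sketch of how the backend could mint such a URL with boto3; the bucket name, key layout, and expiry are illustrative assumptions rather than exact production values.

# presign_chunks.py (illustrative sketch; bucket, key layout, and expiry are assumptions)
import boto3

s3_client = boto3.client("s3")

def presigned_chunk_url(stream_id: str, chunk_name: str) -> str:
    """Returns a short-lived URL the client can PUT one video chunk to."""
    return s3_client.generate_presigned_url(
        "put_object",
        Params={
            "Bucket": "video-chunks",                    # assumed bucket name
            "Key": f"uploads/{stream_id}/{chunk_name}",  # matches the prefix the worker polls
            "ContentType": "video/mp4",
        },
        ExpiresIn=300,  # a five-minute expiry is generous for a 5-second chunk
    )

The dispatcher could return a URL per expected chunk up front, or the client could request a fresh one before each upload; either way the client never needs AWS credentials on the device.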

Part 2: The Terraform Foundation for Immutable Workers

The heart of the backend is the infrastructure. We will define everything required to run a single worker in a reusable Terraform module. This module becomes our unit of scale.

Our project structure looks like this:

terraform/
├── environments/
│   └── prod/
│       └── main.tf
├── modules/
│   └── cv_worker/
│       ├── main.tf
│       ├── variables.tf
│       └── outputs.tf
└── main.tf

The cv_worker module is the key component. It defines an IAM role for the worker, a security group, and an ECS task definition. We chose ECS Fargate to avoid managing EC2 instances ourselves.

modules/cv_worker/variables.tf

# modules/cv_worker/variables.tf

variable "project_name" {
  description = "The name of the project."
  type        = string
}

variable "environment" {
  description = "The environment (e.g., 'prod', 'staging')."
  type        = string
}

variable "ecs_cluster_arn" {
  description = "The ARN of the ECS cluster to deploy tasks into."
  type        = string
}

variable "vpc_id" {
  description = "The ID of the VPC."
  type        = string
}

variable "private_subnet_ids" {
  description = "A list of private subnet IDs for the ECS tasks."
  type        = list(string)
}

variable "docker_image_url" {
  description = "The URL of the OpenCV worker Docker image in ECR."
  type        = string
}

variable "video_chunks_bucket_name" {
  description = "The name of the S3 bucket for video chunks."
  type        = string
}

variable "processing_state_bucket_name" {
  description = "The name of the S3 bucket for tracking state."
  type        = string
}

modules/cv_worker/main.tf

This is where the real definition happens. Note the IAM policy: it grants least-privilege access to exactly the resources the worker needs.

# modules/cv_worker/main.tf

locals {
  name_prefix = "${var.project_name}-${var.environment}"
}

# IAM Role for the ECS Task
resource "aws_iam_role" "task_role" {
  name = "${local.name_prefix}-task-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "ecs-tasks.amazonaws.com"
      }
    }]
  })
}

# IAM Policy granting access to S3 buckets
resource "aws_iam_policy" "task_policy" {
  name        = "${local.name_prefix}-task-policy"
  description = "Policy for CV worker tasks to access S3."

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ],
        Resource = [
          "arn:aws:s3:::${var.video_chunks_bucket_name}",
          "arn:aws:s3:::${var.video_chunks_bucket_name}/*"
        ]
      },
      {
        Effect = "Allow",
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:DeleteObject",
          "s3:ListBucket" # Without ListBucket a missing state object returns 403 instead of NoSuchKey
        ],
        Resource = [
          "arn:aws:s3:::${var.processing_state_bucket_name}",
          "arn:aws:s3:::${var.processing_state_bucket_name}/*"
        ]
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "task_policy_attachment" {
  role       = aws_iam_role.task_role.name
  policy_arn = aws_iam_policy.task_policy.arn
}

# Security group to allow outbound traffic but no inbound
resource "aws_security_group" "task_sg" {
  name        = "${local.name_prefix}-task-sg"
  description = "Security group for CV worker ECS tasks"
  vpc_id      = var.vpc_id

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# Execution role: used by the ECS agent to pull the image from ECR and ship logs.
# Kept separate from the task role so the application itself stays least-privilege.
resource "aws_iam_role" "execution_role" {
  name               = "${local.name_prefix}-execution-role"
  assume_role_policy = aws_iam_role.task_role.assume_role_policy
}

resource "aws_iam_role_policy_attachment" "execution_role_attachment" {
  role       = aws_iam_role.execution_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}

# Log group for the worker containers; the awslogs driver requires it to exist.
resource "aws_cloudwatch_log_group" "worker_logs" {
  name              = "/ecs/${local.name_prefix}-worker"
  retention_in_days = 14
}

# The core of our worker: the ECS Task Definition
resource "aws_ecs_task_definition" "worker_task" {
  family                   = "${local.name_prefix}-worker"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "1024" # 1 vCPU
  memory                   = "2048" # 2 GB
  execution_role_arn       = aws_iam_role.execution_role.arn # Role for the ECS agent (image pull, logs)
  task_role_arn            = aws_iam_role.task_role.arn      # Role for the application itself

  container_definitions = jsonencode([{
    name  = "opencv-worker"
    image = var.docker_image_url
    essential = true
    # We pass the stream_id via environment variables. This is how a specific
    # task knows which stream to process.
    environment = [
      {
        name  = "STREAM_ID",
        value = "placeholder" # This will be overridden at runtime
      },
      {
        name  = "CHUNKS_BUCKET",
        value = var.video_chunks_bucket_name
      },
      {
        name  = "STATE_BUCKET",
        value = var.processing_state_bucket_name
      }
    ],
    logConfiguration = {
      logDriver = "awslogs"
      options = {
        "awslogs-group"         = "/ecs/${local.name_prefix}-worker"
        "awslogs-region"        = "us-east-1" # Specify region
        "awslogs-stream-prefix" = "ecs"
      }
    }
  }])
}

# Output the ARN of the task definition so our dispatcher can use it
output "task_definition_arn" {
  value = aws_ecs_task_definition.worker_task.arn
}

output "task_security_group_id" {
  value = aws_security_group.task_sg.id
}

Using this module in the production environment is now trivial. The environments/prod/main.tf file composes our modules to build the complete system.

Part 3: The Dispatcher and Orchestration Logic

The missing piece is the logic that connects a client’s request to a running worker. A Lambda function behind an API Gateway is perfect for this. It’s cheap, scalable, and stateless. Its only job is to receive a streamId, publish a job message to an SQS queue, and return a success response.

Why SQS? A common mistake is to have the Lambda function directly call the aws ecs run-task API. This couples the client’s request lifecycle to the infrastructure’s provisioning time. If ECS is slow to schedule a task, the client-facing API call will time out. By putting a queue in between, we decouple these systems. The Lambda provides immediate acknowledgment, and a separate, robust process consumes the queue to provision the workers.

Here’s a simplified Python Lambda handler for the dispatcher:

# lambda_dispatcher/main.py
import json
import boto3
import os
import logging

# Setup basic logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs_client = boto3.client("sqs")
QUEUE_URL = os.environ.get("JOB_QUEUE_URL")

def handler(event, context):
    if not QUEUE_URL:
        logger.error("JOB_QUEUE_URL environment variable not set.")
        return {"statusCode": 500, "body": json.dumps({"message": "Internal server error: configuration missing"})}

    stream_id = None
    try:
        body = json.loads(event.get("body") or "{}")
        stream_id = body.get("streamId")

        if not stream_id:
            return {"statusCode": 400, "body": json.dumps({"message": "streamId is required"})}

        message_body = {
            "action": "START_WORKER",
            "streamId": stream_id
        }

        # A real-world project would add more metadata: user ID, requested processing profile, etc.
        sqs_client.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps(message_body),
            MessageGroupId=stream_id # Using FIFO queue for potential ordering guarantees
        )
        
        logger.info(f"Successfully queued job for streamId: {stream_id}")
        return {
            "statusCode": 202, # 202 Accepted is the correct response here
            "body": json.dumps({"message": "Processing job accepted", "streamId": stream_id})
        }

    except json.JSONDecodeError:
        return {"statusCode": 400, "body": json.dumps({"message": "Invalid JSON format"})}
    except Exception as e:
        logger.error(f"Failed to queue job for streamId {stream_id}: {e}")
        return {"statusCode": 500, "body": json.dumps({"message": "Internal server error"})}

A separate process (which could be another Lambda triggered by SQS events, or a persistent service running on EC2/ECS) is responsible for consuming from this queue and running the ECS task.

# worker_manager/main.py
# This would run as a long-running service
import boto3
import os
import json

ecs_client = boto3.client("ecs")

# These would be fetched from SSM Parameter Store or environment variables
# They are outputs from our Terraform module.
TASK_DEFINITION_ARN = os.environ.get("TASK_DEFINITION_ARN")
SUBNET_IDS = os.environ.get("SUBNET_IDS").split(',')
SECURITY_GROUP_ID = os.environ.get("SECURITY_GROUP_ID")
CLUSTER_NAME = os.environ.get("CLUSTER_NAME")

def run_worker_task(stream_id):
    """
    Invokes ECS RunTask to start a new processing worker.
    """
    try:
        response = ecs_client.run_task(
            cluster=CLUSTER_NAME,
            taskDefinition=TASK_DEFINITION_ARN,
            launchType='FARGATE',
            count=1,
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': SUBNET_IDS,
                    'securityGroups': [SECURITY_GROUP_ID],
                    'assignPublicIp': 'DISABLED'
                }
            },
            # This is the critical part: overriding the container environment
            overrides={
                'containerOverrides': [{
                    'name': 'opencv-worker', # Must match the name in the task definition
                    'environment': [{
                        'name': 'STREAM_ID',
                        'value': stream_id
                    }]
                }]
            }
        )
        task_arn = response['tasks'][0]['taskArn']
        print(f"Successfully launched task {task_arn} for streamId {stream_id}")
        return True
    except Exception as e:
        print(f"Error launching ECS task for streamId {stream_id}: {e}")
        return False
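
For completeness, the consuming side can be a plain long-polling loop around run_worker_task. The sketch below assumes a standard SQS receive/delete pattern and a JOB_QUEUE_URL environment variable; the names, batch size, and timeouts are illustrative rather than the exact production code.

# worker_manager consume loop (sketch; queue URL, batch size, and wait time are assumptions)
sqs_client = boto3.client("sqs")
JOB_QUEUE_URL = os.environ.get("JOB_QUEUE_URL")

def consume_forever():
    while True:
        # Long poll so an empty queue doesn't turn into a busy loop of API calls.
        response = sqs_client.receive_message(
            QueueUrl=JOB_QUEUE_URL,
            MaxNumberOfMessages=5,
            WaitTimeSeconds=20,
        )
        for message in response.get("Messages", []):
            job = json.loads(message["Body"])
            if job.get("action") == "START_WORKER" and run_worker_task(job["streamId"]):
                # Delete only after a successful launch, so a failed RunTask call
                # is retried once the message's visibility timeout expires.
                sqs_client.delete_message(
                    QueueUrl=JOB_QUEUE_URL,
                    ReceiptHandle=message["ReceiptHandle"],
                )

if __name__ == "__main__":
    consume_forever()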

Part 4: The Stateful OpenCV Worker

Finally, the worker itself. This is a Python application running in a Docker container. Its lifecycle is simple: start, discover its streamId from the environment, and enter a loop to process video chunks from S3.

worker/main.py

# worker/main.py
import os
import time
import boto3
import cv2
import pickle
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration from Environment Variables ---
STREAM_ID = os.environ.get("STREAM_ID")
CHUNKS_BUCKET = os.environ.get("CHUNKS_BUCKET")
STATE_BUCKET = os.environ.get("STATE_BUCKET")

if not all([STREAM_ID, CHUNKS_BUCKET, STATE_BUCKET]):
    raise ValueError("Missing one or more required environment variables.")

# --- S3 and OpenCV Initialization ---
s3_client = boto3.client("s3")
STATE_FILE_KEY = f"state/{STREAM_ID}.pkl"
PROCESSED_CHUNKS = set()

def load_state():
    """Loads persisted tracking state (last bbox, processed chunk keys) from S3.

    OpenCV tracker objects are native C++ handles and cannot be pickled, so we
    persist only the bounding box plus the set of already-processed chunks and
    re-initialize the tracker from the bbox on the next frame.
    """
    try:
        response = s3_client.get_object(Bucket=STATE_BUCKET, Key=STATE_FILE_KEY)
        state_data = pickle.loads(response['Body'].read())
        PROCESSED_CHUNKS.update(state_data.get('processed_chunks', set()))
        logging.info(f"[{STREAM_ID}] Successfully loaded state.")
        return cv2.TrackerKCF_create(), state_data.get('bbox')
    except s3_client.exceptions.NoSuchKey:
        logging.info(f"[{STREAM_ID}] No previous state found. Initializing new tracker.")
        return cv2.TrackerKCF_create(), None
    except Exception as e:
        logging.error(f"[{STREAM_ID}] Failed to load state: {e}. Re-initializing.")
        # In a real-world project, this error should be handled more gracefully.
        # Maybe move the corrupted state file to a quarantine location for analysis.
        return cv2.TrackerKCF_create(), None

def save_state(bbox):
    """Saves the last bbox and processed-chunk set to S3 (the tracker object itself is not picklable)."""
    try:
        state_data = pickle.dumps({'bbox': bbox, 'processed_chunks': PROCESSED_CHUNKS})
        s3_client.put_object(Bucket=STATE_BUCKET, Key=STATE_FILE_KEY, Body=state_data)
        logging.info(f"[{STREAM_ID}] Successfully saved state.")
    except Exception as e:
        logging.error(f"[{STREAM_ID}] CRITICAL: Failed to save state: {e}")

def process_chunk(video_path, tracker, initial_bbox):
    """Processes a single video chunk file.

    The tracker is (re)initialized on the first frame of each chunk from the
    last known bounding box, which is what allows a restarted worker to resume
    from the bbox persisted in S3.
    """
    cap = cv2.VideoCapture(video_path)
    is_initialized = False
    last_known_bbox = initial_bbox

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if not is_initialized:
            if initial_bbox is None:
                # For the very first frame of the first chunk, we need to select an object.
                # In production, this would come from a detection model (e.g., YOLO).
                # Here, we'll hardcode a region for demonstration.
                initial_bbox = (287, 23, 86, 320)  # Dummy bounding box
            tracker.init(frame, tuple(int(v) for v in initial_bbox))
            is_initialized = True
            last_known_bbox = initial_bbox
            logging.info(f"[{STREAM_ID}] Tracker initialized with bbox: {initial_bbox}")
            continue

        success, bbox = tracker.update(frame)
        if success:
            last_known_bbox = bbox
            # Here you would extract data, draw on the frame, etc.
        else:
            # Handle tracking failure. Attempt re-detection.
            logging.warning(f"[{STREAM_ID}] Tracking failed on a frame.")

    cap.release()
    return last_known_bbox

def main_loop():
    tracker, bbox = load_state()
    
    while True:
        try:
            # 1. List available chunks in S3
            response = s3_client.list_objects_v2(Bucket=CHUNKS_BUCKET, Prefix=f"uploads/{STREAM_ID}/")
            if 'Contents' not in response:
                time.sleep(5)
                continue

            # 2. Identify new, unprocessed chunks
            all_chunks = sorted([obj['Key'] for obj in response['Contents']])
            new_chunks = [key for key in all_chunks if key not in PROCESSED_CHUNKS]

            if not new_chunks:
                # Add logic here to terminate the worker if no new chunks appear for a while
                logging.info(f"[{STREAM_ID}] No new chunks found. Waiting...")
                time.sleep(5)
                continue

            # 3. Process each new chunk in order
            for chunk_key in new_chunks:
                local_path = f"/tmp/{os.path.basename(chunk_key)}"
                logging.info(f"[{STREAM_ID}] Downloading chunk: {chunk_key}")
                s3_client.download_file(CHUNKS_BUCKET, chunk_key, local_path)
                
                logging.info(f"[{STREAM_ID}] Processing chunk: {local_path}")
                bbox = process_chunk(local_path, tracker, bbox)

                os.remove(local_path)
                PROCESSED_CHUNKS.add(chunk_key)

                # 4. Persist state after each chunk. This is critical for resilience.
                # If the worker crashes, it can resume from the last completed chunk.
                save_state(bbox)

        except Exception as e:
            logging.error(f"[{STREAM_ID}] Unhandled exception in main loop: {e}")
            time.sleep(10) # Backoff before retrying

if __name__ == "__main__":
    logging.info(f"Worker starting for STREAM_ID: {STREAM_ID}")
    main_loop()
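
The TODO in the main loop about terminating idle workers deserves a concrete shape, because a Fargate task bills for every minute it stays up after the stream ends. A minimal sketch, assuming a hypothetical IDLE_TIMEOUT_SECONDS setting that is not part of the original worker: remember when the last new chunk appeared, and exit the process once the threshold passes so ECS reaps the task.

# Idle shutdown sketch. IDLE_TIMEOUT_SECONDS is a hypothetical knob, not part of
# the worker as deployed; exiting cleanly ends the Fargate task and its billing.
IDLE_TIMEOUT_SECONDS = int(os.environ.get("IDLE_TIMEOUT_SECONDS", "120"))
last_activity = time.time()

def should_shut_down(found_new_chunks):
    """Returns True once no new chunks have appeared for the idle window."""
    global last_activity
    if found_new_chunks:
        last_activity = time.time()
        return False
    return (time.time() - last_activity) > IDLE_TIMEOUT_SECONDS

# Call once per polling iteration in main_loop and break out of the loop when it
# returns True, e.g.:
#     if should_shut_down(bool(new_chunks)):
#         logging.info(f"[{STREAM_ID}] Idle timeout reached; shutting down worker.")
#         break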

This worker is designed for resilience. By saving its state after every processed chunk, it can be terminated and restarted without losing progress. This is the essence of designing for failure in a cloud environment. The infrastructure is ephemeral, but the state is persistent.

The use of Terraform to define this entire stack, from the IAM roles to the task definitions, ensures that our infrastructure is not a liability. It’s a version-controlled, auditable, and easily reproducible asset. We can spin up an identical staging environment with a single command or tear down the entire production stack if needed, confident that we can bring it back to the exact same state.

This architecture, however, is not without its limitations. The latency introduced by chunking and uploading to S3 might be unacceptable for true real-time applications. A future iteration would likely explore a direct streaming protocol like WebRTC, which would require a significant redesign of the worker to handle a continuous stream rather than discrete files. Furthermore, the cost of one Fargate task per stream can be substantial. For very short-lived streams, the overhead of task scheduling might be too high. A potential optimization could involve a pool of “hot” workers that are assigned streams dynamically, but this reintroduces the risk of state management complexity we sought to avoid. The current model prioritizes isolation and resilience over cost and latency, a trade-off that was right for our initial production launch.

