Building a Real-Time Vision Analytics Pipeline with a Custom Tyk Go Plugin for gRPC Stream Enrichment


The initial requirement seemed straightforward: display real-time object detection data from a network of factory floor cameras on a centralized monitoring dashboard. The proof-of-concept, built on a predictable stack of Python REST APIs and HTTP long-polling, collapsed during the first load test. Latency skyrocketed and the server buckled under the connection churn. It became clear that a request-response model was fundamentally unsuitable for a continuous stream of analytical data. The only viable path forward was a streaming architecture.

Our technology landscape was constrained. We had an established Tyk API Gateway managing ingress, a legacy SQL Server instance holding all device metadata, and a front-end team comfortable with React and MobX. The processing itself would naturally fall to Python with OpenCV. The challenge was to stitch these components into a high-throughput, low-latency pipeline. The hardest part was that the raw detection data (bounding boxes, object class) had to be enriched with static metadata from SQL Server (camera location, operational status, installation date) before it reached the client. Performing this join on the client side was inefficient, and having the Python processor call the database directly would tightly couple it to a legacy system we aimed to isolate.

The solution we settled on was to use our Tyk Gateway as an active component in the data path, not just a passive proxy. We would build a custom gRPC middleware as a Tyk Go plugin. This plugin would intercept the gRPC stream from the OpenCV processor, enrich each message with data fetched from SQL Server, and then forward the augmented message to the client. This centralized the business logic in the gateway layer, kept the high-performance processor dumb and focused, and delivered a ready-to-render data stream to the front end.

The gRPC Processing Service: OpenCV and Python

The foundation of the pipeline is a Python service that consumes a video feed, performs object detection with OpenCV, and streams the results via gRPC. The contract must be defined first.

A .proto file establishes the data structures for the stream. The DetectionRequest specifies which camera feed to process, and the DetectionEvent represents a single frame’s analysis, including the crucial camera_id for later enrichment.

// vision.proto
syntax = "proto3";

package vision;

option go_package = "enricher/vision/visionpb";

message BoundingBox {
    int32 x_min = 1;
    int32 y_min = 2;
    int32 x_max = 3;
    int32 y_max = 4;
}

message DetectedObject {
    string label = 1;
    float confidence = 2;
    BoundingBox box = 3;
}

message DetectionRequest {
    string camera_id = 1;
    // Potentially other params like model name, confidence threshold, etc.
}

message DetectionEvent {
    string camera_id = 1; // Used as the key for enrichment
    int64 timestamp_ms = 2;
    repeated DetectedObject objects = 3;

    // These fields will be populated by the Tyk plugin
    string camera_location = 4;
    string camera_status = 5;
    string install_date = 6;
}

service VisionService {
    rpc StreamDetections(DetectionRequest) returns (stream DetectionEvent) {}
}

The Python server implementation uses grpc.aio, so a single asyncio event loop can multiplex many concurrent streams. That only holds if nothing blocks the loop, which is why blocking OpenCV work is pushed onto a thread pool with run_in_executor. A real-world deployment wouldn't hardcode a video file; it would connect to an RTSP stream or a camera device. For reproducibility, we simulate the feed with a video file.

# processor/server.py
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor

import cv2
import grpc
from grpc.aio import server

# Assuming protoc has been run to generate these files
import vision_pb2
import vision_pb2_grpc

# In a real application, this would be a more sophisticated model.
# For this example, we'll simulate detection logic.
def perform_detection(frame, camera_id):
    """
    Placeholder for actual OpenCV model inference.
    Returns a list of DetectedObject messages.
    """
    # Simulate finding two objects
    simulated_objects = [
        vision_pb2.DetectedObject(
            label="widget",
            confidence=0.98,
            box=vision_pb2.BoundingBox(x_min=10, y_min=10, x_max=100, y_max=100),
        ),
        vision_pb2.DetectedObject(
            label="gadget",
            confidence=0.91,
            box=vision_pb2.BoundingBox(x_min=150, y_min=40, x_max=280, y_max=200),
        ),
    ]
    logging.info(f"Detection complete for camera {camera_id}")
    return simulated_objects

class VisionServiceImpl(vision_pb2_grpc.VisionServiceServicer):
    def __init__(self, executor):
        self.executor = executor
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    async def StreamDetections(self, request: vision_pb2.DetectionRequest, context: grpc.aio.ServicerContext):
        camera_id = request.camera_id
        logging.info(f"Received detection stream request for camera: {camera_id}")

        # In production, this path would be an RTSP URL or device index.
        video_path = "assets/sample_video.mp4" 
        cap = cv2.VideoCapture(video_path)

        if not cap.isOpened():
            logging.error(f"Failed to open video source for camera {camera_id}")
            await context.abort(grpc.StatusCode.NOT_FOUND, "Video source not found")
            return

        loop = asyncio.get_running_loop()

        try:
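            while cap.isOpened():
                # cap.read() blocks on I/O and decoding, so it runs in the pool too.
                ret, frame = await loop.run_in_executor(self.executor, cap.read)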
                if not ret:
                    logging.info(f"End of video stream for camera {camera_id}")
                    break

                # Offload the CPU-bound OpenCV work to a thread pool
                # to avoid blocking the asyncio event loop.
                detected_objects = await loop.run_in_executor(
                    self.executor, perform_detection, frame, camera_id
                )

                event = vision_pb2.DetectionEvent(
                    camera_id=camera_id,
                    timestamp_ms=int(time.time() * 1000),
                    objects=detected_objects,
                )

                yield event
                
                # Simulate frame rate
                await asyncio.sleep(1/30) 

        except asyncio.CancelledError:
            logging.warning(f"Stream cancelled by client for camera {camera_id}")
            raise  # propagate so gRPC can finish the cancellation cleanly
        except Exception as e:
            logging.error(f"Error during stream for {camera_id}: {e}", exc_info=True)
            await context.abort(grpc.StatusCode.INTERNAL, "Processing error")
        finally:
            cap.release()
            logging.info(f"Cleaned up resources for camera {camera_id}")


async def serve():
    # The thread pool runs blocking OpenCV work for the servicer via
    # run_in_executor; without it, one slow frame would stall every stream.
    # The aio server itself needs no pool because StreamDetections is async.
    executor = ThreadPoolExecutor(max_workers=4)
    grpc_server = server()
    vision_pb2_grpc.add_VisionServiceServicer_to_server(VisionServiceImpl(executor), grpc_server)
    grpc_server.add_insecure_port('[::]:50051')
    
    logging.info("Starting gRPC vision processor on port 50051...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()

if __name__ == '__main__':
    asyncio.run(serve())

This service is intentionally lean. Its only job is to perform detections and stream protobuf messages. It has no knowledge of SQL Server or any other external systems. This separation of concerns is paramount for maintainability.
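Offloading with run_in_executor in StreamDetections is what keeps one slow frame from freezing every other stream on the server. The standalone sketch below makes that visible under simple assumptions: time.sleep stands in for OpenCV work, the function names are hypothetical, and a heartbeat coroutine ticks every 20 ms while we count how many ticks happen during a 0.3 s blocking call, with and without offloading.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_work(duration_s: float) -> str:
    """Stand-in for a blocking cap.read() or CPU-bound inference call."""
    time.sleep(duration_s)
    return "done"


async def heartbeat(ticks: list, interval_s: float, stop: asyncio.Event) -> None:
    """Appends a timestamp each time the event loop gets to run this task."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(interval_s)


async def count_ticks_during_work(offload: bool) -> int:
    """Counts heartbeat ticks while a 0.3 s blocking call runs."""
    loop = asyncio.get_running_loop()
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, 0.02, stop))
    await asyncio.sleep(0)  # give the heartbeat its first turn
    with ThreadPoolExecutor(max_workers=1) as pool:
        if offload:
            # The worker thread blocks; the event loop keeps scheduling tasks.
            await loop.run_in_executor(pool, blocking_work, 0.3)
        else:
            # Called inline, the blocking work freezes the whole event loop.
            blocking_work(0.3)
    stop.set()
    await beat
    return len(ticks)
```

Run inline, the blocking call lets the heartbeat tick only once before the loop freezes; offloaded, the heartbeat keeps ticking for the whole 0.3 s.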

The Tyk Go Plugin: Middleware for Stream Enrichment

This is the architectural linchpin. A custom Go plugin lets us inject arbitrary logic into the Tyk request lifecycle. For gRPC, this means we can implement a stream interceptor.

To be built as a Go plugin (.so file), the code must live in package main; Tyk loads the compiled shared object at runtime. The core logic resides in a HandleStream function, which Tyk calls for every message that passes through the gRPC stream.
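Conceptually, a per-message stream hook is a function wrapped around the stream: every upstream message goes through it, everything else passes untouched, and a failing handler must not end the stream. The Python generator below models that contract as a sketch; Direction, Frame, and intercept are hypothetical names, not Tyk types.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Iterable, Iterator


class Direction(Enum):
    UPSTREAM = "upstream"  # messages coming from the processor
    DOWNSTREAM = "downstream"  # messages coming from the client


@dataclass(frozen=True)
class Frame:
    direction: Direction
    payload: bytes


def intercept(frames: Iterable[Frame], handler: Callable[[bytes], bytes]) -> Iterator[Frame]:
    """Runs `handler` on every upstream payload and passes other frames through."""
    for frame in frames:
        if frame.direction is not Direction.UPSTREAM:
            yield frame
            continue
        try:
            payload = handler(frame.payload)
        except Exception:
            payload = frame.payload  # fail open: one bad message must not end the stream
        yield Frame(frame.direction, payload)
```

Because intercept is a generator, it handles one frame at a time and never buffers the stream, which matches how a gateway hook sees messages.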

First, we need the Go protobuf bindings.
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative vision.proto

Next, the plugin implementation. A common pitfall here is database connection management. Opening a new connection for every message on a 30 FPS video stream would be catastrophic. Instead, the plugin creates a single *sql.DB when it is loaded. A *sql.DB is already a goroutine-safe connection pool, and sync.Once guarantees it is initialized exactly once and then reused for the plugin's lifetime.
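The sync.Once guard in initDB is what makes this safe when many goroutines reach the plugin at the same moment. A Python analogue using a lock with a double check is sketched below; LazyPool and hammer are hypothetical names, and factory stands in for sql.Open plus PingContext. Like sync.Once, it records a failed initialization and never retries it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


class LazyPool:
    """Builds a shared resource once, even when many threads ask at the same time.

    `factory` stands in for sql.Open plus PingContext in initDB (hypothetical).
    """

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._done = False
        self._value: Any = None
        self._error: Optional[BaseException] = None

    def get(self) -> Any:
        if not self._done:  # fast path: no locking once initialized
            with self._lock:
                if not self._done:  # re-check: another thread may have finished first
                    try:
                        self._value = self._factory()
                    except Exception as exc:  # remembered, like dbErr
                        self._error = exc
                    self._done = True  # set even on failure: never retried
        if self._error is not None:
            raise self._error
        return self._value


def hammer(pool: LazyPool, callers: int = 200) -> List[Any]:
    """Simulates many concurrent stream messages asking for the pool at once."""
    with ThreadPoolExecutor(max_workers=16) as executor:
        return list(executor.map(lambda _: pool.get(), range(callers)))
```

The no-retry property matters operationally: if SQL Server is unreachable when the plugin initializes, the Go version runs without a database for the rest of the process's life.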

// enricher/plugin.go
package main

import (
	"context"
	"database/sql"
	"errors"
	"log"
	"os"
	"sync"
	"time"

	"github.com/TykTechnologies/tyk/coprocess"
	// SQL Server driver
	_ "github.com/denisenkom/go-mssqldb" 
	"google.golang.org/protobuf/proto"

	pb "enricher/vision/visionpb" // Import generated protobuf code
)

var (
	db   *sql.DB
	once sync.Once
	dbErr error
)

// initDB is called once to set up the database connection pool.
// The connection string is read from plugin configuration.
func initDB(dsn string) {
	once.Do(func() {
		log.Println("Initializing database connection pool...")
		if dsn == "" {
			dbErr = errors.New("database DSN is not configured in the plugin")
			log.Println(dbErr)
			return
		}
		
		db, dbErr = sql.Open("sqlserver", dsn)
		if dbErr != nil {
			log.Printf("Failed to open SQL Server connection: %v", dbErr)
			return
		}

		db.SetMaxOpenConns(25)
		db.SetMaxIdleConns(25)
		db.SetConnMaxLifetime(5 * time.Minute)

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		dbErr = db.PingContext(ctx)
		if dbErr != nil {
			log.Printf("Failed to ping SQL Server: %v", dbErr)
			db.Close() // Close if ping fails
			db = nil   // so queryCameraMetadata reports the pool as unavailable
		} else {
			log.Println("Database connection pool successfully initialized.")
		}
	})
}

// CameraMetadata holds the enrichment data from our legacy database.
type CameraMetadata struct {
	Location    sql.NullString
	Status      sql.NullString
	InstallDate sql.NullString
}


// queryCameraMetadata fetches data for a given camera ID.
// This is the function that will be called for each relevant message.
func queryCameraMetadata(ctx context.Context, cameraID string) (*CameraMetadata, error) {
	if db == nil {
		return nil, errors.New("database connection is not available")
	}

	// Use a timeout for all database queries to prevent the plugin from hanging.
	queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	row := db.QueryRowContext(queryCtx, 
		"SELECT location, status, install_date FROM dbo.CameraRegistry WHERE camera_id = @p1", 
		cameraID)

	var metadata CameraMetadata
	err := row.Scan(&metadata.Location, &metadata.Status, &metadata.InstallDate)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			log.Printf("No metadata found for camera_id: %s", cameraID)
			return nil, nil // Not a fatal error, we can proceed without enrichment.
		}
		log.Printf("Error querying metadata for camera_id %s: %v", cameraID, err)
		return nil, err
	}

	return &metadata, nil
}


// HandleStream is the entry point for Tyk's gRPC stream interception.
func HandleStream(frame *coprocess.StreamFrame, event coprocess.Event) (*coprocess.StreamFrame, coprocess.Event, error) {
	// We only care about the payload from the upstream service (the processor).
	if frame.Direction != coprocess.StreamDirection_UPSTREAM {
		return frame, event, nil
	}

	// Unmarshal the protobuf message from the raw frame data.
	var eventPayload pb.DetectionEvent
	if err := proto.Unmarshal(frame.Payload, &eventPayload); err != nil {
		log.Printf("Failed to unmarshal protobuf payload: %v", err)
		// Let the message pass through without modification on error.
		return frame, event, nil
	}

	cameraID := eventPayload.GetCameraId()
	if cameraID == "" {
		return frame, event, nil // Nothing to enrich.
	}
	
	metadata, err := queryCameraMetadata(context.Background(), cameraID)
	if err != nil {
		log.Printf("Failed to retrieve enrichment data for %s, passing frame as-is.", cameraID)
		return frame, event, nil
	}

	if metadata != nil {
		// A common mistake is to forget to handle NULL database values.
		// sql.NullString helps manage this gracefully.
		if metadata.Location.Valid {
			eventPayload.CameraLocation = metadata.Location.String
		}
		if metadata.Status.Valid {
			eventPayload.CameraStatus = metadata.Status.String
		}
		if metadata.InstallDate.Valid {
			eventPayload.InstallDate = metadata.InstallDate.String
		}
		log.Printf("Enriched data for camera %s", cameraID)
	}

	// Marshal the modified protobuf message back into raw bytes.
	newPayload, err := proto.Marshal(&eventPayload)
	if err != nil {
		log.Printf("Failed to marshal enriched protobuf payload: %v", err)
		return frame, event, nil
	}

	// Replace the frame's payload with our new, enriched payload.
	frame.Payload = newPayload

	return frame, event, nil
}

// MyInit is called by Tyk when the plugin is loaded.
// It's the perfect place to initialize our DB connection pool.
func MyInit(event coprocess.Event) (coprocess.Event, error) {
	config, ok := event.GetConfig().(map[string]interface{})
	if !ok {
		return event, errors.New("failed to parse plugin configuration")
	}

	dsn, _ := config["sql_server_dsn"].(string)
	initDB(dsn)
	
	if dbErr != nil {
		log.Printf("Initialization failed: %v", dbErr)
		return event, dbErr
	}

	return event, nil
}

func main() {}

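To build this, run go build -buildmode=plugin -o enricher.so enricher/plugin.go. This produces a shared object file that Tyk can load. Go only loads a plugin compiled with the same Go toolchain version and the same versions of any shared dependencies as the host binary, which is why Tyk publishes a plugin compiler Docker image (tykio/tyk-plugin-compiler) tagged to match gateway releases.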
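HandleStream is deliberately fail-open: a missing camera ID, a database error, or an unknown camera all leave the frame untouched, and NULL columns never overwrite a field. The same decision logic, pulled out as a pure function, is sketched below in Python. DetectionEvent and CameraMetadata here are simplified hypothetical stand-ins for the generated protobuf type and the Go struct, with None playing the role of an invalid sql.NullString.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional


@dataclass(frozen=True)
class DetectionEvent:
    camera_id: str
    camera_location: str = ""
    camera_status: str = ""
    install_date: str = ""


@dataclass(frozen=True)
class CameraMetadata:
    # None models a SQL NULL (an sql.NullString with Valid == false).
    location: Optional[str]
    status: Optional[str]
    install_date: Optional[str]


Lookup = Callable[[str], Optional[CameraMetadata]]


def _pick(db_value: Optional[str], current: str) -> str:
    """Use the database value unless it was NULL."""
    return db_value if db_value is not None else current


def enrich(event: DetectionEvent, lookup: Lookup) -> DetectionEvent:
    """Fail-open enrichment: on any problem, return the event unchanged."""
    if not event.camera_id:
        return event  # nothing to key the lookup on
    try:
        meta = lookup(event.camera_id)
    except Exception:
        return event  # database error: pass the frame through as-is
    if meta is None:
        return event  # no row for this camera
    return replace(
        event,
        camera_location=_pick(meta.location, event.camera_location),
        camera_status=_pick(meta.status, event.camera_status),
        install_date=_pick(meta.install_date, event.install_date),
    )
```

Keeping the merge free of I/O means it can be unit-tested without Tyk or SQL Server; the lookup is injected, so a dict's get method is enough for tests.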

Tyk Gateway Configuration

The Tyk API definition wires everything together. It defines the upstream gRPC service, enables the gRPC-to-gRPC-Web translation for browser clients, and loads our custom Go plugin.

{
  "name": "Vision Analytics Stream",
  "api_id": "vision-api",
  "org_id": "1",
  "use_keyless": true,
  "auth": {
    "auth_header_name": "Authorization"
  },
  "version_data": {
    "not_versioned": true,
    "versions": {
      "Default": {
        "name": "Default",
        "expires": ""
      }
    }
  },
  "proxy": {
    "listen_path": "/vision.VisionService/",
    "target_url": "grpc://processor-service:50051",
    "strip_listen_path": true,
    "enable_grpc": true,
    "enable_grpc_web": true
  },
  "custom_middleware": {
    "driver": "goplugin",
    "pre": [
      {
        "name": "MyInit",
        "path": "/opt/tyk-gateway/middleware/enricher.so",
        "symbol_name": "MyInit"
      }
    ],
    "post": [],
    "response": [],
    "id_extractor": {
      "extractor_type": "",
      "extractor_config": {}
    },
    "driver_specific_middleware": {
      "goplugin_streams": [
        {
          "path": "/opt/tyk-gateway/middleware/enricher.so",
          "symbol_name": "HandleStream"
        }
      ]
    }
  },
  "custom_middleware_bundle": "enricher_bundle",
  "middleware_global_config": {
    "enricher_bundle": {
      "sql_server_dsn": "sqlserver://user:password@sql-server-host:1433?database=DeviceDB"
    }
  }
}

A few key points in this configuration:

  • "target_url": "grpc://processor-service:50051" points Tyk to our Python gRPC service.
  • "enable_grpc": true and "enable_grpc_web": true are essential. The former tells Tyk to proxy gRPC traffic, and the latter enables the protocol translation required for browsers.
  • The custom_middleware section loads our enricher.so plugin. MyInit is called on startup, and HandleStream is registered to intercept stream data.
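  • middleware_global_config passes the database connection string to the plugin at load time, so credentials are not compiled into enricher.so. The DSN still sits in plaintext in the API definition, so in production it should be injected from a secret store rather than committed alongside the definition.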
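Since the DSN carries credentials, it is worth validating it when the plugin loads and never writing it to logs verbatim. A minimal Python sketch using urllib.parse follows; parse_dsn and redact_dsn are hypothetical helpers, and the sketch assumes the URL-style sqlserver:// DSN shown in the configuration.

```python
from typing import Dict, Optional, Union
from urllib.parse import parse_qs, urlsplit, urlunsplit


def parse_dsn(dsn: str) -> Dict[str, Optional[Union[str, int]]]:
    """Split a URL-style sqlserver:// DSN into its parts (hypothetical helper)."""
    parts = urlsplit(dsn)
    if parts.scheme != "sqlserver":
        raise ValueError(f"unexpected DSN scheme: {parts.scheme!r}")
    query = parse_qs(parts.query)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "user": parts.username,
        "database": query.get("database", [None])[0],
    }


def redact_dsn(dsn: str) -> str:
    """Return the DSN with its password masked, safe to write to gateway logs."""
    parts = urlsplit(dsn)
    if parts.password is None:
        return dsn  # nothing secret to hide
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    netloc = f"{parts.username or ''}:***@{host}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
```

Failing fast on a malformed DSN at load time surfaces configuration mistakes before the first stream arrives, rather than as per-message query errors.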

The Frontend: MobX for Reactive State Management

Finally, the frontend consumes the stream. The protoc-gen-grpc-web plugin generates a client library from the .proto file whose server-streaming calls behave like a simple event emitter in JavaScript. MobX is well suited to this: a store listens to the stream and updates its observable properties, and any React components that read them re-render automatically.

First, generate the JavaScript/TypeScript client:
protoc -I=. vision.proto --js_out=import_style=commonjs,binary:./src/proto --grpc-web_out=import_style=typescript,mode=grpcwebtext:./src/proto

Next, the MobX store manages the connection and state.

// src/stores/DetectionStore.ts
import { makeAutoObservable, observable, runInAction } from "mobx";
import * as grpcWeb from "grpc-web";
import { VisionServiceClient } from "../proto/VisionServiceClientPb";
import { DetectionRequest, DetectionEvent } from "../proto/vision_pb";

// Define a type for our local state
export interface EnrichedDetection extends DetectionEvent.AsObject {
  // We can add client-side properties here if needed
  localReceivedTimestamp: number;
}

class DetectionStore {
  // Use a map for efficient updates and lookups
  detections = observable.map<string, EnrichedDetection>();
  connectionState: "idle" | "connecting" | "streaming" | "error" | "closed" = "idle";
  errorMessage: string | null = null;

  private client: VisionServiceClient;
  // The client is generated by protoc-gen-grpc-web, so stream and status
  // types come from the "grpc-web" package.
  private currentStream: grpcWeb.ClientReadableStream<DetectionEvent> | null = null;

  constructor() {
    // The URL points to our Tyk Gateway, not the backend gRPC service.
    this.client = new VisionServiceClient("http://localhost:8080");
    // makeAutoObservable infers observables and actions, so no @action
    // decorators are needed on the methods below.
    makeAutoObservable(this);
  }

  startStreaming(cameraId: string) {
    if (this.currentStream) {
      console.warn("Stream already in progress. Call stopStreaming first.");
      return;
    }

    this.connectionState = "connecting";
    this.detections.clear();
    this.errorMessage = null;

    const request = new DetectionRequest();
    request.setCameraId(cameraId);

    const stream = this.client.streamDetections(request);
    this.currentStream = stream;

    stream.on("data", (response: DetectionEvent) => {
      // Stream callbacks run outside the action, so batch updates in runInAction.
      runInAction(() => {
        const enrichedEvent: EnrichedDetection = {
          ...response.toObject(),
          localReceivedTimestamp: Date.now(),
        };
        // Keep only the latest event per camera.
        this.detections.set(enrichedEvent.cameraId, enrichedEvent);
        if (this.connectionState !== "streaming") {
          this.connectionState = "streaming";
        }
      });
    });

    stream.on("end", () => {
      runInAction(() => {
        console.log("Stream ended.");
        this.connectionState = "closed";
        this.currentStream = null;
      });
    });

    stream.on("status", (status: grpcWeb.Status) => {
      if (status.code !== grpcWeb.StatusCode.OK) {
        runInAction(() => {
          console.error("gRPC stream error:", status.details);
          this.errorMessage = `Error: ${status.details} (code: ${status.code})`;
          this.connectionState = "error";
          this.currentStream = null;
        });
      }
    });
  }

  stopStreaming() {
    if (this.currentStream) {
      // Aborts the underlying request; local state is reset here directly.
      this.currentStream.cancel();
      this.currentStream = null;
      this.connectionState = "idle";
      console.log("Stream cancelled by client.");
    }
  }
}

export const detectionStore = new DetectionStore();
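The store's behaviour boils down to a small state machine plus a map that keeps only the newest event per camera, which bounds memory no matter how long the stream runs. That logic is modelled language-neutrally in Python below; DetectionStoreModel is a hypothetical stand-in rather than the MobX store, and status code 0 is gRPC's OK.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

GRPC_OK = 0  # gRPC status code for success


@dataclass
class DetectionStoreModel:
    """Language-neutral model of DetectionStore's state handling (hypothetical)."""

    connection_state: str = "idle"
    error_message: Optional[str] = None
    latest: Dict[str, dict] = field(default_factory=dict)

    def start(self) -> None:
        # Mirrors startStreaming: reset state before the first message arrives.
        self.connection_state = "connecting"
        self.error_message = None
        self.latest.clear()

    def on_data(self, event: dict) -> None:
        # Newest event per camera wins, so memory is bounded by camera count.
        self.latest[event["camera_id"]] = event
        self.connection_state = "streaming"

    def on_status(self, code: int, details: str) -> None:
        if code != GRPC_OK:
            self.error_message = f"Error: {details} (code: {code})"
            self.connection_state = "error"

    def on_end(self) -> None:
        self.connection_state = "closed"
```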

A React component can now observe this store and render the data.

// src/components/DetectionViewer.tsx
import React from 'react';
import { observer } from 'mobx-react-lite';
import { detectionStore } from '../stores/DetectionStore';

const DetectionViewer: React.FC<{ cameraId: string }> = observer(({ cameraId }) => {
  const detection = detectionStore.detections.get(cameraId);

  React.useEffect(() => {
    detectionStore.startStreaming(cameraId);
    return () => {
      detectionStore.stopStreaming();
    };
  }, [cameraId]);

  return (
    <div>
      <h3>Live Feed for Camera: {cameraId}</h3>
      <p>Connection Status: {detectionStore.connectionState}</p>
      {detectionStore.errorMessage && <p style={{ color: 'red' }}>{detectionStore.errorMessage}</p>}
      
      {detection && (
        <div style={{ border: '1px solid #ccc', padding: '10px', marginTop: '10px' }}>
          <h4>
            Enriched Metadata (from Tyk Plugin)
          </h4>
          <p><strong>Location:</strong> {detection.cameraLocation || 'N/A'}</p>
          <p><strong>Status:</strong> {detection.cameraStatus || 'N/A'}</p>
          <p><strong>Install Date:</strong> {detection.installDate || 'N/A'}</p>
          
          <hr />

          <h4>Detected Objects ({detection.objectsList.length})</h4>
          <ul>
            {detection.objectsList.map((obj, index) => (
              <li key={index}>
                {obj.label} (Confidence: {(obj.confidence * 100).toFixed(1)}%)
              </li>
            ))}
          </ul>
        </div>
      )}
    </div>
  );
});

export default DetectionViewer;

This completes the end-to-end pipeline.

graph TD
    subgraph Browser
        A[React Component with MobX] --> B{gRPC-Web Client};
    end
    
    B -- gRPC-Web (HTTP/1.1) --> C[Tyk API Gateway];

    subgraph Tyk Gateway
        C -- Translates to gRPC --> D[Custom Go Plugin];
        D -- Intercepts Stream --> E{Enrichment Logic};
        E -- "SELECT ... FROM CameraRegistry" --> F[(SQL Server)];
        E -- Returns Enriched Message --> D;
        D -- Forwards Stream --> G[gRPC Proxy];
    end

    G -- Native gRPC (HTTP/2) --> H[Python gRPC Service];

    subgraph Processor Service
        H -- Reads from --> I[Video Source];
        H -- Processes frames with --> J[OpenCV Engine];
        J -- Streams results back to --> H;
    end

The final architecture keeps each concern in one place. The processing service is decoupled and can scale independently, the gateway takes on a well-defined piece of business logic without compromising its role as a security and traffic management layer, and the frontend receives a data stream that is immediately ready for rendering.

However, the custom plugin is a significant piece of infrastructure code. It increases the complexity of the gateway and puts a database round trip on the path of every message, so slow queries translate directly into stream latency. A future optimization would be to introduce a Redis cache between the plugin and SQL Server to hold the mostly static metadata, reducing both latency and load on the legacy database. This pattern fits best where the enrichment logic is simple and the data source is fast; for more complex, multi-step enrichments, a dedicated microservice in the data path would be a more maintainable, albeit heavier, architectural choice.
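The arithmetic behind that optimization is simple: one camera at 30 FPS triggers 30 metadata queries per second, or 1,800 per minute, for data that rarely changes. With a cache entry lifetime of, say, 60 seconds, that drops to about one query per camera per minute. The sketch below shows the read-through pattern as an in-process Python TTL cache rather than Redis; TTLCache, its parameters, and the 60-second TTL are illustrative assumptions.

```python
import time
from typing import Callable, Dict, Generic, Tuple, TypeVar

V = TypeVar("V")


class TTLCache(Generic[V]):
    """Read-through cache: serve a value from memory until it is ttl_s old."""

    def __init__(
        self,
        loader: Callable[[str], V],
        ttl_s: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader  # e.g. the SQL Server lookup
        self._ttl_s = ttl_s
        self._clock = clock  # injectable so tests can control time
        self._entries: Dict[str, Tuple[float, V]] = {}

    def get(self, key: str) -> V:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl_s:
            return entry[1]  # fresh hit: no database round trip
        value = self._loader(key)  # miss or expired: reload and remember
        self._entries[key] = (now, value)
        return value
```

It caches "not found" results (a loader returning None) too, so an unregistered camera does not hit the database on every frame. It is not thread-safe; a Go version inside the plugin would need a sync.RWMutex or similar around the map.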

