Constructing a Low-Latency Streaming RAG Interface Using Weaviate, Vercel Functions, and Zustand


Our initial attempt at a Retrieval-Augmented Generation (RAG) feature was a performance disaster. The user experience was defined by a loading spinner that persisted for an agonizing 5 to 10 seconds. The architecture was textbook: a React frontend making a request to a standard Vercel Serverless Function, which would in turn query a Weaviate vector database for context, pass that context to an LLM, and finally return the complete generated answer. In a real-world project, this latency is unacceptable. It breaks the flow of interaction and makes the feature feel sluggish and unreliable. The core problem was the series of blocking, sequential network requests inherent in this simple model. This is the log of how we tore it down and rebuilt it for perceived-instantaneous interaction using a streaming-first approach.

The Initial Flawed Architecture and The Bottleneck

The first implementation was straightforward, which was its primary failing. A user submits a query, and we fire off a request.

Here’s the original Vercel Serverless Function. It’s a clear illustration of the problem.

// /api/rag-blocking.ts
// DO NOT USE THIS CODE - IT IS THE SLOW, BLOCKING EXAMPLE

import { NextApiRequest, NextApiResponse } from 'next';
import weaviate, { WeaviateClient } from 'weaviate-ts-client';
import OpenAI from 'openai';

const weaviateClient: WeaviateClient = weaviate.client({
  scheme: 'https',
  host: process.env.WEAVIATE_HOST || '',
  apiKey: new weaviate.ApiKey(process.env.WEAVIATE_API_KEY || ''),
});

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method Not Allowed' });
  }

  const { query } = req.body;

  if (!query || typeof query !== 'string') {
    return res.status(400).json({ error: 'Query is required' });
  }

  try {
    // STAGE 1: Get embedding for the query
    console.time('openai-embedding');
    const embeddingResponse = await openai.embeddings.create({
      model: 'text-embedding-3-small',
      input: query,
    });
    const queryVector = embeddingResponse.data[0].embedding;
    console.timeEnd('openai-embedding'); // Typically ~150-300ms

    // STAGE 2: Query Weaviate
    console.time('weaviate-query');
    const searchResult = await weaviateClient.graphql
      .get()
      .withClassName('MyDocument')
      .withFields('content title')
      .withNearVector({ vector: queryVector })
      .withLimit(5)
      .do();
    console.timeEnd('weaviate-query'); // Typically ~200-500ms

    const context = searchResult.data.Get.MyDocument.map(
      (doc: any) => doc.content
    ).join('\n\n---\n\n');

    // STAGE 3: Call OpenAI for generation
    console.time('openai-generation');
    const completion = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        {
          role: 'system',
          content: `You are a helpful assistant. Use the following context to answer the user's question. Context: ${context}`,
        },
        { role: 'user', content: query },
      ],
    });
    console.timeEnd('openai-generation'); // Typically ~2000-8000ms

    res.status(200).json({
      answer: completion.choices[0].message.content,
      sources: searchResult.data.Get.MyDocument.map((doc: any) => ({
        title: doc.title,
      })),
    });
  } catch (error) {
    console.error('RAG pipeline failed:', error);
    res.status(500).json({ error: 'Internal Server Error' });
  }
}

Running this reveals the obvious:

  1. Embedding Latency: ~300ms.
  2. Weaviate Search Latency: ~500ms.
  3. LLM Generation Latency (Time to First Token + Full Generation): 4000ms+.

The total time is the sum of these sequential, blocking operations. The user sees nothing until the final byte is ready. This approach fundamentally misunderstands the user’s perception of speed. A user can start reading and processing information far faster than the LLM can generate it. The goal, therefore, shifted from “make it faster” to “make it feel faster” by delivering value incrementally.

Architectural Pivot: Edge Functions and ReadableStream

The solution required two significant changes: moving the compute to the edge and re-implementing the entire flow using streams.

  1. Vercel Edge Functions: Unlike standard Serverless Functions, which can have significant cold start delays (1-3 seconds), Edge Functions run on a V8 isolate runtime with near-zero cold starts. For an interactive, latency-sensitive endpoint like this, every millisecond shaved off the initial connection matters.
  2. Web ReadableStream API: This is the core of the new architecture. Instead of building a complete JSON response in memory and sending it, we open a stream to the client immediately and push data chunks down the pipe as they become available. A minimal sketch of this pattern follows the list.
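
Before the full endpoint, here is a minimal, illustrative sketch of that pattern (the chunk contents are placeholders, not the real payloads): an Edge handler returns a Response backed by a ReadableStream and pushes data down it incrementally.

// Illustrative sketch only: an Edge handler that opens a stream immediately
// and pushes chunks as they become available.
export const config = { runtime: 'edge' };

export default async function handler(_req: Request) {
  const encoder = new TextEncoder();

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      // Push the first renderable piece of data as soon as it exists...
      controller.enqueue(encoder.encode('first chunk\n'));
      // ...and keep pushing later pieces as upstream work completes.
      controller.enqueue(encoder.encode('later chunk\n'));
      controller.close();
    },
  });

  return new Response(stream, {
    headers: { 'Content-Type': 'text/plain; charset=utf-8' },
  });
}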

The new data flow is designed to deliver UI-renderable content at each stage of the process:

  1. Client sends a query to the Edge Function.
  2. Edge Function immediately queries Weaviate.
  3. As soon as Weaviate responds, the function formats the source documents into a JSON object, stringifies it, and pushes it down the stream as the first chunk of data.
  4. The client receives this first chunk, parses it, and immediately renders the “Sources” section of the UI. The user can see the relevant context while the LLM is still thinking.
  5. Simultaneously, the Edge Function makes the call to the LLM API, but this time requesting a streaming response.
  6. As the LLM generates tokens, the Edge Function receives them and pushes each token down the same stream to the client.
  7. The client receives these token chunks and appends them to the answer being displayed, creating the “live typing” effect.

This is the refined, production-ready Edge Function. Note the careful handling of the stream and the custom protocol to differentiate data types.

// /pages/api/rag-streaming.ts
// This is the production-grade streaming implementation

import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';
import weaviate from 'weaviate-ts-client';

// Note: We are using the Vercel AI SDK for convenience in this example,
// but the underlying principle is a standard ReadableStream.

export const config = {
  runtime: 'edge',
};

// NOTE: These clients are created at module scope, so they are reused across
// invocations handled by the same isolate. Each new isolate still pays the
// setup cost, and edge runtimes offer no traditional connection pooling.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

const weaviateClient = weaviate.client({
    scheme: 'https',
    host: process.env.WEAVIATE_HOST || '',
    apiKey: new weaviate.ApiKey(process.env.WEAVIATE_API_KEY || ''),
});

// A custom stream transformer to inject our context data at the beginning of the stream.
function createCustomTransformer(sources: any[]) {
  let sourcesSent = false;
  const sourceDataPrefix = 'SOURCES::';
  const encoder = new TextEncoder();

  return new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      // The first time transform is called, prepend the sources.
      if (!sourcesSent) {
        const sourcesJson = JSON.stringify(sources);
        const sourcesChunk = encoder.encode(`${sourceDataPrefix}${sourcesJson}\n`);
        controller.enqueue(sourcesChunk);
        sourcesSent = true;
      }
      // Pass through the original LLM token chunks.
      controller.enqueue(chunk);
    },
  });
}

export default async function handler(req: Request) {
  try {
    const { messages } = await req.json();
    const userQuery = messages[messages.length - 1].content;

    if (!userQuery || typeof userQuery !== 'string') {
      return new Response(JSON.stringify({ error: 'Invalid query' }), { status: 400 });
    }

    // Stage 1: Embed the query
    const embeddingResponse = await openai.embeddings.create({
      model: 'text-embedding-3-small',
      input: userQuery,
    });
    const queryVector = embeddingResponse.data[0].embedding;

    // Stage 2: Query Weaviate
    const searchResult = await weaviateClient.graphql
      .get()
      .withClassName('MyDocument')
      .withFields('content title url')
      .withNearVector({ vector: queryVector })
      .withLimit(5)
      .do();
      
    const contextSources = searchResult.data.Get.MyDocument;
    const contextText = contextSources.map(
      (doc: any) => `<doc>\n${doc.content}\n</doc>`
    ).join('\n');

    // Stage 3: Call OpenAI with streaming enabled
    const response = await openai.chat.completions.create({
      model: 'gpt-4o',
      stream: true,
      messages: [
          {
              role: 'system',
              content: `You are an expert Q&A system. Your responses must be grounded in the provided context documents. Answer the user's question using only the information from the following documents. Cite the sources you use. Context: ${contextText}`,
          },
          ...messages,
      ],
    });

    // Create a stream from the OpenAI response.
    const stream = OpenAIStream(response);

    // Create our custom transformer to inject the sources.
    const customTransformer = createCustomTransformer(contextSources.map((doc:any) => ({ title: doc.title, url: doc.url })));
    
    // Pipe the OpenAI stream through our custom transformer.
    const finalStream = stream.pipeThrough(customTransformer);

    // Return the streaming response.
    return new StreamingTextResponse(finalStream);

  } catch (error: any) {
    console.error('Error in streaming RAG pipeline:', error);
    // Be careful not to leak sensitive error details to the client
    return new Response(JSON.stringify({ error: 'An unexpected error occurred.' }), { status: 500 });
  }
}

This backend code solves the latency problem by restructuring the data flow. However, it introduces a new challenge on the frontend: how to consume and manage the state of this complex, multi-part stream in a clean, robust, and performant way.
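
Concretely, the bytes arriving at the client over this custom protocol look roughly like the following (the source objects are placeholders, and the answer text arrives in whatever chunk sizes the LLM and network produce):

SOURCES::[{"title":"Doc A","url":"https://example.com/a"},{"title":"Doc B","url":"https://example.com/b"}]
The answer text begins immediately after the newline and continues, token by token, until the stream closes.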

Frontend State Management: Why Zustand Was The Right Choice

A naive frontend implementation using useState would quickly devolve into an unmanageable mess of state variables: isLoadingSources, isStreamingAnswer, sources, answer, error, etc. The logic for parsing the stream and updating these states would be entangled within a useEffect hook, leading to bugs and re-render issues.

We chose Zustand for several key reasons in this high-performance context:

  • Minimal Boilerplate: No providers, actions, or reducers in the traditional sense. It’s a simple hook that gives you state and setters.
  • Performance: Zustand updates components by subscription. If a component subscribes only to state.sources, it will not re-render when state.answer is updated hundreds of times by streaming tokens (see the snippet after this list). This is critical for preventing UI jank.
  • Decoupled Logic: The complex logic for fetching and parsing the stream can live entirely inside the store’s actions, completely separate from the React components. Components become simple, declarative consumers of state.
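
To make the subscription point concrete, here is a sketch of a component that subscribes to a single slice of the store defined in the next section; it re-renders when the sources change, but not on every streamed token:

// components/SourceCount.tsx (illustrative)
import React from 'react';
import { useRagStore } from '../store/rag-store';

const SourceCount = () => {
  // Subscribing to a single slice: updates to `answer` do not re-render this component.
  const sources = useRagStore((state) => state.sources);
  return <span>{sources.length} sources retrieved</span>;
};

export default SourceCount;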

Here is the design of our RAG store. It’s the brain of the frontend operation.

// store/rag-store.ts
import { create } from 'zustand';

// Define the shape of our retrieved source documents
interface Source {
  title: string;
  url: string;
}

// Define the possible states of our RAG process
type RagStatus = 'idle' | 'retrieving' | 'streaming' | 'done' | 'error';

// Define the complete state shape for the store
interface RagState {
  // Input
  query: string;
  // Status and data
  status: RagStatus;
  sources: Source[];
  answer: string;
  error: string | null;
  // Actions
  startRAG: (query: string) => Promise<void>;
  reset: () => void;
}

const initialState = {
  query: '',
  status: 'idle' as RagStatus,
  sources: [],
  answer: '',
  error: null,
};

export const useRagStore = create<RagState>((set, get) => ({
  ...initialState,

  reset: () => set(initialState),

  startRAG: async (query: string) => {
    // 1. Immediately update state to reflect the new process starting
    set({ 
      query, 
      status: 'retrieving', 
      answer: '', 
      sources: [], 
      error: null 
    });

    try {
      const response = await fetch('/api/rag-streaming', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        // We send a minimal "messages" array compatible with the Vercel AI SDK
        body: JSON.stringify({ messages: [{ role: 'user', content: query }] }),
      });

      if (!response.ok || !response.body) {
        const errorText = await response.text();
        throw new Error(`Request failed: ${response.status} ${errorText}`);
      }

      // 2. Prepare to process the stream
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      const sourceDataPrefix = 'SOURCES::';
      let buffer = '';

      // Main stream processing loop
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        
        // This is the critical part for handling our custom protocol.
        // We look for the newline that terminates our JSON blob.
        const endOfSourcesIndex = buffer.indexOf('\n');
        if (get().status === 'retrieving' && endOfSourcesIndex !== -1) {
          const rawSources = buffer.substring(sourceDataPrefix.length, endOfSourcesIndex);
          try {
            const parsedSources = JSON.parse(rawSources);
            set({ sources: parsedSources, status: 'streaming' });
          } catch (e) {
            console.error("Failed to parse sources JSON:", e);
            // Handle malformed JSON from the stream
          }
          // Remove the processed part from the buffer
          buffer = buffer.substring(endOfSourcesIndex + 1);
        }

        // If we are in the streaming state, any new data in the buffer is part of the answer.
        if (get().status === 'streaming') {
            set((state) => ({ answer: state.answer + buffer }));
            buffer = ''; // Clear buffer after appending
        }
      }

      // 3. Finalize the state
      set({ status: 'done' });
    } catch (error: any) {
      console.error('RAG stream failed:', error);
      set({ status: 'error', error: error.message || 'An unknown error occurred.' });
    }
  },
}));

The logic is now perfectly encapsulated. The startRAG action handles the entire lifecycle: setting initial state, fetching, reading the stream, parsing our custom SOURCES:: prefix, updating state incrementally, and handling finalization and errors.
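
Because the whole pipeline lives in store actions, it can also be driven and inspected outside React via Zustand's getState, which is handy for tests or imperative triggers (the query string below is just an example):

// Illustrative: exercising the store outside of a React component.
import { useRagStore } from './rag-store';

async function runExampleQuery() {
  await useRagStore.getState().startRAG('How does the streaming endpoint work?');
  const { status, sources, answer } = useRagStore.getState();
  console.log(status, sources.length, answer.slice(0, 80));
}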

The Declarative UI Component

With the complex state logic handled by Zustand, the React component becomes remarkably clean and declarative. It’s solely responsible for rendering the current state, not managing it.

// components/RAGInterface.tsx
import React, { useState } from 'react';
import { useRagStore } from '../store/rag-store';
import { useShallow } from 'zustand/react/shallow';

const RAGInterface = () => {
  const [inputValue, setInputValue] = useState('');
  
  // Select only the state you need, with a shallow equality check so the
  // object selector doesn't trigger a re-render on every streamed token.
  const { status, sources, answer, error, startRAG, reset } = useRagStore(
    useShallow((state) => ({
      status: state.status,
      sources: state.sources,
      answer: state.answer,
      error: state.error,
      startRAG: state.startRAG,
      reset: state.reset,
    }))
  );

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (inputValue.trim() && status !== 'retrieving' && status !== 'streaming') {
      startRAG(inputValue);
    }
  };
  
  const isProcessing = status === 'retrieving' || status === 'streaming';

  return (
    <div className="rag-container">
      <form onSubmit={handleSubmit}>
        <input
          type="text"
          value={inputValue}
          onChange={(e) => setInputValue(e.target.value)}
          placeholder="Ask a question about your documents..."
          disabled={isProcessing}
        />
        <button type="submit" disabled={isProcessing}>
          {isProcessing ? 'Processing...' : 'Ask'}
        </button>
        <button type="button" onClick={reset} disabled={isProcessing}>
          Clear
        </button>
      </form>

      <div className="results-panel">
        {status === 'retrieving' && <div className="spinner">Searching sources...</div>}
        
        {sources.length > 0 && (
          <div className="sources-section">
            <h3>Sources</h3>
            <ul>
              {sources.map((source, index) => (
                <li key={index}>
                  <a href={source.url} target="_blank" rel="noopener noreferrer">
                    {source.title}
                  </a>
                </li>
              ))}
            </ul>
          </div>
        )}

        {answer && (
          <div className="answer-section">
            <h3>Answer</h3>
            <p>{answer}</p>
          </div>
        )}

        {status === 'error' && (
          <div className="error-message">
            <p>Error: {error}</p>
          </div>
        )}
      </div>
    </div>
  );
};

export default RAGInterface;

This component is simple to read and reason about. It doesn’t know or care about ReadableStream or JSON parsing. It just subscribes to the state from useRagStore and renders the appropriate UI for the current status. This separation of concerns is critical for building maintainable, complex applications.

Architectural Flow Diagram

The final, performant architecture can be visualized with the following Mermaid sequence diagram:

sequenceDiagram
    participant Client
    participant VercelEdge as Vercel Edge Function
    participant WeaviateDB as Weaviate
    participant LLM_API as OpenAI API

    Client->>+VercelEdge: POST /api/rag-streaming (query)
    VercelEdge->>+WeaviateDB: Vector Search Request
    WeaviateDB-->>-VercelEdge: Return Context Documents
    Note over VercelEdge: Starts streaming response to Client
    VercelEdge->>Client: Stream Chunk 1 (JSON of Sources)
    Client->>Client: Parse Sources & Render UI
    
    VercelEdge->>+LLM_API: Chat Completion Request (stream=true)
    LLM_API-->>VercelEdge: Stream Token 1
    VercelEdge->>Client: Stream Chunk 2 (Token 1)
    LLM_API-->>VercelEdge: Stream Token 2
    VercelEdge->>Client: Stream Chunk 3 (Token 2)
    Note over Client: Appends tokens to UI in real-time
    
    loop Until LLM finishes
        LLM_API-->>VercelEdge: Stream Token N
        VercelEdge->>Client: Stream Chunk N+1 (Token N)
    end
    
    LLM_API-->>-VercelEdge: End of Stream
    VercelEdge-->>-Client: Close Stream

Lingering Issues and Future Considerations

This architecture is a significant improvement, but it’s not without its own set of trade-offs and potential issues in a large-scale production environment.

First, connection management in an edge environment is a persistent challenge. The Weaviate and OpenAI clients are created at module scope, so they are reused within a single isolate, but every new isolate pays the setup cost again, and edge runtimes lack the mature connection pooling mechanisms of traditional long-running servers. Investigating HTTP-based connection options or dedicated serverless drivers for Weaviate would be a necessary next step to reduce per-request connection setup latency. One such HTTP-based option is sketched below.
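
A sketch of the HTTP-based route, assuming the same MyDocument class and Bearer-token auth used above: query Weaviate's GraphQL endpoint directly with fetch, which the edge runtime already provides natively.

// Illustrative: a nearVector query against Weaviate's /v1/graphql endpoint using fetch.
async function searchDocuments(queryVector: number[]) {
  const graphqlQuery = `{
    Get {
      MyDocument(nearVector: { vector: ${JSON.stringify(queryVector)} }, limit: 5) {
        content
        title
        url
      }
    }
  }`;

  const res = await fetch(`https://${process.env.WEAVIATE_HOST}/v1/graphql`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${process.env.WEAVIATE_API_KEY}`,
    },
    body: JSON.stringify({ query: graphqlQuery }),
  });

  if (!res.ok) {
    throw new Error(`Weaviate query failed: ${res.status}`);
  }

  const { data } = await res.json();
  return data.Get.MyDocument as { content: string; title: string; url: string }[];
}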

Second, our custom stream protocol (SOURCES:: followed by JSON) is effective but brittle. A malformed chunk or a network interruption could break the client-side parser. A more robust implementation would use Server-Sent Events (SSE), which provides a standardized format for sending named events and data payloads, along with built-in support for reconnection and error handling.
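
As a rough sketch of what that could look like on the server, the two payload kinds would be framed as named SSE events by a transformer along these lines (the event names are illustrative):

// Illustrative: SSE-style framing for the sources payload and the token stream.
// Each message is "event: <name>\ndata: <json>\n\n".
const encoder = new TextEncoder();
const decoder = new TextDecoder();

function sseEvent(event: string, data: unknown): Uint8Array {
  return encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}

function createSseTransformer(sources: { title: string; url: string }[]) {
  let sourcesSent = false;
  return new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      if (!sourcesSent) {
        controller.enqueue(sseEvent('sources', sources));
        sourcesSent = true;
      }
      // Wrap each raw token chunk in a named event instead of sending bare text.
      controller.enqueue(sseEvent('token', { text: decoder.decode(chunk, { stream: true }) }));
    },
  });
}

The response would also need the text/event-stream content type, and since the request is a POST, the client would parse these frames from the fetch body (or use a small SSE-parsing helper) rather than the GET-only EventSource API.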

Finally, while the frontend feels fast, the backend compute is not cancellable. If a user navigates away mid-stream, the Vercel Function will continue its execution, complete the LLM call, and incur the associated costs. Implementing a proper cancellation mechanism using AbortController on the client and propagating that signal to the Edge Function is essential for optimizing resource usage and cost in a production system.
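
A sketch of that wiring, assuming the platform aborts req.signal when the client disconnects (the helper names here are illustrative):

// Illustrative: propagating cancellation from the client to the Edge Function.

// Client side (e.g. inside the Zustand store): keep one AbortController per
// request, pass its signal to fetch, and abort it on reset or navigation.
let activeController: AbortController | null = null;

export async function fetchRagStream(messages: { role: string; content: string }[]) {
  activeController?.abort();
  activeController = new AbortController();
  return fetch('/api/rag-streaming', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
    signal: activeController.signal,
  });
}

export function cancelActiveRagRequest() {
  activeController?.abort();
}

// Edge side: forward the request's signal to the OpenAI SDK so generation
// stops when the client goes away (the SDK accepts a per-request options
// object with an AbortSignal):
//
//   const completion = await openai.chat.completions.create(
//     { model: 'gpt-4o', stream: true, messages },
//     { signal: req.signal }
//   );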

