Integrating a Go gRPC Streaming Service with a Quarkus Middleware Layer into a Laravel Application


The initial requirement seemed straightforward: add a real-time, AI-powered document analysis feature to our existing Laravel monolith. The user would submit a large block of text, and an LLM would provide a streaming, “typewriter-style” analysis. Our first proof-of-concept, a simple synchronous Guzzle request from PHP to a Python Flask service hosting the model, collapsed immediately under load testing. PHP-FPM workers were tied up for the entire duration of the LLM response, sometimes exceeding 30 seconds. The server ran out of workers, and the user experience was abysmal. It was clear that the synchronous, request-response model of our core application was fundamentally incompatible with long-lived, streaming I/O.

This led to a complete architectural rethink. Instead of forcing our Laravel application to handle a workload it wasn’t designed for, we decided to build a specialized, high-performance data plane to sit alongside it. The Laravel application would remain the user-facing orchestrator and source of business context, but the heavy lifting of real-time streaming would be offloaded to services built with more suitable technologies.

Our final architecture consists of three distinct backend services, each with a specific responsibility:

  1. gRPC-Go Service (The Streamer): A minimal, high-performance Go service responsible for the single task of communicating with the LLM API and streaming its token-by-token response over a gRPC connection. Go’s concurrency model and low overhead were ideal for this I/O-bound task.
  2. Quarkus Service (The Middleware): A Java service acting as a crucial intermediary. It consumes the raw gRPC stream from the Go service, applies business rules, handles authentication/authorization logic passed from Laravel, and performs logging. Quarkus was chosen for its combination of high performance (GraalVM native compilation) and access to the mature Java ecosystem for complex enterprise logic.
  3. Laravel Application (The Orchestrator): Our existing PHP application. It handles user authentication, initiates the analysis request to the Quarkus service, and, most importantly, acts as a transparent proxy, streaming the response from Quarkus directly to the user’s browser using Server-Sent Events (SSE).

This distributed approach created a clear separation of concerns, allowing us to use the best tool for each part of the job. The full request flow, end to end:

sequenceDiagram
    participant MUI as Material-UI Frontend
    participant LV as Laravel Orchestrator
    participant QK as Quarkus Middleware
    participant GO as gRPC-Go Streamer
    participant LLM as LLM API

    MUI->>LV: POST /analyze (document text)
    LV->>QK: POST /api/process (with JWT, text)
    Note over LV,QK: Laravel makes a streaming HTTP request
    QK->>GO: gRPC call: AnalyzeDocumentStream(text)
    GO->>LLM: Request stream completion
    LLM-->>GO: Stream of tokens...
    GO-->>QK: gRPC stream of AnalyzeResponse chunks
    QK-->>LV: HTTP stream of Server-Sent Events
    LV-->>MUI: HTTP stream of Server-Sent Events

The Core: The gRPC-Go Streaming Service

The foundation of this entire pipeline is the Go service. Its only job is to be an efficient bridge between a third-party LLM API and our internal systems via gRPC.

First, we define the gRPC contract in a .proto file. This is the source of truth for communication between the Go service and its clients. Using a server-side stream (stream AnalyzeResponse) is the key to the entire design.

llmstream/llm.proto:

syntax = "proto3";

package llmstream;

// The segment after ";" names the generated Go package. Without it, the
// package name would be derived from the last path element ("go"), which
// is a reserved word and won't compile.
option go_package = "github.com/your-org/llmstream/gen/go;llmstream";

service LlmStreamer {
  // Initiates a streaming analysis of the provided document.
  // The server will stream back analysis chunks as they are generated.
  rpc AnalyzeDocumentStream(AnalyzeRequest) returns (stream AnalyzeResponse) {}
}

message AnalyzeRequest {
  string request_id = 1;
  string document_content = 2;
}

message AnalyzeResponse {
  string request_id = 1;
  string content_chunk = 2;
  // In a real-world scenario, you might add metadata like token counts, timing, etc.
  // bool is_final_chunk = 3;
}
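
From this contract we generate stubs for both sides. On the Java side, Quarkus’s built-in gRPC code generation picks up protos placed under src/main/proto. For Go, one convenient option is a go:generate directive — a sketch that assumes protoc plus the protoc-gen-go and protoc-gen-go-grpc plugins are installed, with paths adjusted to your repository layout:

// llmstream/gen.go
// Regenerate the Go stubs with `go generate ./...`.
package llmstream

//go:generate protoc --go_out=gen/go --go_opt=paths=source_relative --go-grpc_out=gen/go --go-grpc_opt=paths=source_relative llm.proto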

The server implementation uses Go’s channels and goroutines to manage the flow of data. For this example, we’ll mock the LLM client; in a production system this would be a call to OpenAI’s, Anthropic’s, or another vendor’s streaming API.

cmd/server/main.go:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"strings"
	"time"

	pb "github.com/your-org/llmstream/gen/go"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type llmStreamerServer struct {
	pb.UnimplementedLlmStreamerServer
}

// mockLlmStream simulates receiving tokens from a real LLM API.
// It sends words from a predefined text to a channel.
func mockLlmStream(ctx context.Context, contentChan chan<- string) {
	defer close(contentChan) // Ensure channel is closed when done.
	
	// A more realistic simulation would involve an actual HTTP stream.
	mockResponse := "This is a detailed analysis of the provided document. The key findings suggest a strong correlation between market trends and quarterly performance. Further investigation into specific sectors is recommended for a more granular understanding."
	words := strings.Fields(mockResponse)

	for _, word := range words {
		select {
		case <-ctx.Done():
			// Client has disconnected or context was cancelled.
			log.Println("Client disconnected, stopping stream.")
			return
		case contentChan <- word + " ":
			// Simulate network latency between tokens.
			time.Sleep(100 * time.Millisecond)
		}
	}
}

func (s *llmStreamerServer) AnalyzeDocumentStream(req *pb.AnalyzeRequest, stream pb.LlmStreamer_AnalyzeDocumentStreamServer) error {
	log.Printf("Received analysis request %s", req.RequestId)

	if req.DocumentContent == "" {
		return status.Errorf(codes.InvalidArgument, "DocumentContent cannot be empty")
	}

	// The context of the stream is used to detect client cancellation.
	ctx := stream.Context()
	contentChan := make(chan string)

	// Start the mock LLM stream in a separate goroutine.
	go mockLlmStream(ctx, contentChan)

	// Read from the channel and send to the gRPC stream.
	for chunk := range contentChan {
		response := &pb.AnalyzeResponse{
			RequestId:    req.RequestId,
			ContentChunk: chunk,
		}
		if err := stream.Send(response); err != nil {
			log.Printf("Error sending to stream for request %s: %v", req.RequestId, err)
			// This error typically means the client has closed the connection.
			return status.Errorf(codes.Canceled, "Stream send failed: %v", err)
		}
	}

	log.Printf("Finished streaming for request %s", req.RequestId)
	return nil
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterLlmStreamerServer(s, &llmStreamerServer{})

	log.Println("gRPC server listening on :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
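
In production, mockLlmStream would be replaced by a reader over the vendor’s streaming HTTP API. Most LLM providers stream completions as Server-Sent Events, so the replacement looks roughly like the sketch below — the endpoint URL and request body are placeholders rather than a real vendor contract, and in practice each data payload is a JSON object from which you would extract the token text:

package llmclient

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"net/http"
	"strings"
)

// llmStream reads an SSE-style completion stream from a vendor API and
// forwards each data payload to contentChan. The URL and request shape
// below are placeholders; consult your vendor's API reference.
func llmStream(ctx context.Context, prompt string, contentChan chan<- string) error {
	defer close(contentChan)

	reqBody, err := json.Marshal(map[string]any{"prompt": prompt, "stream": true})
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		"https://llm.example.com/v1/completions", bytes.NewReader(reqBody))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "text/event-stream")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err // a cancelled ctx aborts the request here
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data: ") {
			continue // skip comments, event names, and blank separator lines
		}
		payload := strings.TrimPrefix(line, "data: ")
		if payload == "[DONE]" { // some vendors send a sentinel frame
			continue
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case contentChan <- payload:
		}
	}
	return scanner.Err()
}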

A common pitfall here is improper context handling. The stream.Context() is crucial. If the downstream client (Quarkus) cancels the request, this context will be marked as “Done,” allowing us to tear down the upstream connection to the LLM and release resources, preventing goroutine leaks.
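
This behavior is easy to verify with a throwaway Go client that sets a deadline and reads until it fires; the server should log “Client disconnected, stopping stream.” rather than leaking a goroutine. A minimal sketch, assuming a recent google.golang.org/grpc and the stubs generated earlier:

package main

import (
	"context"
	"io"
	"log"
	"time"

	pb "github.com/your-org/llmstream/gen/go"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.NewClient("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("failed to connect: %v", err)
	}
	defer conn.Close()

	// Cancel the RPC after two seconds to simulate a client going away mid-stream.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	stream, err := pb.NewLlmStreamerClient(conn).AnalyzeDocumentStream(ctx, &pb.AnalyzeRequest{
		RequestId:       "cancel-test",
		DocumentContent: "some document",
	})
	if err != nil {
		log.Fatalf("failed to start stream: %v", err)
	}

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			log.Println("stream completed normally")
			return
		}
		if err != nil {
			// Expect a DeadlineExceeded status here; at the same moment the
			// server's stream.Context() is cancelled and its goroutine exits.
			log.Printf("stream ended: %v", err)
			return
		}
		log.Printf("chunk: %q", resp.ContentChunk)
	}
}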

The Bridge: The Quarkus Middleware Service

This service acts as the “smart” layer. It translates a RESTful request from Laravel into a gRPC stream request, handles business logic, and streams the response back over HTTP. Quarkus’s native support for gRPC clients and its reactive framework (Mutiny) make this surprisingly elegant.

First, the necessary dependencies in pom.xml:

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-resteasy-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-grpc</artifactId>
    </dependency>
    <!-- Jackson is needed to bind the incoming JSON payload to AnalysisInput -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
    </dependency>
</dependencies>

The configuration in application.properties points to our Go service. In a production environment like Kubernetes, this would be a headless service DNS name.

src/main/resources/application.properties:

# Configure the gRPC client to connect to our Go service
quarkus.grpc.client.llm-streamer.host=localhost
quarkus.grpc.client.llm-streamer.port=50051
# Set to false in production and configure TLS
quarkus.grpc.client.llm-streamer.plain-text=true

The core of the service is a JAX-RS resource that exposes a POST endpoint. It accepts a JSON payload, injects the gRPC client, and returns a Multi<String>, Mutiny’s reactive type for a stream of items. By setting the media type to text/event-stream, RESTEasy Reactive automatically formats each item as a Server-Sent Event.

src/main/java/org/acme/AnalysisResource.java:

package org.acme;

import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Multi;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.logging.Logger;

import java.util.UUID;
import llmstream.Llm;
import llmstream.LlmStreamer;
import llmstream.Llm.AnalyzeRequest;

@Path("/api")
public class AnalysisResource {

    private static final Logger LOG = Logger.getLogger(AnalysisResource.class);

    // Quarkus dependency injection for the gRPC client stub
    @GrpcClient("llm-streamer")
    LlmStreamer llmStreamer;

    @POST
    @Path("/process")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<String> processDocument(AnalysisInput input) {
        String requestId = UUID.randomUUID().toString();
        LOG.infof("Starting analysis for requestId: %s", requestId);

        AnalyzeRequest request = Llm.AnalyzeRequest.newBuilder()
                .setRequestId(requestId)
                .setDocumentContent(input.documentText())
                .build();

        // This is the reactive magic. The gRPC client returns a Multi (a stream).
        // We don't block or wait for the full response. We return the stream itself.
        return llmStreamer.analyzeDocumentStream(request)
                .onItem().transform(response -> {
                    // Here, we can add business logic. For example, check for sensitive data
                    // or enrich the chunk before sending it downstream.
                    // For this example, we just log and forward.
                    LOG.infof("Received chunk for %s", response.getRequestId());
                    return response.getContentChunk();
                })
                .onFailure().invoke(throwable -> LOG.errorf(throwable, "gRPC stream failed for %s", requestId))
                .onCompletion().invoke(() -> LOG.infof("Stream completed for %s", requestId));
    }

    // A simple record to represent the incoming JSON payload.
    public record AnalysisInput(String documentText) {}
}

This non-blocking, reactive approach is critical. The Quarkus service occupies almost no threads while waiting for I/O from the gRPC stream; it simply processes each chunk as it arrives and pushes it down the HTTP response stream, creating an efficient pipeline. A common mistake is to collect the stream into a list (.collect().asList()) before returning, which would defeat the entire purpose of streaming.

The Glue: The Laravel Orchestrator

Laravel’s role is to bridge the secure, authenticated user session with the backend processing pipeline. It validates the user request and then makes a streaming request to the Quarkus service. The most important part of the Laravel implementation is using a StreamedResponse. This avoids buffering the entire response from Quarkus in PHP’s memory, which would reintroduce our original problem.

First, a route in routes/web.php to handle the frontend’s request:

// routes/web.php
use Illuminate\Support\Facades\Route;
use App\Http\Controllers\AnalysisController;

Route::post('/analyze', [AnalysisController::class, 'streamAnalysis']);

The controller uses Guzzle’s stream option. It opens a connection to the Quarkus service and then reads from the response stream chunk by chunk, echoing each chunk to the browser immediately.

app/Http/Controllers/AnalysisController.php:

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\RequestException;
use Illuminate\Support\Facades\Log;

class AnalysisController extends Controller
{
    public function streamAnalysis(Request $request)
    {
        $request->validate([
            'document_text' => 'required|string|max:10000',
        ]);

        $documentText = $request->input('document_text');
        $quarkusServiceUrl = config('services.quarkus.url') . '/api/process';

        // In a real project, you'd pass authentication details (e.g., a JWT)
        // from the user's session to the Quarkus service.
        $client = new Client();

        $response = new StreamedResponse(function () use ($client, $quarkusServiceUrl, $documentText) {

            // Disable PHP's output buffering so each chunk is flushed to the
            // client immediately instead of being held back in a buffer.
            while (ob_get_level() > 0) {
                ob_end_flush();
            }
            flush();

            try {
                $guzzleResponse = $client->post($quarkusServiceUrl, [
                    'json' => ['documentText' => $documentText],
                    'stream' => true, // This is the critical option for Guzzle
                ]);

                $body = $guzzleResponse->getBody();

                // Quarkus already emits complete SSE frames ("data: ...\n\n"),
                // so we act as a transparent proxy and forward the bytes verbatim.
                while (!$body->eof()) {
                    $chunk = $body->read(1024); // Read in small chunks
                    if ($chunk !== '') {        // note: empty() would drop a literal "0"
                        echo $chunk;
                        flush();
                    }
                }
            } catch (RequestException $e) {
                Log::error('Failed to connect to Quarkus service: ' . $e->getMessage());
                $errorData = json_encode(['error' => 'The analysis service is currently unavailable.']);
                echo "event: error\n";
                echo "data: " . $errorData . "\n\n";
                flush();
            }
        }, 200, [
            // SSE headers belong on the response object, not in ad-hoc header() calls.
            'Content-Type'      => 'text/event-stream',
            'Cache-Control'     => 'no-cache',
            'Connection'        => 'keep-alive',
            // Tell Nginx not to buffer this response.
            'X-Accel-Buffering' => 'no',
        ]);

        // Prevent PHP from timing out on this long-running script
        set_time_limit(0);

        return $response;
    }
}

A key detail is set_time_limit(0), the explicit calls to flush(), and the X-Accel-Buffering: no header. Without these, PHP’s execution environment or a reverse proxy like Nginx might buffer the response, delaying the data from reaching the client and ruining the real-time effect. Note that Laravel does not re-wrap the payload: Quarkus already emits frames in the SSE wire format (data: ...\n\n), so forwarding the bytes untouched preserves the framing the browser expects. Only the error frame, which originates in Laravel itself, is built by hand.

The Interface: Material-UI Frontend

Finally, the frontend consumes the SSE stream from our Laravel endpoint. The browser’s built-in EventSource API is purpose-built for one-way server-to-client streams, but it only supports GET requests and cannot send custom headers or a body. Since our endpoint requires a POST with the document text, we use fetch() instead and parse the SSE frames off the response body stream ourselves (libraries such as @microsoft/fetch-event-source wrap the same pattern).

Here’s a sample React component using Material-UI. It maintains the streaming content in its state, appending new chunks as they arrive.

src/components/AnalysisComponent.jsx:

import React, { useState, useRef, useEffect } from 'react';
import { TextField, Button, Box, Paper, Typography, CircularProgress } from '@mui/material';

export default function AnalysisComponent() {
    const [documentText, setDocumentText] = useState('');
    const [analysisResult, setAnalysisResult] = useState('');
    const [isLoading, setIsLoading] = useState(false);
    const abortRef = useRef(null);

    // Cleanup effect: abort any in-flight request when the component unmounts.
    useEffect(() => {
        return () => {
            if (abortRef.current) {
                abortRef.current.abort();
            }
        };
    }, []);

    const handleSubmit = async (event) => {
        event.preventDefault();
        if (isLoading) return;

        setAnalysisResult('');
        setIsLoading(true);

        // Abort any previous request before starting a new one.
        if (abortRef.current) {
            abortRef.current.abort();
        }
        const controller = new AbortController();
        abortRef.current = controller;

        try {
            // The native EventSource API only supports GET, so we POST with
            // fetch() and parse the SSE frames off the response body stream.
            const response = await fetch('/analyze', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Accept': 'text/event-stream',
                    // Important: add the CSRF token for Laravel.
                    'X-CSRF-TOKEN': document.querySelector('meta[name="csrf-token"]').getAttribute('content'),
                },
                body: JSON.stringify({ document_text: documentText }),
                signal: controller.signal,
            });

            if (!response.ok) {
                throw new Error(`Unexpected status ${response.status}`);
            }

            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            let buffer = '';

            for (;;) {
                const { done, value } = await reader.read();
                if (done) break;
                buffer += decoder.decode(value, { stream: true });

                // SSE frames are separated by a blank line.
                const frames = buffer.split('\n\n');
                buffer = frames.pop(); // keep any incomplete trailing frame
                for (const frame of frames) {
                    const lines = frame.split('\n');
                    const isError = lines.includes('event: error');
                    const data = lines
                        .filter((l) => l.startsWith('data:'))
                        .map((l) => l.replace(/^data: ?/, ''))
                        .join('\n');
                    if (isError) {
                        console.error('Server reported an error:', data);
                    } else if (data) {
                        setAnalysisResult((prev) => prev + data);
                    }
                }
            }
        } catch (err) {
            if (err.name !== 'AbortError') {
                console.error('Streaming request failed:', err);
            }
        } finally {
            setIsLoading(false);
        }
    };

    return (
        <Box sx={{ p: 2 }}>
            <Typography variant="h5" gutterBottom>Real-Time Document Analysis</Typography>
            <form onSubmit={handleSubmit}>
                <TextField
                    fullWidth
                    multiline
                    rows={10}
                    variant="outlined"
                    label="Document Text"
                    value={documentText}
                    onChange={(e) => setDocumentText(e.target.value)}
                    margin="normal"
                />
                <Button 
                    type="submit" 
                    variant="contained" 
                    disabled={isLoading || !documentText}
                >
                    {isLoading ? <CircularProgress size={24} /> : 'Analyze'}
                </Button>
            </form>
            <Paper elevation={3} sx={{ mt: 3, p: 2, minHeight: '200px', whiteSpace: 'pre-wrap', fontFamily: 'monospace' }}>
                {analysisResult}
                {isLoading && !analysisResult && <Typography color="textSecondary">Waiting for analysis...</Typography>}
            </Paper>
        </Box>
    );
}

A mistake to avoid in the frontend is forgetting to handle cleanup. The useEffect return function ensures that if the user navigates away from the page, the in-flight request is aborted via its AbortController. This prevents memory leaks and unnecessary open connections on the server.

This polyglot architecture, while more complex to set up than a monolithic approach, solved our performance issues entirely. It allows each component to scale independently and leverages the strengths of each language and framework. The system is resilient, performant, and maintains a clean separation between the user-facing business application and the high-throughput data processing pipeline.

The solution isn’t without its own set of trade-offs. The operational complexity is higher, requiring deployment and monitoring for three separate services. Propagating a distributed trace context across PHP, Java, and Go for observability requires a disciplined approach, likely using OpenTelemetry standards throughout the stack. Furthermore, the current Server-Sent Events mechanism is unidirectional; if the user needed to send data back to the server mid-stream (e.g., a “stop generation” command), the architecture would need to be evolved to use WebSockets, adding another layer of state management complexity to the Laravel orchestrator.
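
On the Go side of our stack, for example, installing trace propagation into the gRPC server is a small change with OpenTelemetry’s otelgrpc instrumentation. A minimal sketch, assuming the go.opentelemetry.io/contrib otelgrpc package and a tracer provider configured elsewhere:

package main

import (
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"google.golang.org/grpc"
)

// newTracedServer returns a gRPC server whose stats handler extracts the
// incoming W3C trace context (propagated from Quarkus) and opens a server
// span for each RPC. A global tracer provider must be configured elsewhere.
func newTracedServer() *grpc.Server {
	return grpc.NewServer(grpc.StatsHandler(otelgrpc.NewServerHandler()))
}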

