Automating Graph-Enhanced RAG Pipeline Deployments Using Spinnaker ArangoDB and NumPy


The initial proof-of-concept for our Retrieval-Augmented Generation system worked well enough on a static dataset. A simple vector similarity search over document chunks was sufficient. But moving this to a production environment, with a corpus of millions of interconnected legal documents updated hourly, exposed a fundamental flaw in the design. Retrieving isolated text chunks based purely on semantic similarity was missing the essential context encoded in document citations, amendments, and legal precedents. A query about a specific regulation might return the correct text snippet but fail to include the critical amendment that supersedes it, simply because the amendment’s text wasn’t a close vector match. This wasn’t a feature gap; it was a source of dangerously incorrect outputs. The production reality demanded a system that understood not just content, but also context and relationships.

Our revised architecture centered on a graph model. Documents, sections, and amendments would be vertices. Citations and relationships would be edges. This allows a query to first find semantically relevant starting points via vector search, then traverse the graph to gather a richer, more complete context. The technical pain point shifted from model accuracy to operational complexity. How do we reliably and repeatedly update this complex data structure—which involves embedding generation, graph construction, and data loading—and then safely deploy the query service that depends on it? A manual, multi-step process involving data scientists and operations engineers was a recipe for disaster. We needed a fully automated, single-source-of-truth pipeline.

Technology selection was driven by this operational requirement. For the database, we needed a system that could handle both graph traversal and vector storage efficiently, without the synchronization nightmare of two separate databases. ArangoDB, with its native multi-model capabilities, was the logical choice. Its Arango Query Language (AQL) could express complex graph traversals and data manipulations in a single, atomic transaction. For the heavy numerical lifting—processing embeddings, running custom similarity metrics, and eventually implementing a reranking algorithm—NumPy was the non-negotiable standard in the Python ecosystem. The final, critical piece was the orchestration engine. We needed something more powerful than a simple CI script. We needed to manage a sequence of heterogeneous tasks: running a batch processing job, executing database migrations, deploying a Kubernetes service, and inserting manual verification gates. Spinnaker, with its extensible pipeline model and deep integration with Kubernetes, was selected to be the backbone of this MLOps process.

Phase 1: The Graph Data Ingestion and Processing Core

The foundation of the entire system is the Python service responsible for processing raw documents, generating embeddings, and structuring the data for ArangoDB. In a real-world project, this would be a robust, fault-tolerant application, but for clarity, we’ll focus on the core logic. This component is designed to be executed as a containerized job orchestrated by Spinnaker.

The first step is establishing a clear data model for ArangoDB. We’ll use two vertex collections (documents, chunks) and one edge collection (contains).

# graph_processor/arangodb_connector.py
import os
import logging
from arango import ArangoClient
from arango.exceptions import DocumentInsertError

# Configure logging for production
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class ArangoDBManager:
    """
    Manages connection and schema for our RAG knowledge graph.
    Designed to be idempotent for safe re-running in a pipeline.
    """
    def __init__(self):
        self.host = os.environ.get("ARANGO_HOST", "http://localhost:8529")
        self.user = os.environ.get("ARANGO_USER", "root")
        self.password = os.environ.get("ARANGO_PASSWORD")
        
        if not self.password:
            raise ValueError("ARANGO_PASSWORD environment variable not set.")
            
        self.client = ArangoClient(hosts=self.host)
        self.db = self._connect_and_get_db()
        
        self.doc_collection_name = "documents"
        self.chunk_collection_name = "chunks"
        self.edge_collection_name = "contains"

    def _connect_and_get_db(self):
        try:
            sys_db = self.client.db('_system', username=self.user, password=self.password)
            db_name = "LegalRAG"
            if not sys_db.has_database(db_name):
                logging.info(f"Database '{db_name}' not found. Creating...")
                sys_db.create_database(db_name)
            return self.client.db(db_name, username=self.user, password=self.password)
        except Exception as e:
            logging.error(f"Failed to connect to ArangoDB: {e}")
            raise

    def setup_schema(self):
        """
        Ensures all necessary collections and indexes exist.
        """
        if not self.db.has_collection(self.doc_collection_name):
            logging.info(f"Creating document collection: {self.doc_collection_name}")
            self.db.create_collection(self.doc_collection_name)
        
        if not self.db.has_collection(self.chunk_collection_name):
            logging.info(f"Creating chunk collection: {self.chunk_collection_name}")
            chunks = self.db.create_collection(self.chunk_collection_name)
            # This is critical for performance. The vector index is a new feature,
            # but for demonstration we use a persistent index on a property
            # that might be used in pre-filtering. A true vector index would be
            # configured here.
            chunks.add_persistent_index(fields=["doc_key"], unique=False)
        
        if not self.db.has_collection(self.edge_collection_name):
            logging.info(f"Creating edge collection: {self.edge_collection_name}")
            self.db.create_collection(self.edge_collection_name, edge=True)
        
        logging.info("Schema setup complete.")

    def insert_graph_data(self, document_key, document_text, chunks_data):
        """
        Inserts a document and its chunks into the database.
        
        Args:
            document_key (str): A unique identifier for the document.
            document_text (str): The full text of the document.
            chunks_data (list of dicts): Each dict contains 'chunk_text' and 'embedding'.
        """
        docs_coll = self.db.collection(self.doc_collection_name)
        chunks_coll = self.db.collection(self.chunk_collection_name)
        edges_coll = self.db.collection(self.edge_collection_name)

        try:
            # Insert document vertex
            if not docs_coll.has(document_key):
                doc_meta = docs_coll.insert({'_key': document_key, 'text': document_text})
                doc_id = doc_meta['_id']
            else:
                doc_id = f"{self.doc_collection_name}/{document_key}"
                logging.warning(f"Document {document_key} already exists. Skipping document insertion.")

            # Batch insert chunk vertices
            chunk_docs_to_insert = []
            for i, chunk in enumerate(chunks_data):
                chunk_docs_to_insert.append({
                    '_key': f"{document_key}_chunk_{i}",
                    'doc_key': document_key,
                    'text': chunk['chunk_text'],
                    'embedding': chunk['embedding'].tolist() # NumPy array must be converted to list for JSON
                })
            
            chunk_insert_results = chunks_coll.insert_many(chunk_docs_to_insert, overwrite=True)
            
            # Batch insert edges
            edges_to_insert = []
            for result in chunk_insert_results:
                if not result.get('error', False):
                    chunk_id = result['_id']
                    edges_to_insert.append({'_from': doc_id, '_to': chunk_id})
            
            if edges_to_insert:
                edges_coll.insert_many(edges_to_insert, overwrite=True)
            
            logging.info(f"Successfully inserted graph data for document {document_key}.")

        except DocumentInsertError as e:
            logging.error(f"Failed to insert data for {document_key}. Error: {e}")
        except Exception as e:
            logging.error(f"An unexpected error occurred during insertion for {document_key}: {e}")
            raise

The core processing logic leverages NumPy for efficient vector manipulation. The pitfall here is memory management. Loading an entire dataset and its embeddings into memory is not scalable. A real-world project would use generators and batch processing to handle data streams efficiently.

# graph_processor/processing.py
import logging
import numpy as np
from sentence_transformers import SentenceTransformer
from graph_processor.arangodb_connector import ArangoDBManager

# It's good practice to initialize models once and reuse them.
# In a containerized job, this happens at startup.
MODEL_NAME = 'all-MiniLM-L6-v2'
logging.info(f"Loading sentence transformer model: {MODEL_NAME}")
model = SentenceTransformer(MODEL_NAME)
EMBEDDING_DIM = model.get_sentence_embedding_dimension()


def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """A simple text chunking function."""
    tokens = text.split()
    chunks = []
    for i in range(0, len(tokens), chunk_size - overlap):
        chunks.append(" ".join(tokens[i:i + chunk_size]))
    return chunks

def process_and_load_document(doc_key: str, doc_text: str, db_manager: ArangoDBManager):
    """
    Main processing function for a single document.
    1. Chunks text.
    2. Generates embeddings in a batch.
    3. Uses NumPy for any vector normalization or manipulation.
    4. Loads data into ArangoDB.
    """
    try:
        logging.info(f"Processing document: {doc_key}")
        text_chunks = chunk_text(doc_text)
        if not text_chunks:
            logging.warning(f"Document {doc_key} produced no chunks. Skipping.")
            return

        # Batch embedding generation is far more efficient
        embeddings = model.encode(text_chunks, convert_to_numpy=True, show_progress_bar=False)
        
        # Example of a NumPy operation: L2 normalization
        # This is crucial for ensuring cosine similarity calculations are accurate.
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        # Avoid division by zero for empty vectors
        norms[norms == 0] = 1e-9
        normalized_embeddings = embeddings / norms

        chunks_data = []
        for i, chunk_text in enumerate(text_chunks):
            chunks_data.append({
                'chunk_text': chunk_text,
                'embedding': normalized_embeddings[i]
            })

        db_manager.insert_graph_data(doc_key, doc_text, chunks_data)

    except Exception as e:
        logging.error(f"Failed to process document {doc_key}: {e}")
        # In a real pipeline, we might push this to a dead-letter queue.
        # For now, we just log and continue.

# Main execution block for the container job
if __name__ == "__main__":
    # In a real job, this list of documents would come from a message queue,
    # a file in S3, or a database query.
    sample_documents = {
        "REG-101": "The quick brown fox jumps over the lazy dog. This is section one of the regulation.",
        "REG-102": "A lazy dog was jumped over by a quick brown fox. This is the first article of a new law."
    }

    try:
        db_manager = ArangoDBManager()
        db_manager.setup_schema()

        for key, text in sample_documents.items():
            process_and_load_document(key, text, db_manager)
        
        logging.info("Processing job finished successfully.")
    except Exception as e:
        logging.critical(f"Data processing job failed fatally: {e}")
        exit(1)

This code is packaged into a Docker container. The Dockerfile is straightforward, installing Python dependencies and setting the entry point. The key is that it’s configured entirely via environment variables (ARANGO_HOST, ARANGO_PASSWORD), making it perfectly suited for Spinnaker’s Run Job stage.

Phase 2: The Spinnaker Orchestration Pipeline

The core challenge is translating the multi-step process into a declarative Spinnaker pipeline. The goal is a “push-button” operation that takes new data from a source (e.g., a git repository), processes it, updates the database, and deploys the corresponding application update, all with safety checks.

The pipeline looks like this:

graph TD
    A[Trigger: Git Commit to Data Repo] --> B{Run K8s Job: Process Data};
    B --> C{Manual Judgment: Verify Staging Data};
    C -- Approve --> D{Run K8s Job: Promote Data};
    D --> E[Deploy Canary: Query Service];
    E --> F{Automated Canary Analysis};
    F -- Success --> G[Deploy Production: Query Service];
    F -- Failure --> H[Rollback Canary];
    C -- Reject --> I[Fail Pipeline];

Here’s the breakdown and snippets of the Spinnaker pipeline JSON.

Stage 1: Configuration & Trigger

This stage defines the trigger (e.g., a commit to the main branch of a specific git repository containing the raw document data) and any pipeline parameters.

Stage 2: Process Data into Staging

This is the first active stage. We use Spinnaker’s Run Job (Manifest) stage to run our Docker container on a Kubernetes cluster. A common mistake is to process data directly into the live collections. This is extremely risky. Instead, we process data into a new, versioned set of collections (e.g., chunks_v2, documents_v2).

A simplified Kubernetes Job manifest embedded in the Spinnaker pipeline:

# spinnaker-job-manifest.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: rag-data-processor-{{- T(java.util.UUID).randomUUID().toString().substring(0,8) }}
  namespace: data-processing
spec:
  template:
    spec:
      containers:
      - name: processor
        image: my-registry/rag-graph-processor:latest # The image we built
        env:
        - name: ARANGO_HOST
          value: "http://arangodb-coordinator.database.svc.cluster.local:8529"
        - name: ARANGO_USER
          value: "processor"
        - name: ARANGO_PASSWORD
          valueFrom:
            secretKeyRef:
              name: arango-db-secrets
              key: password
        # We would pass the source of the new data here, e.g., git commit hash
        # or S3 bucket path as another environment variable.
      restartPolicy: Never
  backoffLimit: 1

Stage 3: Manual Judgment

Before touching the production data, the pipeline pauses. This is a critical safety gate. An engineer receives a notification and is expected to run validation queries against the new _v2 collections in ArangoDB. They might check document counts, graph integrity, or run sample queries to ensure the processing was successful. The pitfall here is making this step too onerous. The goal is a quick sanity check, not a full QA cycle.

Stage 4: Promote Data to Production

Upon approval, another Run Job stage is triggered. This job doesn’t process data; it executes a script that performs the “promotion.” The most reliable way to do this in ArangoDB is by renaming collections, which is a near-atomic operation.

Example promotion script (promote.py run by the job):

# promote_data/promote.py
import logging
from arango import ArangoClient
import os

# Assume new collections are named with a suffix, e.g., from the git commit hash
VERSION_SUFFIX = os.environ.get("DATA_VERSION") # Passed by Spinnaker

# ... ArangoDBManager setup as before ...

def promote_collections(db_manager, suffix):
    db = db_manager.db
    collections_to_promote = {
        f"documents_{suffix}": "documents",
        f"chunks_{suffix}": "chunks",
        f"contains_{suffix}": "contains"
    }

    for temp_name, final_name in collections_to_promote.items():
        if db.has_collection(temp_name):
            if db.has_collection(final_name):
                logging.info(f"Dropping old collection: {final_name}")
                db.delete_collection(final_name)
            
            logging.info(f"Renaming {temp_name} to {final_name}")
            temp_coll = db.collection(temp_name)
            temp_coll.rename(final_name)
        else:
            raise RuntimeError(f"Staging collection {temp_name} not found. Promotion failed.")

# Main logic
if __name__ == "__main__":
    if not VERSION_SUFFIX:
        logging.critical("DATA_VERSION not provided.")
        exit(1)
    
    manager = ArangoDBManager()
    promote_collections(manager, VERSION_SUFFIX)
    logging.info("Promotion successful.")

This approach provides a clean rollback path: if the new application deployment fails, another job can simply rename the old collections back into place.

Stage 5 & 6: Deploy Query Service with Canary

Now that the data is live, Spinnaker proceeds with a standard application deployment. We deploy a new version of our FastAPI-based query service. This service contains the AQL queries that perform the graph-enhanced retrieval.

Example AQL query within the Python service:

# query_service/retriever.py

# A query vector, coming from the user's question
query_embedding = [...] 

# This AQL combines approximate nearest neighbor search with graph traversal.
# Note: ArangoDB's native vector search is evolving. This is a conceptual representation.
# We might use ArangoSearch with a `COSINE_SIMILARITY` function.
aql_query = """
    LET query_vector = @query_embedding

    // Step 1: Find top N closest chunk vertices using vector similarity
    // This part would use a vector index for performance.
    LET similar_chunks = (
        FOR chunk IN chunks
            SORT V8::COSINE_SIMILARITY(chunk.embedding, query_vector) DESC
            LIMIT 10
            RETURN chunk
    )

    // Step 2: From these chunks, traverse the graph to find parent documents
    // and other related chunks within the same document.
    FOR start_chunk IN similar_chunks
        // Traverse up to the parent document, then back down to all its chunks
        FOR v, e, p IN 1..2 ANY start_chunk._id contains
            OPTIONS { uniqueVertices: "global" }
            FILTER IS_SAME_COLLECTION("chunks", v._id) OR IS_SAME_COLLECTION("documents", v._id)
            COLLECT doc_id = p.vertices[1]._id INTO context_group
            
            LET document = DOCUMENT(doc_id)
            LET context_chunks = (
                FOR chunk_in_doc IN 1..1 OUTBOUND doc_id contains
                    RETURN { text: chunk_in_doc.text, score: V8::COSINE_SIMILARITY(chunk_in_doc.embedding, query_vector) }
            )

            // Return a consolidated context block per document
            RETURN {
                document_id: doc_id,
                full_text: document.text,
                retrieved_chunks: context_chunks
            }

"""
# The application would then execute this query with the embedding as a bind parameter.

Spinnaker deploys a small percentage of traffic to this new version (the canary). It then monitors key metrics (e.g., latency, error rate) for a set period. If the metrics remain healthy, it automatically promotes the canary to 100% of traffic. If they degrade, it automatically rolls back. This automated safety net is crucial when deploying changes that have a complex dependency on an underlying data store.

The final pipeline provides a single, auditable workflow for a complex operation that was previously a multi-day, error-prone manual task. It codifies the entire process, from data ingestion to production deployment, ensuring consistency and safety.

The primary limitation of this architecture is the atomicity of the data promotion step. While renaming collections is fast, it’s not a true database transaction that spans the entire set of collections. A failure midway through the renaming script could leave the database in an inconsistent state, with some v2 collections promoted and others not. A more resilient, albeit more complex, approach would involve a blue-green deployment at the database level, where the query service is pointed to an entirely separate, replicated database instance. This, however, significantly increases infrastructure cost and operational overhead. Furthermore, the performance of the complex AQL query remains a point of concern that requires continuous monitoring and index tuning. Future iterations will focus on building automated data quality checks to replace the manual judgment step, using a “golden dataset” of queries to score the retrieval quality of the new graph before promotion, moving us closer to a fully autonomous CD pipeline for our RAG system.


  TOC