
Vector Retrieval Pattern

The Vector Retrieval pattern uses embedding models to convert interactions into high-dimensional vectors, enabling semantic similarity search across the agent’s memory. This approach excels at finding contextually relevant information regardless of exact keyword matches.

Overview

The Vector Retrieval pattern transforms text, images, and other data into numerical vectors that capture semantic meaning. Key components include:

  • Embedding Models: Convert raw data into dense vector representations
  • Vector Databases: Store and index embeddings for fast similarity search
  • Similarity Metrics: Compare vectors to find relevant content (cosine, dot product, L2)
  • Retrieval Strategies: Query methods and result ranking approaches

This pattern is particularly powerful for finding relevant context in large datasets where keyword matching fails to capture semantic relationships.
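
The core mechanic can be seen in a few lines: encode several sentences and compare them with cosine similarity. This is a minimal sketch assuming the sentence-transformers package and the all-MiniLM-L6-v2 model used in the examples below; the sentences themselves are purely illustrative.

import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

sentences = [
    "How do I cancel my subscription?",   # query
    "Steps to stop a recurring payment",  # same intent, different words
    "The weather is nice today",          # unrelated
]
embeddings = model.encode(sentences)

def cosine(a, b):
    # Cosine similarity between two vectors
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

print(cosine(embeddings[0], embeddings[1]))  # high despite little keyword overlap
print(cosine(embeddings[0], embeddings[2]))  # low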

Architecture

import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from typing import List, Dict, Tuple

class VectorRetrievalMemory:
    def __init__(self, model_name="all-MiniLM-L6-v2", dimension=384):
        self.embedding_model = SentenceTransformer(model_name)
        self.dimension = dimension
        self.index = faiss.IndexFlatIP(dimension)  # Inner product on normalized vectors = cosine similarity
        self.interactions = []
        self.metadata = []

    def add_interaction(self, user_input: str, agent_response: str, metadata: Dict = None):
        # Combine input and response for embedding
        text = f"User: {user_input}\nAgent: {agent_response}"

        # Generate embedding
        embedding = self.embedding_model.encode([text])[0]
        embedding = embedding / np.linalg.norm(embedding)  # Normalize for cosine similarity

        # Store in vector index
        self.index.add(embedding.reshape(1, -1).astype('float32'))

        # Store metadata
        interaction_data = {
            'user_input': user_input,
            'agent_response': agent_response,
            'metadata': metadata or {},
            'interaction_id': len(self.interactions)
        }
        self.interactions.append(interaction_data)
        self.metadata.append(metadata or {})

    def retrieve_relevant(self, query: str, top_k: int = 5) -> List[Dict]:
        if self.index.ntotal == 0:
            return []

        # Generate query embedding
        query_embedding = self.embedding_model.encode([query])[0]
        query_embedding = query_embedding / np.linalg.norm(query_embedding)

        # Search for similar vectors
        scores, indices = self.index.search(
            query_embedding.reshape(1, -1).astype('float32'),
            min(top_k, self.index.ntotal)
        )

        # Return ranked results
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < len(self.interactions):
                result = self.interactions[idx].copy()
                result['similarity_score'] = float(score)
                results.append(result)
        return results
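
As a quick illustration of how the class above is meant to be used, here is a minimal usage sketch; the interactions and query are invented for demonstration.

# Minimal usage sketch for VectorRetrievalMemory (example data is illustrative)
memory = VectorRetrievalMemory()

memory.add_interaction(
    "How do I reset my password?",
    "Go to Settings > Security and click 'Reset password'.",
    metadata={"topic": "account"}
)
memory.add_interaction(
    "What's the refund policy?",
    "Refunds are available within 30 days of purchase."
)

# Semantically related query, even though it shares few exact keywords
for hit in memory.retrieve_relevant("I forgot my login credentials", top_k=2):
    print(f"{hit['similarity_score']:.3f}  {hit['user_input']}")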

Implementation Considerations

Embedding Model Selection

Local Models

# Lightweight models for resource-constrained environments
LIGHTWEIGHT_MODELS = {
    "all-MiniLM-L6-v2": {"dimension": 384, "size": "22MB"},
    "all-distilroberta-v1": {"dimension": 768, "size": "82MB"},
    "paraphrase-MiniLM-L3-v2": {"dimension": 384, "size": "61MB"}
}

# High-quality models for better semantic understanding
HIGH_QUALITY_MODELS = {
    "all-mpnet-base-v2": {"dimension": 768, "size": "420MB"},
    "all-roberta-large-v1": {"dimension": 1024, "size": "1.4GB"}
}
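
Whichever model you pick, its output dimension must match the vector index you create (the dimension argument of VectorRetrievalMemory above). A minimal sketch, assuming the sentence-transformers API used elsewhere on this page:

from sentence_transformers import SentenceTransformer

# Load one of the models listed above and confirm its output dimension
model = SentenceTransformer("all-MiniLM-L6-v2")
dim = model.get_sentence_embedding_dimension()
print(dim)  # 384, matching the table above

# Keep the FAISS index dimension in sync with the model
memory = VectorRetrievalMemory(model_name="all-MiniLM-L6-v2", dimension=dim)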

API-Based Models

class OpenAIEmbeddings:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.dimension = 1536  # text-embedding-ada-002

    def encode(self, texts: List[str]) -> np.ndarray:
        # Uses the legacy (pre-1.0) openai SDK interface
        import openai
        openai.api_key = self.api_key

        response = openai.Embedding.create(
            input=texts,
            model="text-embedding-ada-002"
        )
        embeddings = [item['embedding'] for item in response['data']]
        return np.array(embeddings)

Vector Database Options

FAISS (Local Vector Library)

class FAISSVectorStore:
    def __init__(self, dimension: int, use_gpu: bool = False):
        if use_gpu:
            res = faiss.StandardGpuResources()
            self.index = faiss.index_cpu_to_gpu(res, 0, faiss.IndexFlatIP(dimension))
        else:
            self.index = faiss.IndexFlatIP(dimension)

    def add_vectors(self, vectors: np.ndarray):
        self.index.add(vectors.astype('float32'))

    def search(self, query: np.ndarray, k: int):
        return self.index.search(query.astype('float32'), k)

Pinecone (Cloud Vector Database)

class PineconeVectorStore:
    def __init__(self, api_key: str, environment: str, index_name: str):
        # Uses the classic pinecone-client interface (pinecone.init / pinecone.Index)
        import pinecone
        pinecone.init(api_key=api_key, environment=environment)
        self.index = pinecone.Index(index_name)

    def upsert(self, vectors: List[Tuple[str, List[float], Dict]]):
        """Upsert vectors with IDs and metadata"""
        self.index.upsert(vectors=vectors)

    def query(self, vector: List[float], top_k: int, filter: Dict = None):
        return self.index.query(
            vector=vector,
            top_k=top_k,
            filter=filter,
            include_metadata=True
        )

Chroma (Open Source Vector Database)

class ChromaVectorStore:
    def __init__(self, collection_name: str):
        import chromadb
        self.client = chromadb.Client()
        self.collection = self.client.get_or_create_collection(
            name=collection_name
        )

    def add_documents(self, documents: List[str], metadatas: List[Dict], ids: List[str]):
        self.collection.add(
            documents=documents,
            metadatas=metadatas,
            ids=ids
        )

    def query(self, query_text: str, n_results: int = 5):
        return self.collection.query(
            query_texts=[query_text],
            n_results=n_results
        )
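
For a concrete feel of the wrapper above, here is a minimal usage sketch. It assumes Chroma's default in-memory client and default embedding function; the document strings and IDs are illustrative.

# Minimal usage sketch for ChromaVectorStore (example data is illustrative)
store = ChromaVectorStore("agent_memory")
store.add_documents(
    documents=["User asked about password reset", "User asked about refunds"],
    metadatas=[{"topic": "account"}, {"topic": "billing"}],
    ids=["interaction-1", "interaction-2"]
)

hits = store.query("how do I change my login?", n_results=1)
print(hits["documents"][0])  # closest stored document(s)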

Advanced Retrieval Strategies

Hybrid Search (Vector + Keyword)

class HybridRetrievalMemory:
    def __init__(self):
        self.vector_memory = VectorRetrievalMemory()
        self.keyword_index = {}  # Simple keyword index

    def add_interaction(self, user_input: str, agent_response: str):
        # Add to vector store
        self.vector_memory.add_interaction(user_input, agent_response)

        # Add to keyword index
        interaction_id = len(self.vector_memory.interactions) - 1
        words = (user_input + " " + agent_response).lower().split()
        for word in words:
            if word not in self.keyword_index:
                self.keyword_index[word] = []
            self.keyword_index[word].append(interaction_id)

    def hybrid_retrieve(self, query: str, vector_weight: float = 0.7):
        # Get vector results
        vector_results = self.vector_memory.retrieve_relevant(query, top_k=10)

        # Get keyword results
        query_words = query.lower().split()
        keyword_scores = {}
        for word in query_words:
            if word in self.keyword_index:
                for interaction_id in self.keyword_index[word]:
                    keyword_scores[interaction_id] = keyword_scores.get(interaction_id, 0) + 1

        # Combine scores
        combined_results = []
        for result in vector_results:
            interaction_id = result['interaction_id']
            vector_score = result['similarity_score']
            keyword_score = keyword_scores.get(interaction_id, 0)

            combined_score = (vector_weight * vector_score +
                              (1 - vector_weight) * keyword_score)
            result['combined_score'] = combined_score
            combined_results.append(result)

        # Re-rank by combined score
        combined_results.sort(key=lambda x: x['combined_score'], reverse=True)
        return combined_results

Multi-Vector Retrieval

class MultiVectorMemory:
    def __init__(self):
        self.input_embeddings = VectorRetrievalMemory()
        self.response_embeddings = VectorRetrievalMemory()
        self.context_embeddings = VectorRetrievalMemory()

    def add_interaction(self, user_input: str, agent_response: str, context: str = None):
        interaction_id = len(self.input_embeddings.interactions)

        # Store separate embeddings for different aspects
        self.input_embeddings.add_interaction(user_input, "", {"type": "input"})
        self.response_embeddings.add_interaction("", agent_response, {"type": "response"})
        if context:
            self.context_embeddings.add_interaction(context, "", {"type": "context"})

    def retrieve_by_aspect(self, query: str, aspect: str = "input", top_k: int = 5):
        if aspect == "input":
            return self.input_embeddings.retrieve_relevant(query, top_k)
        elif aspect == "response":
            return self.response_embeddings.retrieve_relevant(query, top_k)
        elif aspect == "context":
            return self.context_embeddings.retrieve_relevant(query, top_k)
        else:
            raise ValueError(f"Unknown aspect: {aspect}")

Performance Characteristics

Pros

  • Semantic Understanding: Finds conceptually similar content
  • Phrasing Agnostic: Matches the same idea across different wordings (and across languages with multilingual models)
  • Scalable Search: Efficient retrieval from large datasets
  • Fuzzy Matching: Handles typos and variations gracefully
  • Cross-Modal: Can work with text, images, audio (with appropriate models)

Cons

  • Computational Overhead: Embedding generation and storage costs
  • Model Dependency: Quality depends on embedding model choice
  • Cold Start: Requires sufficient data for meaningful similarities
  • Curse of Dimensionality: High-dimensional spaces can have unintuitive distance properties
  • Embedding Drift: Model updates can invalidate stored embeddings

Performance Metrics

# Typical performance characteristics
EMBEDDING_TIME = "O(n)"        # Linear in text length
INDEX_SEARCH = "O(log n)"      # With approximate algorithms
STORAGE_OVERHEAD = "4-8x"      # Compared to raw text storage
ACCURACY = "0.8-0.95"          # Semantic similarity tasks
LATENCY = "10-100ms"           # Per query, depending on index size

When to Use

Ideal Scenarios

  • Large knowledge bases with diverse content
  • Conversational agents needing contextual understanding
  • FAQ systems where questions vary in phrasing
  • Content recommendation based on user interests
  • Multi-language applications with semantic search needs

Avoid When

  • Exact match requirements where precision is critical
  • Small datasets where simple keyword search suffices
  • Real-time systems with strict latency constraints
  • Resource-constrained environments without GPU/sufficient RAM

Implementation Examples

Production-Ready Vector Memory

import time
import uuid
import logging

from cachetools import LRUCache  # assumes the cachetools package; any dict-like LRU cache works

logger = logging.getLogger(__name__)

class ProductionVectorMemory:
    def __init__(self, config):
        # The _load_* / _init_* helpers are deployment-specific and omitted here
        self.embedding_model = self._load_embedding_model(config.model_name)
        self.vector_store = self._init_vector_store(config.vector_db_config)
        self.metadata_store = self._init_metadata_store(config.metadata_db_config)
        self.cache = LRUCache(maxsize=config.cache_size)

    def add_interaction(self, interaction_data):
        # Generate embedding with error handling
        try:
            text = self._format_for_embedding(interaction_data)
            embedding = self.embedding_model.encode([text])[0]
        except Exception as e:
            logger.error(f"Embedding generation failed: {e}")
            return False

        # Store with transaction safety
        interaction_id = str(uuid.uuid4())
        try:
            # Store vector
            self.vector_store.upsert([
                (interaction_id, embedding.tolist(), {"timestamp": time.time()})
            ])

            # Store metadata
            self.metadata_store.put(interaction_id, interaction_data)

            # Update cache
            self.cache[interaction_id] = interaction_data
            return True
        except Exception as e:
            logger.error(f"Storage failed: {e}")
            return False

    def retrieve_with_filters(self, query: str, filters: Dict = None, top_k: int = 5) -> List[Dict]:
        # Check cache first
        cache_key = f"{query}:{filters}:{top_k}"
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Generate query embedding
        query_embedding = self.embedding_model.encode([query])[0]

        # Search with filters (Pinecone-style response with a 'matches' list)
        results = self.vector_store.query(
            vector=query_embedding.tolist(),
            top_k=top_k,
            filter=filters
        )

        # Enrich with metadata
        enriched_results = []
        for match in results['matches']:
            metadata = self.metadata_store.get(match['id'])
            if metadata:
                result = metadata.copy()
                result['similarity_score'] = match['score']
                enriched_results.append(result)

        # Cache results
        self.cache[cache_key] = enriched_results
        return enriched_results

Multi-Modal Vector Memory

class MultiModalVectorMemory:
    def __init__(self):
        self.text_embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.image_embedder = SentenceTransformer('clip-ViT-B-32')
        self.text_index = faiss.IndexFlatIP(384)
        self.image_index = faiss.IndexFlatIP(512)

    def add_text_interaction(self, text: str, metadata: Dict):
        embedding = self.text_embedder.encode([text])[0]
        embedding = embedding / np.linalg.norm(embedding)
        self.text_index.add(embedding.reshape(1, -1).astype('float32'))

    def add_image_interaction(self, image_path: str, description: str):
        from PIL import Image
        image = Image.open(image_path)
        embedding = self.image_embedder.encode([image])[0]
        embedding = embedding / np.linalg.norm(embedding)
        self.image_index.add(embedding.reshape(1, -1).astype('float32'))

    def search_cross_modal(self, text_query: str, search_images: bool = True):
        # Search text with text query
        text_embedding = self.text_embedder.encode([text_query])[0]
        text_embedding = text_embedding / np.linalg.norm(text_embedding)
        text_scores, text_indices = self.text_index.search(
            text_embedding.reshape(1, -1).astype('float32'), 5
        )
        results = {'text_results': list(zip(text_scores[0], text_indices[0]))}

        # Cross-modal search: text query -> image results
        if search_images:
            # Use CLIP's shared embedding space
            image_embedding = self.image_embedder.encode([text_query])[0]
            image_embedding = image_embedding / np.linalg.norm(image_embedding)
            image_scores, image_indices = self.image_index.search(
                image_embedding.reshape(1, -1).astype('float32'), 5
            )
            results['image_results'] = list(zip(image_scores[0], image_indices[0]))

        return results

Contextual Re-ranking

from collections import deque

class ContextualVectorMemory:
    def __init__(self):
        self.base_memory = VectorRetrievalMemory()
        self.conversation_context = deque(maxlen=10)

    def add_interaction(self, user_input: str, agent_response: str):
        self.base_memory.add_interaction(user_input, agent_response)
        self.conversation_context.append({
            'user_input': user_input,
            'agent_response': agent_response
        })

    def contextual_retrieve(self, query: str, top_k: int = 10):
        # Get initial candidates
        candidates = self.base_memory.retrieve_relevant(query, top_k * 2)

        # Re-rank based on conversation context
        context_text = " ".join([
            f"{turn['user_input']} {turn['agent_response']}"
            for turn in self.conversation_context
        ])
        # The context embedding is the same for every candidate, so compute it once
        context_embedding = self.base_memory.embedding_model.encode([context_text])[0]

        reranked_results = []
        for candidate in candidates:
            # Calculate contextual relevance
            candidate_text = f"{candidate['user_input']} {candidate['agent_response']}"
            candidate_embedding = self.base_memory.embedding_model.encode([candidate_text])[0]

            context_similarity = np.dot(context_embedding, candidate_embedding) / (
                np.linalg.norm(context_embedding) * np.linalg.norm(candidate_embedding)
            )

            # Weighted combination of original and contextual similarity
            final_score = 0.7 * candidate['similarity_score'] + 0.3 * context_similarity
            candidate['contextual_score'] = final_score
            reranked_results.append(candidate)

        # Sort by contextual score
        reranked_results.sort(key=lambda x: x['contextual_score'], reverse=True)
        return reranked_results[:top_k]

Best Practices

Embedding Optimization

import re

def optimize_embeddings(texts: List[str], max_length: int = 512):
    """Optimize texts for embedding generation"""
    optimized = []
    for text in texts:
        # Truncate long texts
        if len(text.split()) > max_length:
            words = text.split()[:max_length]
            text = " ".join(words) + "..."

        # Clean and normalize
        text = text.strip().replace('\n', ' ').replace('\t', ' ')

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)

        optimized.append(text)
    return optimized

Index Management

import pickle

class ManagedVectorIndex:
    def __init__(self, dimension: int, max_size: int = 1000000):
        self.dimension = dimension
        self.max_size = max_size
        self.index = faiss.IndexFlatIP(dimension)
        self.metadata = []

    def add_with_overflow_handling(self, vectors: np.ndarray, metadata: List[Dict]):
        if self.index.ntotal + len(vectors) > self.max_size:
            # Implement overflow strategy
            self._handle_overflow(len(vectors))

        self.index.add(vectors.astype('float32'))
        self.metadata.extend(metadata)

    def _handle_overflow(self, new_count: int):
        # Strategy 1: Remove oldest entries
        remove_count = new_count
        self.metadata = self.metadata[remove_count:]

        # Rebuild index (expensive but necessary for FAISS flat indexes);
        # _extract_vectors is a placeholder for fetching the surviving raw embeddings,
        # e.g. from a parallel store
        remaining_vectors = self._extract_vectors(remove_count)
        self.index = faiss.IndexFlatIP(self.dimension)
        self.index.add(remaining_vectors)

    def save_index(self, path: str):
        faiss.write_index(self.index, f"{path}/vector.index")
        with open(f"{path}/metadata.pkl", 'wb') as f:
            pickle.dump(self.metadata, f)

    def load_index(self, path: str):
        self.index = faiss.read_index(f"{path}/vector.index")
        with open(f"{path}/metadata.pkl", 'rb') as f:
            self.metadata = pickle.load(f)

Quality Monitoring

class VectorQualityMonitor:
    def __init__(self, vector_memory):
        self.memory = vector_memory
        self.quality_metrics = {
            'embedding_variance': [],
            'similarity_distributions': [],
            'retrieval_accuracy': []
        }

    def monitor_embedding_quality(self, new_embeddings: np.ndarray):
        # Check for embedding degradation
        variance = np.var(new_embeddings, axis=1).mean()
        self.quality_metrics['embedding_variance'].append(variance)

        # Check for clustering
        mean_similarity = np.mean(np.dot(new_embeddings, new_embeddings.T))
        self.quality_metrics['similarity_distributions'].append(mean_similarity)

        # Alert if quality drops
        if variance < 0.01:  # Low variance indicates poor embeddings
            logger.warning("Low embedding variance detected")

    def evaluate_retrieval_quality(self, test_queries: List[str], expected_results: List[List[str]]):
        """Evaluate retrieval quality with ground truth"""
        total_precision = 0
        total_recall = 0

        for query, expected in zip(test_queries, expected_results):
            retrieved = self.memory.retrieve_relevant(query, top_k=10)
            retrieved_ids = [r['interaction_id'] for r in retrieved]

            # Calculate precision and recall
            relevant_retrieved = set(retrieved_ids) & set(expected)
            precision = len(relevant_retrieved) / len(retrieved_ids) if retrieved_ids else 0
            recall = len(relevant_retrieved) / len(expected) if expected else 0

            total_precision += precision
            total_recall += recall

        avg_precision = total_precision / len(test_queries)
        avg_recall = total_recall / len(test_queries)

        self.quality_metrics['retrieval_accuracy'].append({
            'precision': avg_precision,
            'recall': avg_recall,
            'f1': 2 * (avg_precision * avg_recall) / (avg_precision + avg_recall)
        })

Integration with Other Patterns

Vector + Sliding Window Hybrid

class VectorSlidingHybrid:
    def __init__(self, window_size: int = 50, vector_k: int = 10):
        self.sliding_window = SlidingWindowMemory(max_size=window_size)  # from the Sliding Window pattern
        self.vector_memory = VectorRetrievalMemory()
        self.vector_k = vector_k

    def add_interaction(self, user_input: str, agent_response: str):
        # Add to both memories
        self.sliding_window.add_interaction(user_input, agent_response)
        self.vector_memory.add_interaction(user_input, agent_response)

    def retrieve_context(self, query: str):
        # Get recent context from sliding window
        recent_context = self.sliding_window.get_context(limit=10)

        # Get semantically relevant context from vector memory
        relevant_context = self.vector_memory.retrieve_relevant(query, self.vector_k)

        # Combine and deduplicate
        combined_context = self._merge_contexts(recent_context, relevant_context)
        return combined_context

    def _merge_contexts(self, recent: List[Dict], relevant: List[Dict]):
        # Simple deduplication and merging strategy
        seen_interactions = set()
        merged = []

        # Prioritize recent context
        for interaction in recent:
            key = (interaction['user_input'], interaction['agent_response'])
            if key not in seen_interactions:
                interaction['source'] = 'recent'
                merged.append(interaction)
                seen_interactions.add(key)

        # Add relevant context not already included
        for interaction in relevant:
            key = (interaction['user_input'], interaction['agent_response'])
            if key not in seen_interactions:
                interaction['source'] = 'semantic'
                merged.append(interaction)
                seen_interactions.add(key)

        return merged

Vector + Graph Memory Integration

class VectorGraphMemory:
    def __init__(self):
        self.vector_memory = VectorRetrievalMemory()
        self.graph_memory = GraphMemory()  # Assume this exists

    def add_interaction_with_entities(self, user_input: str, agent_response: str):
        # Extract entities and relationships
        entities = self._extract_entities(user_input, agent_response)
        relationships = self._extract_relationships(user_input, agent_response)

        # Add to vector memory
        self.vector_memory.add_interaction(user_input, agent_response)

        # Add to graph memory
        for entity in entities:
            self.graph_memory.add_entity(entity)
        for rel in relationships:
            self.graph_memory.add_relationship(rel['source'], rel['target'], rel['type'])

    def retrieve_with_graph_expansion(self, query: str, expand_hops: int = 2):
        # Get initial vector results
        vector_results = self.vector_memory.retrieve_relevant(query, top_k=5)

        # Extract entities from results
        result_entities = []
        for result in vector_results:
            entities = self._extract_entities(result['user_input'], result['agent_response'])
            result_entities.extend(entities)

        # Expand through graph relationships
        expanded_entities = self.graph_memory.expand_entities(result_entities, expand_hops)

        # Find interactions involving expanded entities
        expanded_results = []
        for entity in expanded_entities:
            entity_interactions = self.vector_memory.search_by_entity(entity)
            expanded_results.extend(entity_interactions)

        # Combine and rank results
        return self._rank_combined_results(vector_results, expanded_results)

Testing and Validation

Unit Tests

import pytest
import numpy as np

def test_vector_memory_basic_operations():
    memory = VectorRetrievalMemory()

    # Test adding interactions
    memory.add_interaction("What is Python?", "Python is a programming language.")
    memory.add_interaction("How do I write a loop?", "Use for or while statements.")

    assert len(memory.interactions) == 2
    assert memory.index.ntotal == 2

def test_similarity_retrieval():
    memory = VectorRetrievalMemory()

    # Add related interactions
    memory.add_interaction("What is machine learning?", "ML is AI subset.")
    memory.add_interaction("Explain neural networks", "Networks mimic brain.")
    memory.add_interaction("What's for dinner?", "I suggest pasta.")

    # Query for ML-related content
    results = memory.retrieve_relevant("Tell me about AI", top_k=2)

    # Should return ML and neural network interactions
    assert len(results) == 2
    assert results[0]['similarity_score'] > results[1]['similarity_score']

def test_embedding_normalization():
    memory = VectorRetrievalMemory()

    text = "Test interaction"
    embedding = memory.embedding_model.encode([text])[0]
    normalized = embedding / np.linalg.norm(embedding)

    # Check normalization
    assert abs(np.linalg.norm(normalized) - 1.0) < 1e-6

def test_empty_index_handling():
    memory = VectorRetrievalMemory()

    # Should handle empty index gracefully
    results = memory.retrieve_relevant("Any query", top_k=5)
    assert results == []

Performance Tests

def test_vector_memory_performance():
    import time
    memory = VectorRetrievalMemory()

    # Test insertion performance
    start_time = time.time()
    for i in range(1000):
        memory.add_interaction(f"Question {i}", f"Answer {i}")
    insertion_time = time.time() - start_time

    print(f"Insertion time: {insertion_time:.2f}s for 1000 interactions")
    print(f"Rate: {1000/insertion_time:.0f} interactions/second")

    # Test retrieval performance
    start_time = time.time()
    for i in range(100):
        results = memory.retrieve_relevant(f"Test query {i}", top_k=10)
    retrieval_time = time.time() - start_time

    print(f"Retrieval time: {retrieval_time:.2f}s for 100 queries")
    print(f"Rate: {100/retrieval_time:.0f} queries/second")

def test_memory_usage():
    import psutil
    import os

    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss

    memory = VectorRetrievalMemory()

    # Add many interactions
    for i in range(10000):
        memory.add_interaction(f"Input {i}", f"Output {i}")

    final_memory = process.memory_info().rss
    memory_growth = (final_memory - initial_memory) / (1024 * 1024)  # MB

    print(f"Memory growth: {memory_growth:.2f} MB for 10k interactions")
    print(f"Memory per interaction: {memory_growth / 10000 * 1024:.2f} KB")

Migration and Scaling

Migration from Other Patterns

def migrate_to_vector_memory(source_memory, embedding_model_name: str):
    """Migrate from any memory pattern to vector memory"""
    vector_memory = VectorRetrievalMemory(model_name=embedding_model_name)

    # Get all interactions from source
    if hasattr(source_memory, 'get_all_interactions'):
        interactions = source_memory.get_all_interactions()
    else:
        # Handle different source types
        interactions = source_memory.interactions

    # Convert to vector memory
    for interaction in interactions:
        vector_memory.add_interaction(
            interaction['user_input'],
            interaction['agent_response'],
            interaction.get('metadata', {})
        )

    return vector_memory

Distributed Vector Memory

class DistributedVectorMemory:
    def __init__(self, shard_count: int = 4):
        self.shards = [VectorRetrievalMemory() for _ in range(shard_count)]
        self.shard_count = shard_count

    def add_interaction(self, user_input: str, agent_response: str, user_id: str = None):
        # Shard based on user ID or hash of input
        # Note: Python's built-in hash() for strings is randomized per process;
        # use a stable hash (e.g. hashlib) if shard assignment must survive restarts
        shard_key = user_id or user_input
        shard_id = hash(shard_key) % self.shard_count
        self.shards[shard_id].add_interaction(user_input, agent_response)

    def retrieve_relevant(self, query: str, top_k: int = 5, user_id: str = None):
        if user_id:
            # Query specific shard
            shard_id = hash(user_id) % self.shard_count
            return self.shards[shard_id].retrieve_relevant(query, top_k)
        else:
            # Query all shards and merge results
            all_results = []
            for shard in self.shards:
                shard_results = shard.retrieve_relevant(query, top_k)
                all_results.extend(shard_results)

            # Re-rank and return top-k
            all_results.sort(key=lambda x: x['similarity_score'], reverse=True)
            return all_results[:top_k]

Next Steps

  1. Choose appropriate embedding model for your domain
  2. Select vector database based on scale and requirements
  3. Implement similarity search with your data
  4. Evaluate retrieval quality with test queries
  5. Optimize embedding and indexing parameters
  6. Plan for scaling and production deployment
  7. Consider hybrid approaches with other memory patterns

The Vector Retrieval pattern provides powerful semantic search capabilities that can dramatically improve the relevance of retrieved context in agent applications.