
Retrieval Accuracy Benchmarks

Measuring and optimizing the accuracy of memory retrieval systems is critical for building reliable agent memory. This guide provides comprehensive methodologies for benchmarking retrieval accuracy across different memory architectures.

Overview

Retrieval accuracy measures how well your memory system finds and returns relevant information when queried. Poor accuracy leads to hallucinations, missed context, and degraded agent performance.

Core Metrics

Primary Accuracy Metrics

Precision at K (P@K)

def precision_at_k(retrieved_items, relevant_items, k):
    """
    Calculate precision at rank k

    Args:
        retrieved_items: List of retrieved item IDs in rank order
        relevant_items: Set of ground truth relevant item IDs
        k: Cut-off rank

    Returns:
        float: Precision score between 0 and 1
    """
    if k == 0 or len(retrieved_items) == 0:
        return 0.0

    top_k = retrieved_items[:k]
    relevant_in_top_k = len([item for item in top_k if item in relevant_items])
    return relevant_in_top_k / min(k, len(retrieved_items))


# Example usage
retrieved = ["doc1", "doc2", "doc3", "doc4", "doc5"]
relevant = {"doc1", "doc3", "doc7", "doc9"}

p_at_3 = precision_at_k(retrieved, relevant, 3)
print(f"P@3: {p_at_3:.3f}")  # Output: P@3: 0.667

Recall at K (R@K)

def recall_at_k(retrieved_items, relevant_items, k):
    """
    Calculate recall at rank k

    Returns:
        float: Recall score between 0 and 1
    """
    if len(relevant_items) == 0:
        return 0.0

    top_k = retrieved_items[:k]
    relevant_in_top_k = len([item for item in top_k if item in relevant_items])
    return relevant_in_top_k / len(relevant_items)


# Benchmark different k values
k_values = [1, 3, 5, 10, 20]
for k in k_values:
    p_k = precision_at_k(retrieved, relevant, k)
    r_k = recall_at_k(retrieved, relevant, k)
    print(f"P@{k}: {p_k:.3f}, R@{k}: {r_k:.3f}")

Mean Average Precision (MAP)

def average_precision(retrieved_items, relevant_items):
    """Calculate Average Precision for a single query"""
    if len(relevant_items) == 0:
        return 0.0

    precision_sum = 0.0
    relevant_count = 0

    for i, item in enumerate(retrieved_items):
        if item in relevant_items:
            relevant_count += 1
            precision_at_i = relevant_count / (i + 1)
            precision_sum += precision_at_i

    return precision_sum / len(relevant_items)


def mean_average_precision(queries_results):
    """Calculate MAP across multiple queries"""
    total_ap = 0.0
    for retrieved, relevant in queries_results:
        ap = average_precision(retrieved, relevant)
        total_ap += ap
    return total_ap / len(queries_results)


# Example with multiple queries
queries_results = [
    (["doc1", "doc2", "doc3"], {"doc1", "doc3"}),
    (["doc4", "doc5", "doc6"], {"doc5", "doc7"}),
    (["doc8", "doc9", "doc10"], {"doc8", "doc9", "doc10"}),
]

map_score = mean_average_precision(queries_results)
print(f"MAP: {map_score:.3f}")

Normalized Discounted Cumulative Gain (NDCG)

import math


def dcg_at_k(scores, k):
    """Calculate DCG at rank k"""
    dcg = 0.0
    for i in range(min(k, len(scores))):
        dcg += (2 ** scores[i] - 1) / math.log2(i + 2)
    return dcg


def ndcg_at_k(retrieved_items, relevance_scores, k):
    """
    Calculate NDCG at rank k

    Args:
        retrieved_items: List of retrieved item IDs
        relevance_scores: Dict mapping item IDs to relevance scores (0-3)
        k: Cut-off rank
    """
    # Get relevance scores for retrieved items
    retrieved_scores = [relevance_scores.get(item, 0) for item in retrieved_items[:k]]

    # Calculate DCG for retrieved items
    dcg = dcg_at_k(retrieved_scores, k)

    # Calculate IDCG (perfect ordering)
    all_scores = sorted(relevance_scores.values(), reverse=True)
    idcg = dcg_at_k(all_scores, k)

    return dcg / idcg if idcg > 0 else 0.0


# Example usage
relevance_scores = {"doc1": 3, "doc2": 1, "doc3": 2, "doc4": 0, "doc5": 3}
retrieved = ["doc1", "doc2", "doc3", "doc4", "doc5"]

ndcg_5 = ndcg_at_k(retrieved, relevance_scores, 5)
print(f"NDCG@5: {ndcg_5:.3f}")

Benchmark Test Suites

Standard Datasets

MS MARCO Passage Retrieval

def load_msmarco_benchmark():
    """Load MS MARCO passage retrieval benchmark"""
    import datasets

    # Load official MS MARCO dataset
    dataset = datasets.load_dataset("ms_marco", "v1.1")

    queries = {}
    relevance_judgments = {}

    for example in dataset['train']:
        query_id = example['query_id']
        query_text = example['query']
        passages = example['passages']

        queries[query_id] = query_text

        # Extract relevant passages
        # Note: passage field names differ between MS MARCO releases; adjust
        # the keys below to match the structure of the copy you load.
        relevant_passages = [
            p['passage_id'] for p in passages if p['is_selected'] == 1
        ]
        relevance_judgments[query_id] = set(relevant_passages)

    return queries, relevance_judgments


# Benchmark your retrieval system
queries, ground_truth = load_msmarco_benchmark()


def benchmark_retrieval_system(retriever, queries, ground_truth, k=10):
    """Benchmark a retrieval system"""
    results = []

    for query_id, query_text in queries.items():
        if query_id not in ground_truth:
            continue

        # Get retrieval results
        retrieved_docs = retriever.retrieve(query_text, top_k=k)
        retrieved_ids = [doc.id for doc in retrieved_docs]

        # Calculate metrics
        relevant_ids = ground_truth[query_id]
        p_at_k = precision_at_k(retrieved_ids, relevant_ids, k)
        r_at_k = recall_at_k(retrieved_ids, relevant_ids, k)
        ap = average_precision(retrieved_ids, relevant_ids)

        results.append({
            'query_id': query_id,
            'precision_at_k': p_at_k,
            'recall_at_k': r_at_k,
            'average_precision': ap,
        })

    # Aggregate results
    avg_precision = sum(r['precision_at_k'] for r in results) / len(results)
    avg_recall = sum(r['recall_at_k'] for r in results) / len(results)
    map_score = sum(r['average_precision'] for r in results) / len(results)

    return {
        'precision_at_k': avg_precision,
        'recall_at_k': avg_recall,
        'mean_average_precision': map_score,
        'num_queries': len(results),
    }

BEIR Benchmark Suite

def run_beir_benchmark(retriever, dataset_name="scifact"):
    """
    Run BEIR benchmark on your retrieval system

    Args:
        retriever: Your retrieval system
        dataset_name: BEIR dataset (scifact, nfcorpus, trec-covid, etc.)
    """
    from beir import util
    from beir.datasets.data_loader import GenericDataLoader
    from beir.retrieval.evaluation import EvaluateRetrieval

    # Download and load dataset
    url = (
        "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/"
        f"{dataset_name}.zip"
    )
    dataset_path = util.download_and_unzip(url, "datasets")
    corpus, queries, qrels = GenericDataLoader(dataset_path).load(split="test")

    # Create a wrapper exposing the search() interface BEIR expects
    class RetrieverWrapper:
        def __init__(self, retriever):
            self.retriever = retriever

        def search(self, corpus, queries, top_k, *args, **kwargs):
            results = {}
            for query_id, query_text in queries.items():
                retrieved = self.retriever.retrieve(query_text, top_k=top_k)
                results[query_id] = {doc.id: doc.score for doc in retrieved}
            return results

    # Run evaluation
    evaluator = EvaluateRetrieval(RetrieverWrapper(retriever))
    results = evaluator.retrieve(corpus, queries)
    ndcg, _map, recall, precision = evaluator.evaluate(
        qrels, results, evaluator.k_values
    )

    return {**ndcg, **_map, **recall, **precision}


# Example usage
# beir_results = run_beir_benchmark(your_retriever, "scifact")
# print(f"NDCG@10: {beir_results['NDCG@10']:.3f}")

Custom Benchmark Creation

Domain-Specific Test Sets

def create_domain_benchmark(domain_documents, query_templates):
    """
    Create domain-specific benchmark from your documents

    Args:
        domain_documents: List of documents in your domain
        query_templates: Templates for generating queries
    """
    from sentence_transformers import SentenceTransformer, util

    model = SentenceTransformer('all-MiniLM-L6-v2')

    queries = []
    relevance_judgments = {}

    # Embed the corpus once up front instead of re-encoding it per query
    doc_embeddings = model.encode([d.content for d in domain_documents])

    for i, doc in enumerate(domain_documents):
        # Generate queries from document content
        for t, template in enumerate(query_templates):
            # Assumes each document exposes a short `topic` string for the template
            query = template.format(topic=doc.topic)
            query_id = f"q_{i}_{t}"
            queries.append((query_id, query))

            # Find relevant documents using semantic similarity
            query_embedding = model.encode(query)
            similarities = util.cos_sim(query_embedding, doc_embeddings)[0]

            # Mark documents with similarity > threshold as relevant
            relevant_docs = []
            for j, sim in enumerate(similarities):
                if sim > 0.7:  # Adjust threshold as needed
                    relevant_docs.append(domain_documents[j].id)

            relevance_judgments[query_id] = set(relevant_docs)

    return queries, relevance_judgments


# Example query templates (each uses a single {topic} placeholder)
query_templates = [
    "What does this document say about {topic}?",
    "How is {topic} explained in the text?",
    "What are the key points regarding {topic}?",
    "Can you summarize the information about {topic}?",
]

Production Monitoring

Real-Time Accuracy Tracking

import time


class AccuracyMonitor:
    def __init__(self, window_size=1000):
        self.window_size = window_size
        self.recent_queries = []

    def log_query_result(self, query, retrieved_docs, user_feedback):
        """Log query result with user feedback for accuracy tracking"""
        self.recent_queries.append({
            'timestamp': time.time(),
            'query': query,
            'retrieved': retrieved_docs,
            'relevant_feedback': user_feedback,  # User clicks, ratings, etc.
        })

        # Keep only recent queries
        if len(self.recent_queries) > self.window_size:
            self.recent_queries.pop(0)

    def calculate_online_metrics(self):
        """Calculate accuracy metrics from recent queries"""
        if not self.recent_queries:
            return {}

        total_precision = 0
        total_recall = 0
        total_queries = 0

        for query_data in self.recent_queries:
            retrieved = query_data['retrieved']
            relevant = query_data['relevant_feedback']

            if not relevant:  # Skip queries without feedback
                continue

            p_at_k = precision_at_k(retrieved, relevant, len(retrieved))
            r_at_k = recall_at_k(retrieved, relevant, len(retrieved))

            total_precision += p_at_k
            total_recall += r_at_k
            total_queries += 1

        if total_queries == 0:
            return {}

        return {
            'avg_precision': total_precision / total_queries,
            'avg_recall': total_recall / total_queries,
            'num_queries_with_feedback': total_queries,
        }


# Usage in production
monitor = AccuracyMonitor()

# Log each retrieval with user feedback
monitor.log_query_result(
    query="How to implement RAG?",
    retrieved_docs=["doc1", "doc2", "doc3"],
    user_feedback={"doc1", "doc3"},  # User clicked on these
)

# Get current accuracy metrics
metrics = monitor.calculate_online_metrics()
print(f"Current Precision: {metrics.get('avg_precision', 0):.3f}")

A/B Testing Framework

class RetrievalABTest:
    def __init__(self, system_a, system_b, traffic_split=0.5):
        self.system_a = system_a
        self.system_b = system_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []

    def route_query(self, query, ground_truth_relevant=None):
        """Route query to system A or B based on traffic split"""
        import random

        use_system_a = random.random() < self.traffic_split

        if use_system_a:
            results = self.system_a.retrieve(query)
            if ground_truth_relevant:
                metrics = self._calculate_metrics(results, ground_truth_relevant)
                self.results_a.append(metrics)
            return results, 'A'
        else:
            results = self.system_b.retrieve(query)
            if ground_truth_relevant:
                metrics = self._calculate_metrics(results, ground_truth_relevant)
                self.results_b.append(metrics)
            return results, 'B'

    def _calculate_metrics(self, retrieved, relevant):
        retrieved_ids = [doc.id for doc in retrieved]
        return {
            'precision_at_5': precision_at_k(retrieved_ids, relevant, 5),
            'recall_at_5': recall_at_k(retrieved_ids, relevant, 5),
            'average_precision': average_precision(retrieved_ids, relevant),
        }

    def get_statistical_significance(self, metric='precision_at_5'):
        """Check if difference between systems is statistically significant"""
        from scipy import stats

        if not self.results_a or not self.results_b:
            return None

        values_a = [r[metric] for r in self.results_a]
        values_b = [r[metric] for r in self.results_b]

        # Perform t-test
        t_stat, p_value = stats.ttest_ind(values_a, values_b)

        return {
            'metric': metric,
            'system_a_mean': sum(values_a) / len(values_a),
            'system_b_mean': sum(values_b) / len(values_b),
            'p_value': p_value,
            'significant': p_value < 0.05,
            't_statistic': t_stat,
        }


# Example usage
ab_test = RetrievalABTest(old_retriever, new_retriever)

# Run test
for query, relevant in test_queries:
    results, system = ab_test.route_query(query, relevant)

# Check results
significance = ab_test.get_statistical_significance('precision_at_5')
print(f"System A Precision: {significance['system_a_mean']:.3f}")
print(f"System B Precision: {significance['system_b_mean']:.3f}")
print(f"Statistically Significant: {significance['significant']}")

Advanced Evaluation Techniques

Semantic Similarity Evaluation

def semantic_accuracy_evaluation(queries, retrieved_docs, ground_truth, similarity_model):
    """
    Evaluate retrieval accuracy using semantic similarity
    instead of exact matching
    """
    from sentence_transformers import SentenceTransformer, util

    if similarity_model is None:
        similarity_model = SentenceTransformer('all-MiniLM-L6-v2')

    semantic_scores = []

    for query, retrieved, relevant in zip(queries, retrieved_docs, ground_truth):
        if not retrieved or not relevant:
            semantic_scores.append(0.0)
            continue

        # Get embeddings
        query_embedding = similarity_model.encode(query, convert_to_tensor=True)
        retrieved_embeddings = similarity_model.encode(retrieved, convert_to_tensor=True)
        relevant_embeddings = similarity_model.encode(relevant, convert_to_tensor=True)

        # Calculate semantic relevance scores
        retrieved_relevance = util.pytorch_cos_sim(query_embedding, retrieved_embeddings)[0]
        relevant_relevance = util.pytorch_cos_sim(query_embedding, relevant_embeddings)[0]

        # Compare against best possible relevance
        max_possible_score = relevant_relevance.max().item()
        actual_score = retrieved_relevance.mean().item()

        semantic_accuracy = actual_score / max_possible_score if max_possible_score > 0 else 0.0
        semantic_scores.append(min(semantic_accuracy, 1.0))  # Cap at 1.0

    return {
        'semantic_accuracy_mean': sum(semantic_scores) / len(semantic_scores),
        'semantic_accuracy_scores': semantic_scores,
    }


# Example usage
queries = ["How to implement neural networks?", "What is machine learning?"]
retrieved = [["Neural network implementation guide", "Deep learning basics"],
             ["ML fundamentals", "AI overview"]]
ground_truth = [["Comprehensive neural network tutorial"],
                ["Machine learning introduction"]]

semantic_scores = semantic_accuracy_evaluation(queries, retrieved, ground_truth, None)
print(f"Semantic Accuracy: {semantic_scores['semantic_accuracy_mean']:.3f}")

Error Analysis Framework

class RetrievalErrorAnalyzer:
    def __init__(self):
        self.error_categories = {
            'query_understanding': [],
            'index_coverage': [],
            'ranking_quality': [],
            'semantic_drift': [],
        }

    def analyze_failures(self, query, retrieved_docs, expected_docs, similarity_model=None):
        """Categorize and analyze retrieval failures"""
        from sentence_transformers import SentenceTransformer, util

        if similarity_model is None:
            similarity_model = SentenceTransformer('all-MiniLM-L6-v2')

        retrieved_ids = set(doc.id for doc in retrieved_docs)
        expected_ids = set(expected_docs)

        # Missing relevant documents: analyze why each was missed
        missing_docs = expected_ids - retrieved_ids
        for doc_id in missing_docs:
            doc_content = self._get_doc_content(doc_id)

            # Check if query terms appear in document
            query_tokens = set(query.lower().split())
            doc_tokens = set(doc_content.lower().split())

            if not query_tokens.intersection(doc_tokens):
                self.error_categories['query_understanding'].append({
                    'query': query,
                    'doc_id': doc_id,
                    'issue': 'No lexical overlap',
                })
            else:
                # Check semantic similarity
                query_emb = similarity_model.encode(query)
                doc_emb = similarity_model.encode(doc_content)
                similarity = util.cos_sim(query_emb, doc_emb)[0][0]

                if similarity < 0.3:
                    self.error_categories['semantic_drift'].append({
                        'query': query,
                        'doc_id': doc_id,
                        'similarity': float(similarity),
                        'issue': 'Low semantic similarity',
                    })
                else:
                    self.error_categories['ranking_quality'].append({
                        'query': query,
                        'doc_id': doc_id,
                        'similarity': float(similarity),
                        'issue': 'Good match ranked too low',
                    })

        # False positives
        false_positives = retrieved_ids - expected_ids
        for doc in retrieved_docs:
            if doc.id in false_positives:
                self.error_categories['ranking_quality'].append({
                    'query': query,
                    'doc_id': doc.id,
                    'rank': doc.rank,
                    'score': doc.score,
                    'issue': 'Irrelevant document ranked high',
                })

    def get_error_summary(self):
        """Get summary of error patterns"""
        total_errors = sum(len(e) for e in self.error_categories.values())
        summary = {}
        for category, errors in self.error_categories.items():
            summary[category] = {
                'count': len(errors),
                'percentage': (len(errors) / total_errors * 100) if total_errors else 0.0,
            }
        return summary

    def _get_doc_content(self, doc_id):
        # Implement document lookup against your document store
        raise NotImplementedError("Connect this to your document store")


# Usage
analyzer = RetrievalErrorAnalyzer()

for query, retrieved, expected in failed_queries:
    analyzer.analyze_failures(query, retrieved, expected)

error_summary = analyzer.get_error_summary()
print("Error Distribution:")
for category, stats in error_summary.items():
    print(f"{category}: {stats['count']} ({stats['percentage']:.1f}%)")

Best Practices

Evaluation Frequency

  • Development: Run full accuracy benchmarks after every major change
  • Staging: Daily automated accuracy tests with representative query sets (a minimal CI-style check is sketched after this list)
  • Production: Continuous monitoring with user feedback integration
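
One way to automate the staging cadence above is a small regression test that fails the build when accuracy drops below an agreed floor. The sketch below is illustrative, not a prescribed setup: the module paths (my_memory_system, my_benchmarks, retrieval_metrics) and threshold values are placeholders, and benchmark_retrieval_system refers to the helper defined in the MS MARCO section of this guide.

# test_retrieval_regression.py -- run daily in staging CI (e.g. via pytest)
# Imports below are hypothetical placeholders; wire them to your own modules.
from my_memory_system import staging_retriever        # your retriever under test
from my_benchmarks import load_staging_queries        # returns (queries, ground_truth)
from retrieval_metrics import benchmark_retrieval_system  # helper from this guide

MIN_PRECISION_AT_10 = 0.60   # example floor, set from your current baseline
MIN_MAP = 0.45               # example floor


def test_retrieval_accuracy_floor():
    """Fail the CI run if staging accuracy drops below the agreed floor."""
    queries, ground_truth = load_staging_queries()
    report = benchmark_retrieval_system(staging_retriever, queries, ground_truth, k=10)

    assert report['precision_at_k'] >= MIN_PRECISION_AT_10, report
    assert report['mean_average_precision'] >= MIN_MAP, report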

Metric Selection Guide

  • P@K and R@K: Simple cut-off metrics; use them when you care about what lands in the top k results, since they ignore ordering within the cut-off
  • MAP: A rank-aware metric over binary relevance judgments; good for comparing overall system quality
  • NDCG: Use when you have graded relevance judgments (e.g. a 0-3 scale)
  • Semantic Accuracy: Useful for embedding-based systems where exact-ID matching is too strict (a helper combining these metrics is sketched below)
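
When choosing between these, it often helps to compute them side by side for a few queries. A minimal sketch, reusing the functions defined earlier in this guide (graded scores are optional and only feed NDCG):

def per_query_report(retrieved_ids, relevant_ids, k=10, graded_scores=None):
    """Compute the metrics above for a single query."""
    report = {
        f'P@{k}': precision_at_k(retrieved_ids, relevant_ids, k),
        f'R@{k}': recall_at_k(retrieved_ids, relevant_ids, k),
        'AP': average_precision(retrieved_ids, relevant_ids),
    }
    if graded_scores is not None:
        report[f'NDCG@{k}'] = ndcg_at_k(retrieved_ids, graded_scores, k)
    return report


# Example: binary judgments plus the graded scores from the NDCG section
print(per_query_report(
    ["doc1", "doc2", "doc3", "doc4", "doc5"],
    {"doc1", "doc3", "doc5"},
    k=5,
    graded_scores={"doc1": 3, "doc2": 1, "doc3": 2, "doc4": 0, "doc5": 3},
))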

Common Pitfalls

  1. Dataset Leakage: Ensure test queries don’t appear in training data (a quick overlap check is sketched after this list)
  2. Temporal Bias: Test on recent data, not just historical benchmarks
  3. Query Distribution: Match test queries to production query patterns
  4. Relevance Drift: Regularly update ground truth judgments
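
The first pitfall is often easy to catch mechanically. Below is a minimal sketch of a leakage check that flags test queries whose normalized form also appears in the training set; it assumes plain-string query lists and deliberately uses a crude normalization, so near-duplicates with heavier rewording will still need manual review.

import re


def normalize(text):
    """Lowercase and strip punctuation for a rough duplicate check."""
    return re.sub(r'[^a-z0-9 ]+', '', text.lower()).strip()


def find_query_leakage(train_queries, test_queries):
    """Return test queries whose normalized form also appears in training data."""
    train_norms = {normalize(q) for q in train_queries}
    return [q for q in test_queries if normalize(q) in train_norms]


# Example
leaked = find_query_leakage(
    train_queries=["How do I implement RAG?", "What is vector search?"],
    test_queries=["how do i implement rag", "How does HNSW indexing work?"],
)
print(f"Leaked test queries: {leaked}")  # ['how do i implement rag']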

This comprehensive guide provides the foundation for measuring and improving retrieval accuracy in agent memory systems. Regular benchmarking with these methodologies ensures your system maintains high performance as it scales.