Storage Performance Benchmarks

Optimizing storage performance is crucial for scalable agent memory systems. This guide provides comprehensive benchmarking methodologies for measuring and improving storage throughput, latency, and efficiency across different storage backends.

Overview

Storage performance directly impacts agent response times, system scalability, and operational costs. Poor storage performance leads to query timeouts, degraded user experience, and infrastructure scaling challenges.

Core Performance Metrics

Throughput Metrics

Write Throughput

import time
import threading
from concurrent.futures import ThreadPoolExecutor

import numpy as np


class StorageThroughputBenchmark:
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.results = []

    def benchmark_write_throughput(self, documents, num_threads=10, duration_seconds=60):
        """
        Benchmark write throughput with concurrent operations

        Args:
            documents: List of documents to write
            num_threads: Number of concurrent write threads
            duration_seconds: Duration to run the benchmark
        """
        start_time = time.time()
        write_count = 0
        write_times = []
        errors = 0

        def write_worker(doc_batch):
            nonlocal write_count, errors
            for doc in doc_batch:
                try:
                    write_start = time.time()
                    self.storage.store_document(doc)
                    write_end = time.time()
                    write_times.append(write_end - write_start)
                    write_count += 1
                except Exception:
                    errors += 1

        # Split documents into batches for threads
        batch_size = max(1, len(documents) // num_threads)
        batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]

        # Run concurrent writes until the duration elapses
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = []
            while time.time() - start_time < duration_seconds:
                for batch in batches:
                    if time.time() - start_time >= duration_seconds:
                        break
                    futures.append(executor.submit(write_worker, batch))

            # Wait for completion
            for future in futures:
                future.result()

        total_time = time.time() - start_time

        return {
            'writes_per_second': write_count / total_time,
            'total_writes': write_count,
            'total_time': total_time,
            'avg_write_latency': np.mean(write_times),
            'p95_write_latency': np.percentile(write_times, 95),
            'p99_write_latency': np.percentile(write_times, 99),
            'error_rate': errors / (write_count + errors) if (write_count + errors) > 0 else 0,
            'concurrent_threads': num_threads
        }


# Example usage
benchmark = StorageThroughputBenchmark(your_storage_backend)

# Generate test documents
test_docs = [
    {"id": f"doc_{i}", "content": f"Test document content {i}" * 100}
    for i in range(10000)
]

results = benchmark.benchmark_write_throughput(test_docs, num_threads=20)
print(f"Write Throughput: {results['writes_per_second']:.1f} docs/sec")
print(f"P95 Latency: {results['p95_write_latency']*1000:.1f}ms")

Read Throughput

def benchmark_read_throughput(storage_backend, document_ids, num_threads=10, duration_seconds=60):
    """Benchmark read throughput with concurrent operations"""
    start_time = time.time()
    read_count = 0
    read_times = []
    errors = 0

    def read_worker():
        nonlocal read_count, errors
        while time.time() - start_time < duration_seconds:
            try:
                doc_id = np.random.choice(document_ids)
                read_start = time.time()
                document = storage_backend.get_document(doc_id)
                read_end = time.time()

                if document:
                    read_times.append(read_end - read_start)
                    read_count += 1
                else:
                    errors += 1
            except Exception:
                errors += 1

    # Run concurrent reads
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(read_worker) for _ in range(num_threads)]
        for future in futures:
            future.result()

    total_time = time.time() - start_time

    return {
        'reads_per_second': read_count / total_time,
        'total_reads': read_count,
        'avg_read_latency': np.mean(read_times) if read_times else 0,
        'p95_read_latency': np.percentile(read_times, 95) if read_times else 0,
        'p99_read_latency': np.percentile(read_times, 99) if read_times else 0,
        'error_rate': errors / (read_count + errors) if (read_count + errors) > 0 else 0
    }


# Example usage
doc_ids = [f"doc_{i}" for i in range(10000)]
read_results = benchmark_read_throughput(your_storage_backend, doc_ids)
print(f"Read Throughput: {read_results['reads_per_second']:.1f} docs/sec")

Query Performance Benchmarks

Vector Search Performance

class VectorSearchBenchmark:
    def __init__(self, vector_store):
        self.vector_store = vector_store

    def benchmark_vector_search(self, query_vectors, k_values=[1, 5, 10, 20, 50], num_threads=5):
        """
        Benchmark vector search performance across different k values

        Args:
            query_vectors: List of query embedding vectors
            k_values: List of k values to test
            num_threads: Number of concurrent query threads
        """
        results = {}

        for k in k_values:
            print(f"Benchmarking k={k}")
            search_times = []
            num_queries = 0

            def search_worker(query_batch):
                nonlocal num_queries
                for query_vector in query_batch:
                    start_time = time.time()
                    self.vector_store.search(query_vector, k=k)  # only timing is recorded; hits are discarded
                    end_time = time.time()
                    search_times.append(end_time - start_time)
                    num_queries += 1

            # Split queries into batches
            batch_size = max(1, len(query_vectors) // num_threads)
            batches = [query_vectors[i:i + batch_size] for i in range(0, len(query_vectors), batch_size)]

            # Execute concurrent searches
            start_total = time.time()
            with ThreadPoolExecutor(max_workers=num_threads) as executor:
                futures = [executor.submit(search_worker, batch) for batch in batches]
                for future in futures:
                    future.result()
            end_total = time.time()

            results[k] = {
                'avg_latency': np.mean(search_times),
                'p50_latency': np.percentile(search_times, 50),
                'p95_latency': np.percentile(search_times, 95),
                'p99_latency': np.percentile(search_times, 99),
                'queries_per_second': num_queries / (end_total - start_total),
                'total_queries': num_queries
            }

        return results


# Example usage
# Generate random query vectors (replace with actual embeddings)
query_vectors = [np.random.rand(384) for _ in range(1000)]

vector_benchmark = VectorSearchBenchmark(your_vector_store)
search_results = vector_benchmark.benchmark_vector_search(query_vectors)

for k, metrics in search_results.items():
    print(f"k={k}: {metrics['queries_per_second']:.1f} QPS, "
          f"P95: {metrics['p95_latency']*1000:.1f}ms")

Full-Text Search Performance

def benchmark_text_search(text_search_engine, queries, concurrent_users=10):
    """Benchmark full-text search performance"""
    search_times = []
    total_results = 0
    errors = 0

    def search_worker():
        nonlocal total_results, errors
        for query in queries:
            try:
                start_time = time.time()
                results = text_search_engine.search(query)
                end_time = time.time()

                search_times.append(end_time - start_time)
                total_results += len(results)
            except Exception:
                errors += 1

    start_total = time.time()
    with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
        futures = [executor.submit(search_worker) for _ in range(concurrent_users)]
        for future in futures:
            future.result()
    end_total = time.time()

    total_time = end_total - start_total
    total_queries = len(queries) * concurrent_users

    return {
        'queries_per_second': total_queries / total_time,
        'avg_latency': np.mean(search_times),
        'p95_latency': np.percentile(search_times, 95),
        'p99_latency': np.percentile(search_times, 99),
        'avg_results_per_query': total_results / total_queries,
        'error_rate': errors / total_queries,
        'total_queries': total_queries
    }


# Example queries
test_queries = [
    "machine learning algorithms",
    "deep neural networks",
    "natural language processing",
    "computer vision techniques",
    "reinforcement learning"
] * 200  # Repeat for more comprehensive testing

text_search_results = benchmark_text_search(your_text_search, test_queries)
print(f"Text Search QPS: {text_search_results['queries_per_second']:.1f}")

Storage Backend Comparisons

Database Performance Comparison

class StorageBackendComparison:
    def __init__(self):
        self.backends = {}

    def add_backend(self, name, backend):
        self.backends[name] = backend

    def run_comprehensive_benchmark(self, test_documents, test_queries):
        """Run comprehensive benchmark across all storage backends"""
        results = {}

        for backend_name, backend in self.backends.items():
            print(f"Benchmarking {backend_name}...")

            # Write performance
            write_benchmark = StorageThroughputBenchmark(backend)
            write_results = write_benchmark.benchmark_write_throughput(
                test_documents, num_threads=10, duration_seconds=30
            )

            # Read performance
            doc_ids = [doc['id'] for doc in test_documents]
            read_results = benchmark_read_throughput(backend, doc_ids, duration_seconds=30)

            # Query performance (if backend supports search)
            query_results = None
            if hasattr(backend, 'search'):
                try:
                    query_results = self._benchmark_query_performance(backend, test_queries)
                except Exception as e:
                    print(f"Query benchmark failed for {backend_name}: {e}")

            results[backend_name] = {
                'write_performance': write_results,
                'read_performance': read_results,
                'query_performance': query_results
            }

        return results

    def _benchmark_query_performance(self, backend, queries):
        """Benchmark query performance for backends that support search"""
        search_times = []

        for query in queries:
            start_time = time.time()
            backend.search(query, limit=10)
            end_time = time.time()
            search_times.append(end_time - start_time)

        return {
            'avg_query_latency': np.mean(search_times),
            'p95_query_latency': np.percentile(search_times, 95),
            'queries_per_second': len(queries) / sum(search_times)
        }

    def generate_comparison_report(self, results):
        """Generate detailed comparison report"""
        report = []
        report.append("# Storage Backend Performance Comparison\n")

        # Write performance comparison
        report.append("## Write Performance")
        report.append("| Backend | Writes/sec | Avg Latency | P95 Latency | Error Rate |")
        report.append("|---------|------------|-------------|-------------|------------|")

        for backend_name, data in results.items():
            write_perf = data['write_performance']
            report.append(
                f"| {backend_name} | {write_perf['writes_per_second']:.1f} | "
                f"{write_perf['avg_write_latency']*1000:.1f}ms | "
                f"{write_perf['p95_write_latency']*1000:.1f}ms | "
                f"{write_perf['error_rate']:.3f} |"
            )

        # Read performance comparison
        report.append("\n## Read Performance")
        report.append("| Backend | Reads/sec | Avg Latency | P95 Latency | Error Rate |")
        report.append("|---------|-----------|-------------|-------------|------------|")

        for backend_name, data in results.items():
            read_perf = data['read_performance']
            report.append(
                f"| {backend_name} | {read_perf['reads_per_second']:.1f} | "
                f"{read_perf['avg_read_latency']*1000:.1f}ms | "
                f"{read_perf['p95_read_latency']*1000:.1f}ms | "
                f"{read_perf['error_rate']:.3f} |"
            )

        return "\n".join(report)


# Example usage
comparison = StorageBackendComparison()
comparison.add_backend("PostgreSQL", postgresql_backend)
comparison.add_backend("Elasticsearch", elasticsearch_backend)
comparison.add_backend("Pinecone", pinecone_backend)
comparison.add_backend("Chroma", chroma_backend)

# Generate test data
test_docs = [
    {"id": f"doc_{i}", "content": f"Sample document content {i}" * 50}
    for i in range(5000)
]
test_queries = ["search query"] * 100

# Run benchmarks
results = comparison.run_comprehensive_benchmark(test_docs, test_queries)

# Generate report
report = comparison.generate_comparison_report(results)
print(report)

Scalability Testing

Load Testing Framework

class ScalabilityTest:
    def __init__(self, storage_backend):
        self.storage = storage_backend

    def test_concurrent_load(self, max_concurrent_users=100, ramp_up_duration=300):
        """
        Test system performance under increasing concurrent load

        Args:
            max_concurrent_users: Maximum number of concurrent operations
            ramp_up_duration: Time in seconds to ramp up to max load
        """
        results = []

        user_increments = [10, 20, 50, 100, 200, 500, 1000]
        if max_concurrent_users not in user_increments:
            user_increments.append(max_concurrent_users)
        user_increments = [u for u in user_increments if u <= max_concurrent_users]
        user_increments.sort()

        for concurrent_users in user_increments:
            print(f"Testing with {concurrent_users} concurrent users...")

            # Run load test for this concurrency level
            metrics = self._run_load_test_iteration(concurrent_users, duration=60)
            metrics['concurrent_users'] = concurrent_users
            results.append(metrics)

            # Stop if error rate gets too high
            if metrics['error_rate'] > 0.1:  # 10% error rate
                print(f"Stopping at {concurrent_users} users due to high error rate")
                break

        return results

    def _run_load_test_iteration(self, concurrent_users, duration):
        """Run single load test iteration"""
        operations = ['read', 'write', 'search']  # Mix of operations
        operation_times = {op: [] for op in operations}
        error_counts = {op: 0 for op in operations}
        total_operations = 0

        def worker():
            nonlocal total_operations
            deadline = time.time() + duration

            while time.time() < deadline:
                # Random operation selection
                operation = np.random.choice(operations)
                total_operations += 1

                try:
                    start_time = time.time()

                    if operation == 'read':
                        self.storage.get_document(f"doc_{np.random.randint(1000)}")
                    elif operation == 'write':
                        self.storage.store_document({
                            "id": f"temp_{total_operations}",
                            "content": "Temporary test document"
                        })
                    elif operation == 'search':
                        self.storage.search("test query", limit=10)

                    operation_times[operation].append(time.time() - start_time)
                except Exception:
                    error_counts[operation] += 1

        # Run concurrent workers
        start_total = time.time()
        with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [executor.submit(worker) for _ in range(concurrent_users)]
            for future in futures:
                future.result()
        end_total = time.time()

        # Calculate aggregated metrics
        all_times = []
        total_errors = 0
        for op in operations:
            all_times.extend(operation_times[op])
            total_errors += error_counts[op]

        return {
            'throughput': total_operations / (end_total - start_total),
            'avg_latency': np.mean(all_times) if all_times else 0,
            'p95_latency': np.percentile(all_times, 95) if all_times else 0,
            'p99_latency': np.percentile(all_times, 99) if all_times else 0,
            'error_rate': total_errors / total_operations if total_operations > 0 else 0,
            'total_operations': total_operations,
            'operation_breakdown': {
                op: {
                    'count': len(times),
                    'avg_latency': np.mean(times) if times else 0,
                    'error_count': error_counts[op]
                }
                for op, times in operation_times.items()
            }
        }

    def find_performance_breaking_point(self, results):
        """Analyze results to find performance breaking point"""
        breaking_point = None

        for i, result in enumerate(results):
            # Define breaking point criteria
            latency_degraded = result['p95_latency'] > 1.0   # > 1 second P95
            high_error_rate = result['error_rate'] > 0.05    # > 5% errors

            throughput_plateau = False
            if i > 0:
                prev_throughput = results[i - 1]['throughput']
                current_throughput = result['throughput']
                throughput_plateau = current_throughput < prev_throughput * 0.9  # 10% drop

            if latency_degraded or high_error_rate or throughput_plateau:
                breaking_point = result['concurrent_users']
                break

        return breaking_point


# Example usage
scalability_test = ScalabilityTest(your_storage_backend)
load_results = scalability_test.test_concurrent_load(max_concurrent_users=1000)

breaking_point = scalability_test.find_performance_breaking_point(load_results)
print(f"Performance breaking point: {breaking_point} concurrent users")

# Plot results
import matplotlib.pyplot as plt

users = [r['concurrent_users'] for r in load_results]
throughput = [r['throughput'] for r in load_results]
p95_latency = [r['p95_latency'] * 1000 for r in load_results]  # Convert to ms

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))

ax1.plot(users, throughput, 'b-o')
ax1.set_ylabel('Operations/sec')
ax1.set_title('Throughput vs Concurrent Users')
ax1.grid(True)

ax2.plot(users, p95_latency, 'r-o')
ax2.set_ylabel('P95 Latency (ms)')
ax2.set_xlabel('Concurrent Users')
ax2.set_title('P95 Latency vs Concurrent Users')
ax2.grid(True)

plt.tight_layout()
plt.savefig('scalability_results.png')

Storage Optimization Strategies

Index Performance Optimization

class IndexPerformanceOptimizer:
    def __init__(self, database):
        self.db = database

    def benchmark_index_configurations(self, table_name, query_patterns):
        """Test different index configurations for optimal performance"""
        index_configs = [
            {'type': 'btree', 'columns': ['created_at']},
            {'type': 'btree', 'columns': ['document_id']},
            {'type': 'gin', 'columns': ['content_vector']},   # For JSONB/arrays
            {'type': 'gist', 'columns': ['embedding']},       # For vector similarity
            {'type': 'composite', 'columns': ['user_id', 'created_at']}
        ]

        results = {}

        for config in index_configs:
            index_name = f"test_idx_{config['type']}_{'_'.join(config['columns'])}"

            try:
                # Create index
                self._create_index(index_name, table_name, config)

                # Benchmark queries with this index
                perf_metrics = self._benchmark_queries_with_index(query_patterns, index_name)
                results[index_name] = perf_metrics

                # Clean up
                self._drop_index(index_name)
            except Exception as e:
                print(f"Failed to test index {index_name}: {e}")

        return results

    def _create_index(self, index_name, table_name, config):
        """Create index based on configuration"""
        columns_str = ', '.join(config['columns'])

        if config['type'] == 'btree':
            sql = f"CREATE INDEX {index_name} ON {table_name} ({columns_str})"
        elif config['type'] == 'gin':
            sql = f"CREATE INDEX {index_name} ON {table_name} USING GIN ({columns_str})"
        elif config['type'] == 'gist':
            sql = f"CREATE INDEX {index_name} ON {table_name} USING GIST ({columns_str})"
        elif config['type'] == 'composite':
            sql = f"CREATE INDEX {index_name} ON {table_name} ({columns_str})"

        self.db.execute(sql)

    def _benchmark_queries_with_index(self, query_patterns, index_name):
        """Benchmark query performance with given index"""
        query_times = []

        for query in query_patterns:
            # Warm up
            for _ in range(3):
                self.db.execute(query)

            # Measure
            start_time = time.time()
            for _ in range(10):  # Multiple iterations for stable measurement
                self.db.execute(query)
            end_time = time.time()

            avg_time = (end_time - start_time) / 10
            query_times.append(avg_time)

        return {
            'avg_query_time': np.mean(query_times),
            'p95_query_time': np.percentile(query_times, 95),
            'index_name': index_name
        }

    def _drop_index(self, index_name):
        """Drop test index"""
        try:
            self.db.execute(f"DROP INDEX {index_name}")
        except Exception:
            pass  # Index might not exist


# Example usage
optimizer = IndexPerformanceOptimizer(your_database)

test_queries = [
    "SELECT * FROM documents WHERE created_at > '2024-01-01'",
    "SELECT * FROM documents WHERE document_id = 'doc_123'",
    "SELECT * FROM documents WHERE user_id = 'user_456' ORDER BY created_at DESC LIMIT 10"
]

index_results = optimizer.benchmark_index_configurations('documents', test_queries)

for index_name, metrics in index_results.items():
    print(f"{index_name}: Avg Query Time: {metrics['avg_query_time']*1000:.2f}ms")

Caching Strategy Evaluation

class CachePerformanceBenchmark:
    def __init__(self, cache_systems):
        self.cache_systems = cache_systems  # Dict of cache name -> cache instance

    def benchmark_cache_strategies(self, workload_patterns):
        """
        Benchmark different caching strategies

        workload_patterns: Dict with patterns like:
        {
            'read_heavy': {'reads': 0.8, 'writes': 0.2},
            'write_heavy': {'reads': 0.3, 'writes': 0.7},
            'mixed': {'reads': 0.5, 'writes': 0.5}
        }
        """
        results = {}

        for pattern_name, pattern in workload_patterns.items():
            print(f"Testing workload pattern: {pattern_name}")
            pattern_results = {}

            for cache_name, cache in self.cache_systems.items():
                print(f"  Testing cache: {cache_name}")

                # Reset cache state
                cache.clear()

                # Generate workload
                operations = self._generate_workload(pattern, num_operations=10000)

                # Benchmark
                metrics = self._run_cache_benchmark(cache, operations)
                pattern_results[cache_name] = metrics

            results[pattern_name] = pattern_results

        return results

    def _generate_workload(self, pattern, num_operations):
        """Generate realistic workload operations"""
        operations = []

        # Create pool of keys with Zipf distribution (realistic access pattern)
        num_unique_keys = 1000
        zipf_param = 1.2  # Higher values = more skewed towards popular keys
        key_weights = [1.0 / (i ** zipf_param) for i in range(1, num_unique_keys + 1)]
        key_weights = np.array(key_weights) / sum(key_weights)

        for _ in range(num_operations):
            # Choose operation type based on pattern
            if np.random.random() < pattern['reads']:
                operation_type = 'read'
            else:
                operation_type = 'write'

            # Choose key with Zipf distribution
            key_idx = np.random.choice(num_unique_keys, p=key_weights)
            key = f"key_{key_idx}"

            operations.append({
                'type': operation_type,
                'key': key,
                'value': f"value_for_{key}" if operation_type == 'write' else None
            })

        return operations

    def _run_cache_benchmark(self, cache, operations):
        """Run benchmark operations on cache system"""
        start_time = time.time()
        hit_count = 0
        miss_count = 0
        write_count = 0
        operation_times = []

        for op in operations:
            op_start = time.time()

            if op['type'] == 'read':
                result = cache.get(op['key'])
                if result is not None:
                    hit_count += 1
                else:
                    miss_count += 1
            else:  # write
                cache.set(op['key'], op['value'])
                write_count += 1

            op_end = time.time()
            operation_times.append(op_end - op_start)

        end_time = time.time()

        total_reads = hit_count + miss_count
        hit_rate = hit_count / total_reads if total_reads > 0 else 0

        return {
            'total_time': end_time - start_time,
            'operations_per_second': len(operations) / (end_time - start_time),
            'hit_rate': hit_rate,
            'miss_rate': 1 - hit_rate,
            'avg_operation_latency': np.mean(operation_times),
            'p95_operation_latency': np.percentile(operation_times, 95),
            'total_operations': len(operations),
            'reads': total_reads,
            'writes': write_count
        }


# Example usage with different cache systems
cache_systems = {
    'redis': redis_cache_instance,
    'memcached': memcached_cache_instance,
    'in_memory': in_memory_cache_instance
}

workload_patterns = {
    'read_heavy': {'reads': 0.8, 'writes': 0.2},
    'write_heavy': {'reads': 0.3, 'writes': 0.7},
    'balanced': {'reads': 0.5, 'writes': 0.5}
}

cache_benchmark = CachePerformanceBenchmark(cache_systems)
cache_results = cache_benchmark.benchmark_cache_strategies(workload_patterns)

# Analyze results
for pattern, systems in cache_results.items():
    print(f"\n{pattern.upper()} WORKLOAD:")
    for system, metrics in systems.items():
        print(f"  {system}: Hit Rate: {metrics['hit_rate']:.3f}, "
              f"OPS: {metrics['operations_per_second']:.0f}, "
              f"P95 Latency: {metrics['p95_operation_latency']*1000:.2f}ms")

Production Monitoring

Real-Time Performance Metrics

class ProductionPerformanceMonitor:
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.metrics_buffer = []
        self.alert_thresholds = {
            'write_latency_p95': 1.0,   # seconds
            'read_latency_p95': 0.5,    # seconds
            'error_rate': 0.05,         # 5%
            'throughput_drop': 0.3      # 30% drop from baseline
        }

    def start_monitoring(self, collection_interval=60):
        """Start continuous performance monitoring"""
        def collect_metrics():
            while True:
                metrics = self._collect_current_metrics()
                self.metrics_buffer.append(metrics)

                # Keep only recent metrics (last 24 hours)
                if len(self.metrics_buffer) > 1440:  # 24 * 60 minutes
                    self.metrics_buffer.pop(0)

                # Check for alerts
                self._check_alerts(metrics)

                time.sleep(collection_interval)

        monitoring_thread = threading.Thread(target=collect_metrics, daemon=True)
        monitoring_thread.start()

    def _collect_current_metrics(self):
        """Collect current performance metrics"""
        # This would integrate with your storage system's metrics;
        # the collection helpers below are placeholders to adapt.
        timestamp = time.time()

        metrics = {
            'timestamp': timestamp,
            'write_ops_per_second': self._get_write_ops_rate(),
            'read_ops_per_second': self._get_read_ops_rate(),
            'write_latency_avg': self._get_avg_write_latency(),
            'write_latency_p95': self._get_p95_write_latency(),
            'read_latency_avg': self._get_avg_read_latency(),
            'read_latency_p95': self._get_p95_read_latency(),
            'error_rate': self._get_current_error_rate(),
            'active_connections': self._get_active_connections(),
            'memory_usage_percent': self._get_memory_usage(),
            'disk_usage_percent': self._get_disk_usage()
        }

        return metrics

    def _check_alerts(self, current_metrics):
        """Check metrics against alert thresholds"""
        alerts = []

        if current_metrics['write_latency_p95'] > self.alert_thresholds['write_latency_p95']:
            alerts.append(f"High write latency: {current_metrics['write_latency_p95']:.3f}s")

        if current_metrics['read_latency_p95'] > self.alert_thresholds['read_latency_p95']:
            alerts.append(f"High read latency: {current_metrics['read_latency_p95']:.3f}s")

        if current_metrics['error_rate'] > self.alert_thresholds['error_rate']:
            alerts.append(f"High error rate: {current_metrics['error_rate']:.3%}")

        # Check for throughput drops
        if len(self.metrics_buffer) > 10:
            recent_throughput = current_metrics['read_ops_per_second'] + current_metrics['write_ops_per_second']
            baseline_metrics = self.metrics_buffer[-11:-1]  # Previous 10 collections
            baseline_throughput = np.mean([
                m['read_ops_per_second'] + m['write_ops_per_second']
                for m in baseline_metrics
            ])

            if baseline_throughput > 0:
                throughput_ratio = recent_throughput / baseline_throughput
                if throughput_ratio < (1 - self.alert_thresholds['throughput_drop']):
                    alerts.append(f"Throughput drop: {throughput_ratio:.2%} of baseline")

        if alerts:
            self._send_alerts(alerts)

    def _send_alerts(self, alerts):
        """Send performance alerts"""
        for alert in alerts:
            print(f"ALERT: {alert}")
            # Integrate with your alerting system (PagerDuty, Slack, etc.)

    def get_performance_report(self, hours=24):
        """Generate performance report for specified time period"""
        if not self.metrics_buffer:
            return "No metrics available"

        cutoff_time = time.time() - (hours * 3600)
        recent_metrics = [m for m in self.metrics_buffer if m['timestamp'] > cutoff_time]

        if not recent_metrics:
            return f"No metrics available for the last {hours} hours"

        # Calculate aggregated statistics
        write_latencies = [m['write_latency_p95'] for m in recent_metrics]
        read_latencies = [m['read_latency_p95'] for m in recent_metrics]
        error_rates = [m['error_rate'] for m in recent_metrics]
        throughputs = [m['read_ops_per_second'] + m['write_ops_per_second'] for m in recent_metrics]

        report = f"""
Performance Report - Last {hours} Hours
=====================================

Latency Metrics:
- Write P95 Latency: Avg {np.mean(write_latencies)*1000:.1f}ms, Max {np.max(write_latencies)*1000:.1f}ms
- Read P95 Latency: Avg {np.mean(read_latencies)*1000:.1f}ms, Max {np.max(read_latencies)*1000:.1f}ms

Throughput Metrics:
- Average Throughput: {np.mean(throughputs):.1f} ops/sec
- Peak Throughput: {np.max(throughputs):.1f} ops/sec
- Minimum Throughput: {np.min(throughputs):.1f} ops/sec

Reliability Metrics:
- Average Error Rate: {np.mean(error_rates):.3%}
- Peak Error Rate: {np.max(error_rates):.3%}

Data Points: {len(recent_metrics)} metric collections
"""
        return report.strip()


# Usage
monitor = ProductionPerformanceMonitor(your_storage_backend)
monitor.start_monitoring(collection_interval=60)  # Collect metrics every minute

# Later, generate a report
print(monitor.get_performance_report(hours=6))

Best Practices

Performance Testing Guidelines

  1. Baseline Establishment: Always establish performance baselines before optimizations
  2. Realistic Data: Use production-like data volumes and distributions
  3. Environment Consistency: Test in environments similar to production
  4. Warmup Periods: Allow for JIT compilation and cache warming
  5. Statistical Significance: Run multiple iterations and calculate confidence intervals (see the sketch below)
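
As a minimal sketch of point 5, the helper below (hypothetical summarize_runs) aggregates repeated runs of the same benchmark, for example several calls to benchmark_write_throughput above, into a mean and a normal-approximation 95% confidence interval. The z-score approximation and the sample values are assumptions; with very few runs a t-distribution is more appropriate.

import numpy as np

def summarize_runs(run_throughputs, z=1.96):
    """Aggregate repeated benchmark runs (ops/sec per run) into a mean and an
    approximate 95% confidence interval. Assumes at least two independent runs."""
    runs = np.asarray(run_throughputs, dtype=float)
    mean = runs.mean()
    # Standard error of the mean; ddof=1 gives the sample standard deviation
    sem = runs.std(ddof=1) / np.sqrt(len(runs))
    return {
        'mean': mean,
        'ci_low': mean - z * sem,
        'ci_high': mean + z * sem,
        'runs': len(runs),
    }

# Example: five repeated runs of the write-throughput benchmark (illustrative numbers)
summary = summarize_runs([812.4, 798.1, 820.7, 805.3, 799.9])
print(f"{summary['mean']:.1f} ops/sec "
      f"(95% CI: {summary['ci_low']:.1f}-{summary['ci_high']:.1f})")

Reporting the interval alongside the mean makes it clear whether an apparent improvement after an optimization is larger than run-to-run noise.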

Optimization Strategies

  1. Index Strategy: Create indexes based on actual query patterns
  2. Batch Operations: Batch small operations to reduce per-request overhead (see the sketch after this list)
  3. Connection Pooling: Optimize connection pool sizes for your workload
  4. Async Operations: Use asynchronous operations where possible
  5. Caching Layers: Implement multi-level caching strategies
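
As a rough illustration of point 2, the helper below (hypothetical batched_store) groups documents into fixed-size batches and prefers a bulk write call when the backend exposes one. Both the store_documents bulk method and the batch size are assumptions to adapt to your backend; only store_document appears in the examples above.

def batched_store(storage_backend, documents, batch_size=100):
    """Write documents in fixed-size batches instead of one call per document.
    Assumes the backend may expose a bulk store_documents(list) method; falls
    back to per-document writes if it does not."""
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        if hasattr(storage_backend, 'store_documents'):
            storage_backend.store_documents(batch)   # single round trip per batch
        else:
            for doc in batch:                         # fallback: per-document writes
                storage_backend.store_document(doc)

The batched path can be compared against per-document writes using the write-throughput benchmark from earlier in this guide to quantify the overhead saved.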

Monitoring and Alerting

  1. Key Metrics: Monitor latency percentiles, throughput, and error rates
  2. Trending: Track performance trends over time to identify degradation
  3. Capacity Planning: Monitor resource utilization for capacity planning
  4. Alert Fatigue: Set appropriate alert thresholds to avoid noise (see the sketch below)
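
One way to put points 2 and 4 into practice is to derive alert thresholds from a recent baseline window instead of hand-picking fixed numbers. The sketch below (hypothetical derive_alert_thresholds, with an illustrative margin) assumes metrics are stored in the same dict format that ProductionPerformanceMonitor above collects.

import numpy as np

def derive_alert_thresholds(baseline_metrics, margin=1.5):
    """Derive latency and error-rate thresholds from a window of baseline metric
    dicts (as collected by ProductionPerformanceMonitor), alerting only well
    above the observed P99 of each series."""
    write_p95 = [m['write_latency_p95'] for m in baseline_metrics]
    read_p95 = [m['read_latency_p95'] for m in baseline_metrics]
    error_rates = [m['error_rate'] for m in baseline_metrics]
    return {
        'write_latency_p95': float(np.percentile(write_p95, 99)) * margin,
        'read_latency_p95': float(np.percentile(read_p95, 99)) * margin,
        # Keep a small floor so a near-zero baseline does not alert on single errors
        'error_rate': max(float(np.percentile(error_rates, 99)) * margin, 0.01),
    }

# Example: recalibrate the monitor's thresholds from its recent buffer
# monitor.alert_thresholds.update(derive_alert_thresholds(monitor.metrics_buffer))

Recomputing thresholds periodically keeps alerts tied to observed trends rather than to numbers chosen before the system's real behavior was known.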

This guide provides the tools and methodologies needed to measure, optimize, and maintain high-performance storage for agent memory systems at scale.