
Latency Optimization Benchmarks

Minimizing latency is critical for responsive agent memory systems. This guide provides comprehensive benchmarking methodologies for measuring and optimizing latency across all components of agent memory architectures, from storage access to embedding generation.

Overview

Agent memory latency directly impacts user experience and system responsiveness: high latency means slow agent responses, timeouts, and frustrated users. Effective optimization starts with systematic measurement, followed by targeted fixes to the slowest components.

Core Latency Metrics

End-to-End Latency Measurement

import time
import asyncio
import threading
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
from dataclasses import dataclass
from contextlib import contextmanager

@dataclass
class LatencyMeasurement:
    operation: str
    latency: float
    timestamp: float
    success: bool
    metadata: Dict[str, Any]

class ComprehensiveLatencyBenchmark:
    def __init__(self, memory_system):
        self.memory_system = memory_system
        self.measurements = []
        self.percentiles = [50, 90, 95, 99, 99.9]

    @contextmanager
    def measure_latency(self, operation_name: str, **metadata):
        """Context manager for measuring operation latency"""
        start_time = time.time()
        success = True
        error = None
        try:
            yield
        except Exception as e:
            success = False
            error = str(e)
            raise
        finally:
            end_time = time.time()
            latency = end_time - start_time
            measurement = LatencyMeasurement(
                operation=operation_name,
                latency=latency,
                timestamp=start_time,
                success=success,
                metadata={**metadata, 'error': error if not success else None}
            )
            self.measurements.append(measurement)

    def benchmark_memory_operations(self, num_iterations: int = 1000):
        """Comprehensive benchmark of all memory operations"""
        operations = {
            'store_document': self._benchmark_store_operations,
            'retrieve_similar': self._benchmark_retrieval_operations,
            'search_text': self._benchmark_search_operations,
            'update_document': self._benchmark_update_operations,
            'delete_document': self._benchmark_delete_operations,
            'generate_embedding': self._benchmark_embedding_operations
        }

        results = {}
        for operation_name, benchmark_func in operations.items():
            print(f"Benchmarking {operation_name}...")
            operation_results = benchmark_func(num_iterations)
            results[operation_name] = operation_results

        return results

    def _benchmark_store_operations(self, num_iterations: int):
        """Benchmark document storage operations"""
        results = []
        for i in range(num_iterations):
            document = {
                'id': f'bench_doc_{i}',
                'content': f'Benchmark document content {i}' * 10,
                'timestamp': time.time()
            }
            with self.measure_latency('store_document', doc_id=document['id']):
                self.memory_system.store_document(document)

        # Extract measurements for this operation
        store_measurements = [m for m in self.measurements if m.operation == 'store_document']
        return self._analyze_measurements(store_measurements)

    def _benchmark_retrieval_operations(self, num_iterations: int):
        """Benchmark similarity retrieval operations"""
        # Pre-populate with documents if needed
        self._ensure_documents_exist(num_iterations)

        query_texts = [
            f"Find documents about topic {i}" for i in range(num_iterations)
        ]

        for i, query in enumerate(query_texts):
            with self.measure_latency('retrieve_similar', query_length=len(query)):
                results = self.memory_system.retrieve_similar(query, top_k=5)

        retrieval_measurements = [m for m in self.measurements if m.operation == 'retrieve_similar']
        return self._analyze_measurements(retrieval_measurements)

    def _benchmark_search_operations(self, num_iterations: int):
        """Benchmark text search operations"""
        search_queries = [
            f"search query {i}" for i in range(num_iterations)
        ]

        for i, query in enumerate(search_queries):
            with self.measure_latency('search_text', query=query):
                results = self.memory_system.search_text(query)

        search_measurements = [m for m in self.measurements if m.operation == 'search_text']
        return self._analyze_measurements(search_measurements)

    def _benchmark_update_operations(self, num_iterations: int):
        """Benchmark document update operations"""
        self._ensure_documents_exist(num_iterations)

        for i in range(num_iterations):
            doc_id = f'bench_doc_{i}'
            updated_content = f'Updated content {i}' * 5
            with self.measure_latency('update_document', doc_id=doc_id):
                self.memory_system.update_document(doc_id, {'content': updated_content})

        update_measurements = [m for m in self.measurements if m.operation == 'update_document']
        return self._analyze_measurements(update_measurements)

    def _benchmark_delete_operations(self, num_iterations: int):
        """Benchmark document deletion operations"""
        # Create documents specifically for deletion
        for i in range(num_iterations):
            doc_id = f'delete_doc_{i}'
            self.memory_system.store_document({
                'id': doc_id,
                'content': f'Document to delete {i}'
            })

        for i in range(num_iterations):
            doc_id = f'delete_doc_{i}'
            with self.measure_latency('delete_document', doc_id=doc_id):
                self.memory_system.delete_document(doc_id)

        delete_measurements = [m for m in self.measurements if m.operation == 'delete_document']
        return self._analyze_measurements(delete_measurements)

    def _benchmark_embedding_operations(self, num_iterations: int):
        """Benchmark embedding generation operations"""
        texts = [
            f'Text to embed number {i} with some meaningful content'
            for i in range(num_iterations)
        ]

        for i, text in enumerate(texts):
            with self.measure_latency('generate_embedding', text_length=len(text)):
                embedding = self.memory_system.generate_embedding(text)

        embedding_measurements = [m for m in self.measurements if m.operation == 'generate_embedding']
        return self._analyze_measurements(embedding_measurements)

    def _analyze_measurements(self, measurements: List[LatencyMeasurement]):
        """Analyze latency measurements and calculate statistics"""
        if not measurements:
            return {}

        latencies = [m.latency for m in measurements if m.success]
        success_rate = sum(1 for m in measurements if m.success) / len(measurements)

        if not latencies:
            return {'success_rate': success_rate, 'error': 'No successful operations'}

        stats = {
            'count': len(latencies),
            'success_rate': success_rate,
            'mean': np.mean(latencies),
            'median': np.median(latencies),
            'std': np.std(latencies),
            'min': np.min(latencies),
            'max': np.max(latencies)
        }

        # Add percentile statistics
        for percentile in self.percentiles:
            stats[f'p{percentile}'] = np.percentile(latencies, percentile)

        # Add throughput calculation
        if latencies:
            stats['ops_per_second'] = 1.0 / np.mean(latencies)

        return stats

    def _ensure_documents_exist(self, count: int):
        """Ensure documents exist for benchmarking"""
        try:
            # Try to retrieve a document to check if they exist
            self.memory_system.get_document('bench_doc_0')
        except:
            # Documents don't exist, create them
            for i in range(count):
                self.memory_system.store_document({
                    'id': f'bench_doc_{i}',
                    'content': f'Benchmark document {i}' * 10
                })

    def benchmark_concurrent_operations(self, concurrent_users: int = 10, operations_per_user: int = 100):
        """Benchmark system under concurrent load"""
        operations = ['store', 'retrieve', 'search', 'update']

        def worker(user_id: int):
            """Worker function for each concurrent user"""
            user_measurements = []
            for i in range(operations_per_user):
                operation = operations[i % len(operations)]

                if operation == 'store':
                    doc = {
                        'id': f'user_{user_id}_doc_{i}',
                        'content': f'Content from user {user_id}, iteration {i}'
                    }
                    with self.measure_latency('concurrent_store', user_id=user_id):
                        self.memory_system.store_document(doc)

                elif operation == 'retrieve':
                    query = f'Query from user {user_id}, iteration {i}'
                    with self.measure_latency('concurrent_retrieve', user_id=user_id):
                        self.memory_system.retrieve_similar(query, top_k=5)

                elif operation == 'search':
                    query = f'Search from user {user_id}, iteration {i}'
                    with self.measure_latency('concurrent_search', user_id=user_id):
                        self.memory_system.search_text(query)

                elif operation == 'update':
                    doc_id = f'user_{user_id}_doc_{max(0, i-10)}'  # Update recent docs
                    with self.measure_latency('concurrent_update', user_id=user_id):
                        try:
                            self.memory_system.update_document(doc_id, {
                                'content': f'Updated by user {user_id} at iteration {i}'
                            })
                        except:
                            pass  # Document might not exist

        # Run concurrent workers
        with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [executor.submit(worker, user_id) for user_id in range(concurrent_users)]
            for future in as_completed(futures):
                future.result()

        # Analyze concurrent operation results
        concurrent_measurements = [m for m in self.measurements if 'concurrent_' in m.operation]

        results_by_operation = {}
        for operation in ['concurrent_store', 'concurrent_retrieve', 'concurrent_search', 'concurrent_update']:
            op_measurements = [m for m in concurrent_measurements if m.operation == operation]
            results_by_operation[operation] = self._analyze_measurements(op_measurements)

        return results_by_operation

    def generate_latency_report(self):
        """Generate comprehensive latency report"""
        if not self.measurements:
            return "No measurements available"

        # Group measurements by operation
        operations = {}
        for measurement in self.measurements:
            if measurement.operation not in operations:
                operations[measurement.operation] = []
            operations[measurement.operation].append(measurement)

        report = ["# Latency Benchmark Report\n"]

        for operation, measurements in operations.items():
            stats = self._analyze_measurements(measurements)

            if 'error' in stats:
                report.append(f"## {operation}")
                report.append(f"Error: {stats['error']}\n")
                continue

            report.append(f"## {operation}")
            report.append(f"- **Count**: {stats['count']} operations")
            report.append(f"- **Success Rate**: {stats['success_rate']:.1%}")
            report.append(f"- **Mean Latency**: {stats['mean']*1000:.2f}ms")
            report.append(f"- **Median Latency**: {stats['median']*1000:.2f}ms")
            report.append(f"- **P95 Latency**: {stats['p95']*1000:.2f}ms")
            report.append(f"- **P99 Latency**: {stats['p99']*1000:.2f}ms")
            report.append(f"- **Max Latency**: {stats['max']*1000:.2f}ms")
            report.append(f"- **Throughput**: {stats['ops_per_second']:.1f} ops/sec")
            report.append("")

        return "\n".join(report)

# Example usage
benchmark = ComprehensiveLatencyBenchmark(your_memory_system)

# Run comprehensive benchmarks
print("Running comprehensive latency benchmarks...")
results = benchmark.benchmark_memory_operations(num_iterations=500)

# Test concurrent performance
print("Testing concurrent performance...")
concurrent_results = benchmark.benchmark_concurrent_operations(
    concurrent_users=20,
    operations_per_user=50
)

# Generate report
print(benchmark.generate_latency_report())

# Print summary
for operation, stats in results.items():
    if 'error' not in stats:
        print(f"{operation}: P95={stats['p95']*1000:.1f}ms, "
              f"Throughput={stats['ops_per_second']:.1f} ops/sec")

Component-Level Latency Profiling

import time
from typing import Dict, List
import numpy as np

class ComponentLatencyProfiler:
    def __init__(self):
        self.component_measurements = {}

    def profile_embedding_pipeline(self, texts: List[str], embedding_models: List[str]):
        """Profile latency of different embedding models"""
        results = {}

        for model_name in embedding_models:
            print(f"Profiling {model_name}...")
            model_results = []

            for text in texts:
                # Measure tokenization
                start_time = time.time()
                tokens = self._tokenize(text, model_name)
                tokenization_time = time.time() - start_time

                # Measure embedding generation
                start_time = time.time()
                embedding = self._generate_embedding(tokens, model_name)
                embedding_time = time.time() - start_time

                # Measure normalization
                start_time = time.time()
                normalized = self._normalize_embedding(embedding)
                normalization_time = time.time() - start_time

                total_time = tokenization_time + embedding_time + normalization_time

                model_results.append({
                    'text_length': len(text),
                    'token_count': len(tokens) if tokens else 0,
                    'tokenization_time': tokenization_time,
                    'embedding_time': embedding_time,
                    'normalization_time': normalization_time,
                    'total_time': total_time,
                    'tokens_per_second': len(tokens) / embedding_time if embedding_time > 0 else 0
                })

            results[model_name] = self._analyze_component_results(model_results)

        return results

    def profile_storage_operations(self, storage_backends: List[Dict]):
        """Profile latency across different storage backends"""
        results = {}
        test_documents = self._generate_test_documents(1000)

        for backend_config in storage_backends:
            backend_name = backend_config['name']
            backend = backend_config['instance']
            print(f"Profiling {backend_name}...")

            # Profile write operations
            write_times = []
            for doc in test_documents[:100]:  # Test with subset
                start_time = time.time()
                backend.store_document(doc)
                write_times.append(time.time() - start_time)

            # Profile read operations
            read_times = []
            for doc in test_documents[:100]:
                start_time = time.time()
                retrieved = backend.get_document(doc['id'])
                read_times.append(time.time() - start_time)

            # Profile search operations
            search_times = []
            search_queries = [f"query {i}" for i in range(50)]
            for query in search_queries:
                start_time = time.time()
                results_list = backend.search(query, limit=10)
                search_times.append(time.time() - start_time)

            results[backend_name] = {
                'write_latency': self._calculate_latency_stats(write_times),
                'read_latency': self._calculate_latency_stats(read_times),
                'search_latency': self._calculate_latency_stats(search_times)
            }

        return results

    def profile_network_latency(self, endpoints: List[str]):
        """Profile network latency to various endpoints"""
        results = {}

        for endpoint in endpoints:
            latencies = []
            for _ in range(50):  # 50 measurements per endpoint
                start_time = time.time()
                try:
                    response = self._make_request(endpoint)
                    latency = time.time() - start_time
                    latencies.append(latency)
                except Exception as e:
                    print(f"Error measuring {endpoint}: {e}")

            if latencies:
                results[endpoint] = self._calculate_latency_stats(latencies)

        return results

    def profile_memory_allocation(self, operations: List[callable]):
        """Profile memory allocation latency for different operations"""
        import tracemalloc

        results = {}

        for i, operation in enumerate(operations):
            operation_name = f"operation_{i}"

            # Measure memory allocation overhead
            allocation_times = []
            for _ in range(100):
                tracemalloc.start()
                start_time = time.time()
                try:
                    result = operation()
                    allocation_time = time.time() - start_time
                    allocation_times.append(allocation_time)
                except Exception as e:
                    print(f"Error in {operation_name}: {e}")
                finally:
                    tracemalloc.stop()

            if allocation_times:
                results[operation_name] = self._calculate_latency_stats(allocation_times)

        return results

    def _tokenize(self, text: str, model_name: str):
        """Mock tokenization for different models"""
        if model_name == 'bert':
            return text.split()[:512]  # BERT max length
        elif model_name == 'gpt':
            return text.split()[:2048]  # GPT context length
        else:
            return text.split()

    def _generate_embedding(self, tokens: List[str], model_name: str):
        """Mock embedding generation"""
        # Simulate different model complexities
        if model_name == 'small_model':
            time.sleep(0.01)  # 10ms
            return np.random.rand(256)
        elif model_name == 'large_model':
            time.sleep(0.1)  # 100ms
            return np.random.rand(1024)
        else:
            time.sleep(0.05)  # 50ms
            return np.random.rand(512)

    def _normalize_embedding(self, embedding: np.ndarray):
        """Normalize embedding vector"""
        return embedding / np.linalg.norm(embedding)

    def _generate_test_documents(self, count: int):
        """Generate test documents"""
        return [
            {
                'id': f'doc_{i}',
                'content': f'Test document content {i}' * (i % 10 + 1),
                'metadata': {'index': i}
            }
            for i in range(count)
        ]

    def _make_request(self, endpoint: str):
        """Make HTTP request to endpoint"""
        import requests
        try:
            response = requests.get(endpoint, timeout=5)
            return response
        except requests.RequestException:
            raise

    def _calculate_latency_stats(self, latencies: List[float]):
        """Calculate latency statistics"""
        if not latencies:
            return {}

        return {
            'count': len(latencies),
            'mean': np.mean(latencies),
            'median': np.median(latencies),
            'std': np.std(latencies),
            'min': np.min(latencies),
            'max': np.max(latencies),
            'p95': np.percentile(latencies, 95),
            'p99': np.percentile(latencies, 99)
        }

    def _analyze_component_results(self, results: List[Dict]):
        """Analyze component-specific results"""
        if not results:
            return {}

        # Extract different timing components
        tokenization_times = [r['tokenization_time'] for r in results]
        embedding_times = [r['embedding_time'] for r in results]
        normalization_times = [r['normalization_time'] for r in results]
        total_times = [r['total_time'] for r in results]

        return {
            'tokenization': self._calculate_latency_stats(tokenization_times),
            'embedding_generation': self._calculate_latency_stats(embedding_times),
            'normalization': self._calculate_latency_stats(normalization_times),
            'total': self._calculate_latency_stats(total_times),
            'avg_tokens_per_second': np.mean([r['tokens_per_second'] for r in results])
        }

# Example usage
profiler = ComponentLatencyProfiler()

# Profile embedding pipeline
test_texts = [
    "Short text",
    "Medium length text with some more content to test tokenization",
    "Very long text " * 100 + " that will test the limits of the embedding model"
]

embedding_models = ['small_model', 'medium_model', 'large_model']
embedding_results = profiler.profile_embedding_pipeline(test_texts, embedding_models)

for model, stats in embedding_results.items():
    print(f"{model}:")
    print(f"  Total P95: {stats['total']['p95']*1000:.1f}ms")
    print(f"  Embedding P95: {stats['embedding_generation']['p95']*1000:.1f}ms")

Advanced Latency Optimization Techniques

Caching Strategy Optimization

import time
from typing import Dict, List
import numpy as np

class CacheLatencyOptimizer:
    def __init__(self, cache_system):
        self.cache = cache_system
        self.hit_times = []
        self.miss_times = []

    def optimize_cache_configuration(self, workload_patterns: List[Dict]):
        """Optimize cache configuration based on workload patterns"""
        configurations = [
            {'ttl': 300, 'max_size': 1000, 'eviction': 'lru'},
            {'ttl': 600, 'max_size': 2000, 'eviction': 'lru'},
            {'ttl': 1200, 'max_size': 5000, 'eviction': 'lru'},
            {'ttl': 300, 'max_size': 1000, 'eviction': 'lfu'},
            {'ttl': 600, 'max_size': 2000, 'eviction': 'lfu'},
        ]

        results = {}

        for config in configurations:
            config_name = f"ttl{config['ttl']}_size{config['max_size']}_{config['eviction']}"
            print(f"Testing configuration: {config_name}")

            # Apply configuration
            self.cache.configure(**config)
            self.cache.clear()

            # Test with workload patterns
            config_results = self._test_cache_with_workload(workload_patterns)
            results[config_name] = config_results

        return results

    def _test_cache_with_workload(self, workload_patterns: List[Dict]):
        """Test cache performance with specific workload"""
        hit_times = []
        miss_times = []
        hit_count = 0
        miss_count = 0

        for pattern in workload_patterns:
            operation = pattern.get('operation', 'get')
            key = pattern.get('key', 'default_key')
            value = pattern.get('value', 'default_value')

            if operation == 'get':
                start_time = time.time()
                result = self.cache.get(key)
                latency = time.time() - start_time

                if result is not None:
                    hit_times.append(latency)
                    hit_count += 1
                else:
                    miss_times.append(latency)
                    miss_count += 1

            elif operation == 'set':
                start_time = time.time()
                self.cache.set(key, value)
                latency = time.time() - start_time

        total_operations = hit_count + miss_count
        hit_rate = hit_count / total_operations if total_operations > 0 else 0

        return {
            'hit_rate': hit_rate,
            'avg_hit_latency': np.mean(hit_times) if hit_times else 0,
            'avg_miss_latency': np.mean(miss_times) if miss_times else 0,
            'p95_hit_latency': np.percentile(hit_times, 95) if hit_times else 0,
            'p95_miss_latency': np.percentile(miss_times, 95) if miss_times else 0,
            'total_operations': total_operations
        }

    def analyze_cache_effectiveness(self, access_patterns: List[str]):
        """Analyze cache effectiveness for different access patterns"""
        cache_analysis = {}

        # Test with different cache sizes
        cache_sizes = [100, 500, 1000, 2000, 5000]

        for size in cache_sizes:
            self.cache.configure(max_size=size)
            self.cache.clear()

            hit_count = 0
            total_accesses = 0
            latencies = []

            for key in access_patterns:
                start_time = time.time()
                result = self.cache.get(key)
                latency = time.time() - start_time
                latencies.append(latency)

                if result is not None:
                    hit_count += 1
                else:
                    # Simulate cache miss penalty
                    self.cache.set(key, f"value_for_{key}")

                total_accesses += 1

            hit_rate = hit_count / total_accesses
            avg_latency = np.mean(latencies)

            cache_analysis[size] = {
                'hit_rate': hit_rate,
                'avg_latency': avg_latency,
                'effective_latency': avg_latency * hit_rate + (1 - hit_rate) * 0.1  # Assume 100ms miss penalty
            }

        return cache_analysis

# Example usage
cache_optimizer = CacheLatencyOptimizer(your_cache_system)

# Generate workload patterns
workload = []
for i in range(1000):
    # Zipf distribution for realistic access patterns
    key_id = int(np.random.zipf(1.2)) % 100
    workload.append({
        'operation': 'get',
        'key': f'key_{key_id}',
        'value': f'value_{key_id}'
    })

# Add some set operations
for i in range(100):
    workload.append({
        'operation': 'set',
        'key': f'key_{i}',
        'value': f'updated_value_{i}'
    })

cache_results = cache_optimizer.optimize_cache_configuration(workload)

# Find optimal configuration
best_config = min(cache_results.items(), key=lambda x: x[1]['avg_hit_latency'] + x[1]['avg_miss_latency'])
print(f"Optimal cache configuration: {best_config[0]}")
print(f"Hit rate: {best_config[1]['hit_rate']:.2%}")
print(f"Avg hit latency: {best_config[1]['avg_hit_latency']*1000:.2f}ms")

Database Query Optimization

import time
from typing import Dict, List
import numpy as np

class DatabaseLatencyOptimizer:
    def __init__(self, database_connection):
        self.db = database_connection

    def optimize_query_performance(self, query_patterns: List[Dict]):
        """Optimize database queries for minimal latency"""
        optimization_strategies = [
            'add_indexes',
            'query_rewriting',
            'connection_pooling',
            'prepared_statements',
            'batch_operations'
        ]

        results = {}

        for strategy in optimization_strategies:
            print(f"Testing {strategy}...")
            strategy_results = self._test_optimization_strategy(strategy, query_patterns)
            results[strategy] = strategy_results

        return results

    def _test_optimization_strategy(self, strategy: str, query_patterns: List[Dict]):
        """Test specific optimization strategy"""
        if strategy == 'add_indexes':
            return self._test_index_optimization(query_patterns)
        elif strategy == 'query_rewriting':
            return self._test_query_rewriting(query_patterns)
        elif strategy == 'connection_pooling':
            return self._test_connection_pooling(query_patterns)
        elif strategy == 'prepared_statements':
            return self._test_prepared_statements(query_patterns)
        elif strategy == 'batch_operations':
            return self._test_batch_operations(query_patterns)

    def _test_index_optimization(self, query_patterns: List[Dict]):
        """Test impact of adding indexes"""
        # Test queries without indexes
        baseline_times = []
        for pattern in query_patterns[:50]:  # Test subset
            query = pattern['query']
            start_time = time.time()
            self.db.execute(query)
            baseline_times.append(time.time() - start_time)

        baseline_avg = np.mean(baseline_times)

        # Add suggested indexes
        suggested_indexes = self._analyze_query_patterns(query_patterns)
        for index_sql in suggested_indexes:
            try:
                self.db.execute(index_sql)
            except:
                pass  # Index might already exist

        # Test queries with indexes
        optimized_times = []
        for pattern in query_patterns[:50]:
            query = pattern['query']
            start_time = time.time()
            self.db.execute(query)
            optimized_times.append(time.time() - start_time)

        optimized_avg = np.mean(optimized_times)

        return {
            'baseline_avg_latency': baseline_avg,
            'optimized_avg_latency': optimized_avg,
            'improvement_factor': baseline_avg / optimized_avg if optimized_avg > 0 else 1,
            'indexes_added': len(suggested_indexes)
        }

    def _test_query_rewriting(self, query_patterns: List[Dict]):
        """Test query rewriting optimizations"""
        optimization_results = []

        for pattern in query_patterns[:20]:  # Test subset
            original_query = pattern['query']

            # Time original query
            start_time = time.time()
            self.db.execute(original_query)
            original_time = time.time() - start_time

            # Generate optimized version
            optimized_query = self._optimize_query(original_query)

            # Time optimized query
            start_time = time.time()
            self.db.execute(optimized_query)
            optimized_time = time.time() - start_time

            optimization_results.append({
                'original_time': original_time,
                'optimized_time': optimized_time,
                'improvement': original_time / optimized_time if optimized_time > 0 else 1
            })

        avg_improvement = np.mean([r['improvement'] for r in optimization_results])

        return {
            'avg_improvement_factor': avg_improvement,
            'queries_optimized': len(optimization_results),
            'avg_original_latency': np.mean([r['original_time'] for r in optimization_results]),
            'avg_optimized_latency': np.mean([r['optimized_time'] for r in optimization_results])
        }

    def _test_connection_pooling(self, query_patterns: List[Dict]):
        """Test impact of connection pooling on latency"""
        # Test without connection pooling (new connection per query)
        single_connection_times = []
        for pattern in query_patterns[:30]:
            start_time = time.time()
            # Simulate new connection overhead
            time.sleep(0.01)  # 10ms connection overhead
            self.db.execute(pattern['query'])
            single_connection_times.append(time.time() - start_time)

        # Test with connection pooling (reuse connections)
        pooled_connection_times = []
        for pattern in query_patterns[:30]:
            start_time = time.time()
            self.db.execute(pattern['query'])
            pooled_connection_times.append(time.time() - start_time)

        return {
            'single_connection_avg': np.mean(single_connection_times),
            'pooled_connection_avg': np.mean(pooled_connection_times),
            'improvement_factor': (
                np.mean(single_connection_times) / np.mean(pooled_connection_times)
                if np.mean(pooled_connection_times) > 0 else 1
            )
        }

    def _test_prepared_statements(self, query_patterns: List[Dict]):
        """Test prepared statements vs regular queries"""
        # Group patterns by query template
        query_templates = {}
        for pattern in query_patterns:
            template = self._extract_query_template(pattern['query'])
            if template not in query_templates:
                query_templates[template] = []
            query_templates[template].append(pattern)

        prepared_statement_results = []

        for template, patterns in query_templates.items():
            if len(patterns) < 5:  # Skip templates with few instances
                continue

            # Test regular queries
            regular_times = []
            for pattern in patterns[:10]:
                start_time = time.time()
                self.db.execute(pattern['query'])
                regular_times.append(time.time() - start_time)

            # Test prepared statements
            prepared_times = []
            prepared_query = self.db.prepare(template)
            for pattern in patterns[:10]:
                start_time = time.time()
                prepared_query.execute(pattern.get('params', []))
                prepared_times.append(time.time() - start_time)

            prepared_statement_results.append({
                'regular_avg': np.mean(regular_times),
                'prepared_avg': np.mean(prepared_times),
                'improvement': np.mean(regular_times) / np.mean(prepared_times) if np.mean(prepared_times) > 0 else 1
            })

        if prepared_statement_results:
            return {
                'avg_improvement_factor': np.mean([r['improvement'] for r in prepared_statement_results]),
                'templates_tested': len(prepared_statement_results)
            }
        else:
            return {'avg_improvement_factor': 1, 'templates_tested': 0}

    def _test_batch_operations(self, query_patterns: List[Dict]):
        """Test batch operations vs individual operations"""
        insert_patterns = [p for p in query_patterns if 'INSERT' in p['query'].upper()]

        if len(insert_patterns) < 10:
            return {'improvement_factor': 1, 'note': 'Insufficient INSERT patterns for testing'}

        # Test individual inserts
        individual_times = []
        for pattern in insert_patterns[:50]:
            start_time = time.time()
            self.db.execute(pattern['query'])
            individual_times.append(time.time() - start_time)

        # Test batch insert
        batch_queries = [p['query'] for p in insert_patterns[:50]]
        start_time = time.time()
        self.db.execute_batch(batch_queries)
        batch_time = time.time() - start_time

        avg_individual_time = np.mean(individual_times)
        avg_batch_time = batch_time / len(batch_queries)

        return {
            'individual_avg_latency': avg_individual_time,
            'batch_avg_latency': avg_batch_time,
            'improvement_factor': avg_individual_time / avg_batch_time if avg_batch_time > 0 else 1,
            'operations_tested': len(batch_queries)
        }

    def _analyze_query_patterns(self, query_patterns: List[Dict]):
        """Analyze query patterns and suggest indexes"""
        suggested_indexes = []

        # Simple pattern analysis for common cases
        for pattern in query_patterns:
            query = pattern['query'].upper()

            # Look for WHERE clauses
            if 'WHERE' in query and 'document_id' in query.lower():
                suggested_indexes.append("CREATE INDEX idx_document_id ON documents(document_id)")
            if 'WHERE' in query and 'created_at' in query.lower():
                suggested_indexes.append("CREATE INDEX idx_created_at ON documents(created_at)")
            if 'WHERE' in query and 'user_id' in query.lower():
                suggested_indexes.append("CREATE INDEX idx_user_id ON documents(user_id)")

        # Remove duplicates
        return list(set(suggested_indexes))

    def _optimize_query(self, query: str):
        """Apply basic query optimizations"""
        optimized = query

        # Add LIMIT if not present for large result sets
        if 'SELECT' in query.upper() and 'LIMIT' not in query.upper():
            optimized += ' LIMIT 1000'

        # Other optimizations can be added here
        return optimized

    def _extract_query_template(self, query: str):
        """Extract query template by replacing literals with placeholders"""
        import re
        # Replace string literals
        template = re.sub(r"'[^']*'", '?', query)
        # Replace numeric literals
        template = re.sub(r'\b\d+\b', '?', template)
        return template

# Example usage
db_optimizer = DatabaseLatencyOptimizer(your_database_connection)

# Generate sample query patterns
query_patterns = [
    {'query': "SELECT * FROM documents WHERE document_id = 'doc_123'"},
    {'query': "SELECT content FROM documents WHERE user_id = 'user_456' ORDER BY created_at DESC"},
    {'query': "INSERT INTO documents (id, content) VALUES ('new_doc', 'content')"},
    {'query': "UPDATE documents SET content = 'updated' WHERE id = 'doc_789'"},
    {'query': "SELECT COUNT(*) FROM documents WHERE created_at > '2024-01-01'"}
] * 20  # Repeat for testing

db_optimization_results = db_optimizer.optimize_query_performance(query_patterns)

for strategy, results in db_optimization_results.items():
    if 'improvement_factor' in results:
        print(f"{strategy}: {results['improvement_factor']:.2f}x improvement")

Production Latency Monitoring

Real-Time Latency Tracking

import threading
import time
from typing import Dict, List
import numpy as np

class ProductionLatencyMonitor:
    def __init__(self, alert_thresholds: Dict[str, float]):
        self.alert_thresholds = alert_thresholds
        self.latency_buffer = {}
        self.monitoring_active = False

    def start_monitoring(self):
        """Start real-time latency monitoring"""
        self.monitoring_active = True

        # Start monitoring thread
        monitoring_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
        monitoring_thread.start()

    def _monitoring_loop(self):
        """Main monitoring loop"""
        while self.monitoring_active:
            # Analyze recent latencies
            self._analyze_recent_latencies()

            # Check for alerts
            self._check_latency_alerts()

            time.sleep(10)  # Check every 10 seconds

    def record_latency(self, operation: str, latency: float, metadata: Dict = None):
        """Record latency measurement"""
        timestamp = time.time()

        if operation not in self.latency_buffer:
            self.latency_buffer[operation] = []

        self.latency_buffer[operation].append({
            'latency': latency,
            'timestamp': timestamp,
            'metadata': metadata or {}
        })

        # Keep only recent measurements (last hour)
        cutoff_time = timestamp - 3600
        self.latency_buffer[operation] = [
            m for m in self.latency_buffer[operation]
            if m['timestamp'] > cutoff_time
        ]

    def _analyze_recent_latencies(self):
        """Analyze recent latency trends"""
        for operation, measurements in self.latency_buffer.items():
            if len(measurements) < 10:  # Need enough data
                continue

            recent_latencies = [m['latency'] for m in measurements[-100:]]

            # Calculate statistics
            current_p95 = np.percentile(recent_latencies, 95)
            current_mean = np.mean(recent_latencies)

            # Check for degradation
            if len(measurements) > 200:
                previous_latencies = [m['latency'] for m in measurements[-200:-100]]
                previous_p95 = np.percentile(previous_latencies, 95)

                # Alert if P95 increased significantly
                if current_p95 > previous_p95 * 1.5:  # 50% increase
                    self._send_alert(f"P95 latency degradation in {operation}: "
                                     f"{previous_p95*1000:.1f}ms -> {current_p95*1000:.1f}ms")

    def _check_latency_alerts(self):
        """Check latency against thresholds"""
        for operation, measurements in self.latency_buffer.items():
            if not measurements:
                continue

            recent_latencies = [m['latency'] for m in measurements[-50:]]

            if operation in self.alert_thresholds:
                threshold = self.alert_thresholds[operation]
                p95_latency = np.percentile(recent_latencies, 95)

                if p95_latency > threshold:
                    self._send_alert(f"High latency alert for {operation}: "
                                     f"P95 = {p95_latency*1000:.1f}ms (threshold: {threshold*1000:.1f}ms)")

    def _send_alert(self, message: str):
        """Send latency alert"""
        print(f"LATENCY ALERT: {message}")
        # Integrate with your alerting system here

    def get_latency_dashboard_data(self):
        """Get data for latency dashboard"""
        dashboard_data = {}

        for operation, measurements in self.latency_buffer.items():
            if not measurements:
                continue

            latencies = [m['latency'] for m in measurements]

            dashboard_data[operation] = {
                'count': len(latencies),
                'mean': np.mean(latencies),
                'p50': np.percentile(latencies, 50),
                'p95': np.percentile(latencies, 95),
                'p99': np.percentile(latencies, 99),
                'max': np.max(latencies),
                'recent_trend': self._calculate_trend(measurements)
            }

        return dashboard_data

    def _calculate_trend(self, measurements: List[Dict]):
        """Calculate latency trend (improving/degrading)"""
        if len(measurements) < 20:
            return 'insufficient_data'

        # Compare first half vs second half of recent measurements
        mid_point = len(measurements) // 2
        first_half = [m['latency'] for m in measurements[:mid_point]]
        second_half = [m['latency'] for m in measurements[mid_point:]]

        first_avg = np.mean(first_half)
        second_avg = np.mean(second_half)

        if second_avg > first_avg * 1.1:  # 10% worse
            return 'degrading'
        elif second_avg < first_avg * 0.9:  # 10% better
            return 'improving'
        else:
            return 'stable'

# Example usage
latency_thresholds = {
    'store_document': 0.1,      # 100ms
    'retrieve_similar': 0.2,    # 200ms
    'search_text': 0.15,        # 150ms
    'generate_embedding': 0.5   # 500ms
}

monitor = ProductionLatencyMonitor(latency_thresholds)
monitor.start_monitoring()

# In your application code, record latencies:
# monitor.record_latency('store_document', 0.085)
# monitor.record_latency('retrieve_similar', 0.156, {'query_length': 50})

# Get dashboard data
dashboard_data = monitor.get_latency_dashboard_data()
for operation, stats in dashboard_data.items():
    print(f"{operation}: P95={stats['p95']*1000:.1f}ms, Trend={stats['recent_trend']}")

Best Practices

Latency Optimization Guidelines

  1. Measure First: Always establish latency baselines before optimization
  2. Target User-Facing Operations: Prioritize optimizations that directly impact user experience
  3. Consider P95/P99: Optimize for tail latencies, not just averages (see the worked example after this list)
  4. Monitor Continuously: Set up real-time latency monitoring in production
  5. Test Under Load: Measure latency under realistic concurrent load
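The percentile guidance above is easiest to see with a small worked example. The sketch below uses a hypothetical summarize_latency helper (not part of the benchmark classes in this guide) and assumes latency samples are recorded in seconds, as elsewhere on this page; it shows how a healthy-looking mean can hide a slow tail.

import numpy as np

def summarize_latency(samples_seconds):
    """Summarize latency samples (in seconds) with mean and tail percentiles."""
    samples = np.asarray(samples_seconds)
    return {
        'mean_ms': samples.mean() * 1000,
        'p50_ms': np.percentile(samples, 50) * 1000,
        'p95_ms': np.percentile(samples, 95) * 1000,
        'p99_ms': np.percentile(samples, 99) * 1000,
    }

# A mostly-fast workload with a few slow outliers: the mean and median look
# healthy, but P99 exposes the tail that users actually notice.
samples = [0.020] * 97 + [0.400, 0.450, 0.500]
print(summarize_latency(samples))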

Common Optimization Strategies

  1. Caching: Implement multi-level caching for frequently accessed data (see the sketch after this list)
  2. Connection Pooling: Reuse database connections to reduce overhead
  3. Async Operations: Use asynchronous processing for non-blocking operations
  4. Batch Operations: Group multiple operations to reduce round-trip latency
  5. Index Optimization: Create indexes based on actual query patterns
  6. CDN Usage: Use content delivery networks for geographically distributed users
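As one possible starting point for the caching strategy, the sketch below puts a minimal in-process TTL cache in front of a slower backend lookup. It is illustrative only: TTLCache and cached_lookup are hypothetical names rather than part of any specific cache library, and the oldest-entry eviction is a stand-in for the LRU/LFU policies benchmarked earlier.

import time
from typing import Any, Callable, Dict, Tuple

class TTLCache:
    """Minimal in-process TTL cache used as the first level in front of a slower store."""

    def __init__(self, ttl_seconds: float = 300.0, max_size: int = 1000):
        self.ttl = ttl_seconds
        self.max_size = max_size
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str):
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if time.time() - stored_at > self.ttl:
            del self._store[key]  # Expired entry
            return None
        return value

    def set(self, key: str, value: Any):
        if len(self._store) >= self.max_size:
            # Simple eviction: drop the oldest entry (a real cache would use LRU/LFU)
            oldest_key = min(self._store, key=lambda k: self._store[k][0])
            del self._store[oldest_key]
        self._store[key] = (time.time(), value)

def cached_lookup(cache: TTLCache, key: str, load_fn: Callable[[str], Any]):
    """Check the fast cache first; fall back to the slower backend on a miss."""
    value = cache.get(key)
    if value is None:
        value = load_fn(key)  # e.g. a vector store or database read
        cache.set(key, value)
    return value

In practice, load_fn would wrap whichever slower read path (vector store, database, or remote API) was profiled in the sections above.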

Latency SLA Management

  1. Set Realistic Targets: Base SLAs on actual user requirements and system capabilities
  2. Monitor Compliance: Track SLA compliance and alert on violations (see the sketch after this list)
  3. Capacity Planning: Plan infrastructure capacity based on latency requirements
  4. Graceful Degradation: Implement fallback mechanisms for high-latency scenarios
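A minimal sketch of SLA compliance checking is shown below. It assumes P95 targets expressed in seconds (mirroring the thresholds passed to ProductionLatencyMonitor above); check_sla_compliance is a hypothetical helper, and a production version would feed its output into the alerting path described earlier.

from typing import Dict, List
import numpy as np

def check_sla_compliance(latencies_by_operation: Dict[str, List[float]],
                         sla_p95_seconds: Dict[str, float]) -> Dict[str, Dict]:
    """Compare observed P95 latency per operation against its SLA target."""
    report = {}
    for operation, target in sla_p95_seconds.items():
        samples = latencies_by_operation.get(operation, [])
        if not samples:
            report[operation] = {'status': 'no_data'}
            continue
        p95 = float(np.percentile(samples, 95))
        report[operation] = {
            'p95_ms': p95 * 1000,
            'target_ms': target * 1000,
            'compliant': p95 <= target,
        }
    return report

# Example targets mirroring the thresholds used with ProductionLatencyMonitor above
targets = {'retrieve_similar': 0.2, 'store_document': 0.1}
observed = {'retrieve_similar': [0.12, 0.15, 0.31], 'store_document': [0.04, 0.05]}
print(check_sla_compliance(observed, targets))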

The benchmarking, profiling, and monitoring techniques in this guide provide the foundation for building responsive, high-performance agent memory systems that deliver a consistently good user experience.