Latency Optimization Benchmarks
Minimizing latency is critical for responsive agent memory systems. This guide presents benchmarking methodologies for measuring and optimizing latency across the components of an agent memory architecture, from storage access to embedding generation.
Overview
Agent memory latency directly affects user experience and system responsiveness: high latency produces slow agent responses, timeouts, and dissatisfied users. Effective optimization starts with systematic measurement, then targets the components that dominate the latency budget.
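Before adopting the full harness below, a baseline can be captured with a minimal timing wrapper. This is a sketch only; `memory.retrieve_similar` stands in for whatever call you want to baseline, and `time.perf_counter` is used because it is monotonic.

```python
import time
from functools import wraps

def timed(fn):
    """Wrap a memory call and print its wall-clock latency (sketch)."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            print(f"{fn.__name__}: {elapsed_ms:.2f}ms")
    return wrapper

# Hypothetical usage:
# retrieve = timed(memory.retrieve_similar)
# results = retrieve("recent conversation about billing", top_k=5)
```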
Core Latency Metrics
End-to-End Latency Measurement
import time
import asyncio
import threading
from typing import List, Dict, Any, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
from dataclasses import dataclass
from contextlib import contextmanager
@dataclass
class LatencyMeasurement:
operation: str
latency: float
timestamp: float
success: bool
metadata: Dict[str, Any]
class ComprehensiveLatencyBenchmark:
def __init__(self, memory_system):
self.memory_system = memory_system
self.measurements = []
self.percentiles = [50, 90, 95, 99, 99.9]
@contextmanager
def measure_latency(self, operation_name: str, **metadata):
"""Context manager for measuring operation latency"""
start_time = time.time()
success = True
error = None
try:
yield
except Exception as e:
success = False
error = str(e)
raise
finally:
end_time = time.time()
latency = end_time - start_time
measurement = LatencyMeasurement(
operation=operation_name,
latency=latency,
timestamp=start_time,
success=success,
metadata={**metadata, 'error': error if not success else None}
)
self.measurements.append(measurement)
def benchmark_memory_operations(self, num_iterations: int = 1000):
"""Comprehensive benchmark of all memory operations"""
operations = {
'store_document': self._benchmark_store_operations,
'retrieve_similar': self._benchmark_retrieval_operations,
'search_text': self._benchmark_search_operations,
'update_document': self._benchmark_update_operations,
'delete_document': self._benchmark_delete_operations,
'generate_embedding': self._benchmark_embedding_operations
}
results = {}
for operation_name, benchmark_func in operations.items():
print(f"Benchmarking {operation_name}...")
operation_results = benchmark_func(num_iterations)
results[operation_name] = operation_results
return results
def _benchmark_store_operations(self, num_iterations: int):
"""Benchmark document storage operations"""
results = []
for i in range(num_iterations):
document = {
'id': f'bench_doc_{i}',
'content': f'Benchmark document content {i}' * 10,
'timestamp': time.time()
}
with self.measure_latency('store_document', doc_id=document['id']):
self.memory_system.store_document(document)
# Extract measurements for this operation
store_measurements = [m for m in self.measurements if m.operation == 'store_document']
return self._analyze_measurements(store_measurements)
def _benchmark_retrieval_operations(self, num_iterations: int):
"""Benchmark similarity retrieval operations"""
# Pre-populate with documents if needed
self._ensure_documents_exist(num_iterations)
query_texts = [
f"Find documents about topic {i}" for i in range(num_iterations)
]
for i, query in enumerate(query_texts):
with self.measure_latency('retrieve_similar', query_length=len(query)):
results = self.memory_system.retrieve_similar(query, top_k=5)
retrieval_measurements = [m for m in self.measurements if m.operation == 'retrieve_similar']
return self._analyze_measurements(retrieval_measurements)
def _benchmark_search_operations(self, num_iterations: int):
"""Benchmark text search operations"""
search_queries = [
f"search query {i}" for i in range(num_iterations)
]
for i, query in enumerate(search_queries):
with self.measure_latency('search_text', query=query):
results = self.memory_system.search_text(query)
search_measurements = [m for m in self.measurements if m.operation == 'search_text']
return self._analyze_measurements(search_measurements)
def _benchmark_update_operations(self, num_iterations: int):
"""Benchmark document update operations"""
self._ensure_documents_exist(num_iterations)
for i in range(num_iterations):
doc_id = f'bench_doc_{i}'
updated_content = f'Updated content {i}' * 5
with self.measure_latency('update_document', doc_id=doc_id):
self.memory_system.update_document(doc_id, {'content': updated_content})
update_measurements = [m for m in self.measurements if m.operation == 'update_document']
return self._analyze_measurements(update_measurements)
def _benchmark_delete_operations(self, num_iterations: int):
"""Benchmark document deletion operations"""
# Create documents specifically for deletion
for i in range(num_iterations):
doc_id = f'delete_doc_{i}'
self.memory_system.store_document({
'id': doc_id,
'content': f'Document to delete {i}'
})
for i in range(num_iterations):
doc_id = f'delete_doc_{i}'
with self.measure_latency('delete_document', doc_id=doc_id):
self.memory_system.delete_document(doc_id)
delete_measurements = [m for m in self.measurements if m.operation == 'delete_document']
return self._analyze_measurements(delete_measurements)
def _benchmark_embedding_operations(self, num_iterations: int):
"""Benchmark embedding generation operations"""
texts = [
f'Text to embed number {i} with some meaningful content' for i in range(num_iterations)
]
for i, text in enumerate(texts):
with self.measure_latency('generate_embedding', text_length=len(text)):
embedding = self.memory_system.generate_embedding(text)
embedding_measurements = [m for m in self.measurements if m.operation == 'generate_embedding']
return self._analyze_measurements(embedding_measurements)
def _analyze_measurements(self, measurements: List[LatencyMeasurement]):
"""Analyze latency measurements and calculate statistics"""
if not measurements:
return {}
latencies = [m.latency for m in measurements if m.success]
success_rate = sum(1 for m in measurements if m.success) / len(measurements)
if not latencies:
return {'success_rate': success_rate, 'error': 'No successful operations'}
stats = {
'count': len(latencies),
'success_rate': success_rate,
'mean': np.mean(latencies),
'median': np.median(latencies),
'std': np.std(latencies),
'min': np.min(latencies),
'max': np.max(latencies)
}
# Add percentile statistics
for percentile in self.percentiles:
stats[f'p{percentile}'] = np.percentile(latencies, percentile)
# Add throughput calculation
if latencies:
stats['ops_per_second'] = 1.0 / np.mean(latencies)
return stats
def _ensure_documents_exist(self, count: int):
"""Ensure documents exist for benchmarking"""
try:
# Try to retrieve a document to check if they exist
self.memory_system.get_document('bench_doc_0')
except Exception:
# Documents don't exist, create them
for i in range(count):
self.memory_system.store_document({
'id': f'bench_doc_{i}',
'content': f'Benchmark document {i}' * 10
})
def benchmark_concurrent_operations(self, concurrent_users: int = 10, operations_per_user: int = 100):
"""Benchmark system under concurrent load"""
operations = ['store', 'retrieve', 'search', 'update']
def worker(user_id: int):
"""Worker function for each concurrent user"""
user_measurements = []
for i in range(operations_per_user):
operation = operations[i % len(operations)]
if operation == 'store':
doc = {
'id': f'user_{user_id}_doc_{i}',
'content': f'Content from user {user_id}, iteration {i}'
}
with self.measure_latency('concurrent_store', user_id=user_id):
self.memory_system.store_document(doc)
elif operation == 'retrieve':
query = f'Query from user {user_id}, iteration {i}'
with self.measure_latency('concurrent_retrieve', user_id=user_id):
self.memory_system.retrieve_similar(query, top_k=5)
elif operation == 'search':
query = f'Search from user {user_id}, iteration {i}'
with self.measure_latency('concurrent_search', user_id=user_id):
self.memory_system.search_text(query)
elif operation == 'update':
doc_id = f'user_{user_id}_doc_{max(0, i-10)}' # Update recent docs
with self.measure_latency('concurrent_update', user_id=user_id):
try:
self.memory_system.update_document(doc_id, {
'content': f'Updated by user {user_id} at iteration {i}'
})
except Exception:
pass # Document might not exist
# Run concurrent workers
with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [executor.submit(worker, user_id) for user_id in range(concurrent_users)]
for future in as_completed(futures):
future.result()
# Analyze concurrent operation results
concurrent_measurements = [m for m in self.measurements if 'concurrent_' in m.operation]
results_by_operation = {}
for operation in ['concurrent_store', 'concurrent_retrieve', 'concurrent_search', 'concurrent_update']:
op_measurements = [m for m in concurrent_measurements if m.operation == operation]
results_by_operation[operation] = self._analyze_measurements(op_measurements)
return results_by_operation
def generate_latency_report(self):
"""Generate comprehensive latency report"""
if not self.measurements:
return "No measurements available"
# Group measurements by operation
operations = {}
for measurement in self.measurements:
if measurement.operation not in operations:
operations[measurement.operation] = []
operations[measurement.operation].append(measurement)
report = ["# Latency Benchmark Report\n"]
for operation, measurements in operations.items():
stats = self._analyze_measurements(measurements)
if 'error' in stats:
report.append(f"## {operation}")
report.append(f"Error: {stats['error']}\n")
continue
report.append(f"## {operation}")
report.append(f"- **Count**: {stats['count']} operations")
report.append(f"- **Success Rate**: {stats['success_rate']:.1%}")
report.append(f"- **Mean Latency**: {stats['mean']*1000:.2f}ms")
report.append(f"- **Median Latency**: {stats['median']*1000:.2f}ms")
report.append(f"- **P95 Latency**: {stats['p95']*1000:.2f}ms")
report.append(f"- **P99 Latency**: {stats['p99']*1000:.2f}ms")
report.append(f"- **Max Latency**: {stats['max']*1000:.2f}ms")
report.append(f"- **Throughput**: {stats['ops_per_second']:.1f} ops/sec")
report.append("")
return "\n".join(report)
# Example usage
benchmark = ComprehensiveLatencyBenchmark(your_memory_system)
# Run comprehensive benchmarks
print("Running comprehensive latency benchmarks...")
results = benchmark.benchmark_memory_operations(num_iterations=500)
# Test concurrent performance
print("Testing concurrent performance...")
concurrent_results = benchmark.benchmark_concurrent_operations(
concurrent_users=20, operations_per_user=50
)
# Generate report
print(benchmark.generate_latency_report())
# Print summary
for operation, stats in results.items():
if 'error' not in stats:
print(f"{operation}: P95={stats['p95']*1000:.1f}ms, "
f"Throughput={stats['ops_per_second']:.1f} ops/sec")Component-Level Latency Profiling
class ComponentLatencyProfiler:
def __init__(self):
self.component_measurements = {}
def profile_embedding_pipeline(self, texts: List[str], embedding_models: List[str]):
"""Profile latency of different embedding models"""
results = {}
for model_name in embedding_models:
print(f"Profiling {model_name}...")
model_results = []
for text in texts:
# Measure tokenization
start_time = time.time()
tokens = self._tokenize(text, model_name)
tokenization_time = time.time() - start_time
# Measure embedding generation
start_time = time.time()
embedding = self._generate_embedding(tokens, model_name)
embedding_time = time.time() - start_time
# Measure normalization
start_time = time.time()
normalized = self._normalize_embedding(embedding)
normalization_time = time.time() - start_time
total_time = tokenization_time + embedding_time + normalization_time
model_results.append({
'text_length': len(text),
'token_count': len(tokens) if tokens else 0,
'tokenization_time': tokenization_time,
'embedding_time': embedding_time,
'normalization_time': normalization_time,
'total_time': total_time,
'tokens_per_second': len(tokens) / embedding_time if embedding_time > 0 else 0
})
results[model_name] = self._analyze_component_results(model_results)
return results
def profile_storage_operations(self, storage_backends: List[Dict]):
"""Profile latency across different storage backends"""
results = {}
test_documents = self._generate_test_documents(1000)
for backend_config in storage_backends:
backend_name = backend_config['name']
backend = backend_config['instance']
print(f"Profiling {backend_name}...")
# Profile write operations
write_times = []
for doc in test_documents[:100]: # Test with subset
start_time = time.time()
backend.store_document(doc)
write_times.append(time.time() - start_time)
# Profile read operations
read_times = []
for doc in test_documents[:100]:
start_time = time.time()
retrieved = backend.get_document(doc['id'])
read_times.append(time.time() - start_time)
# Profile search operations
search_times = []
search_queries = [f"query {i}" for i in range(50)]
for query in search_queries:
start_time = time.time()
results_list = backend.search(query, limit=10)
search_times.append(time.time() - start_time)
results[backend_name] = {
'write_latency': self._calculate_latency_stats(write_times),
'read_latency': self._calculate_latency_stats(read_times),
'search_latency': self._calculate_latency_stats(search_times)
}
return results
def profile_network_latency(self, endpoints: List[str]):
"""Profile network latency to various endpoints"""
results = {}
for endpoint in endpoints:
latencies = []
for _ in range(50): # 50 measurements per endpoint
start_time = time.time()
try:
response = self._make_request(endpoint)
latency = time.time() - start_time
latencies.append(latency)
except Exception as e:
print(f"Error measuring {endpoint}: {e}")
if latencies:
results[endpoint] = self._calculate_latency_stats(latencies)
return results
def profile_memory_allocation(self, operations: List[Callable]):
"""Profile memory allocation latency for different operations"""
import tracemalloc
results = {}
for i, operation in enumerate(operations):
operation_name = f"operation_{i}"
# Measure memory allocation overhead
allocation_times = []
for _ in range(100):
tracemalloc.start()
start_time = time.time()
try:
result = operation()
allocation_time = time.time() - start_time
allocation_times.append(allocation_time)
except Exception as e:
print(f"Error in {operation_name}: {e}")
finally:
tracemalloc.stop()
if allocation_times:
results[operation_name] = self._calculate_latency_stats(allocation_times)
return results
def _tokenize(self, text: str, model_name: str):
"""Mock tokenization for different models"""
if model_name == 'bert':
return text.split()[:512] # BERT max length
elif model_name == 'gpt':
return text.split()[:2048] # GPT context length
else:
return text.split()
def _generate_embedding(self, tokens: List[str], model_name: str):
"""Mock embedding generation"""
# Simulate different model complexities
if model_name == 'small_model':
time.sleep(0.01) # 10ms
return np.random.rand(256)
elif model_name == 'large_model':
time.sleep(0.1) # 100ms
return np.random.rand(1024)
else:
time.sleep(0.05) # 50ms
return np.random.rand(512)
def _normalize_embedding(self, embedding: np.ndarray):
"""Normalize embedding vector"""
return embedding / np.linalg.norm(embedding)
def _generate_test_documents(self, count: int):
"""Generate test documents"""
return [
{
'id': f'doc_{i}',
'content': f'Test document content {i}' * (i % 10 + 1),
'metadata': {'index': i}
}
for i in range(count)
]
def _make_request(self, endpoint: str):
"""Make HTTP request to endpoint"""
import requests
return requests.get(endpoint, timeout=5)
def _calculate_latency_stats(self, latencies: List[float]):
"""Calculate latency statistics"""
if not latencies:
return {}
return {
'count': len(latencies),
'mean': np.mean(latencies),
'median': np.median(latencies),
'std': np.std(latencies),
'min': np.min(latencies),
'max': np.max(latencies),
'p95': np.percentile(latencies, 95),
'p99': np.percentile(latencies, 99)
}
def _analyze_component_results(self, results: List[Dict]):
"""Analyze component-specific results"""
if not results:
return {}
# Extract different timing components
tokenization_times = [r['tokenization_time'] for r in results]
embedding_times = [r['embedding_time'] for r in results]
normalization_times = [r['normalization_time'] for r in results]
total_times = [r['total_time'] for r in results]
return {
'tokenization': self._calculate_latency_stats(tokenization_times),
'embedding_generation': self._calculate_latency_stats(embedding_times),
'normalization': self._calculate_latency_stats(normalization_times),
'total': self._calculate_latency_stats(total_times),
'avg_tokens_per_second': np.mean([r['tokens_per_second'] for r in results])
}
# Example usage
profiler = ComponentLatencyProfiler()
# Profile embedding pipeline
test_texts = [
"Short text",
"Medium length text with some more content to test tokenization",
"Very long text " * 100 + " that will test the limits of the embedding model"
]
embedding_models = ['small_model', 'medium_model', 'large_model']
embedding_results = profiler.profile_embedding_pipeline(test_texts, embedding_models)
for model, stats in embedding_results.items():
print(f"{model}:")
print(f" Total P95: {stats['total']['p95']*1000:.1f}ms")
print(f" Embedding P95: {stats['embedding_generation']['p95']*1000:.1f}ms")Advanced Latency Optimization Techniques
Caching Strategy Optimization
class CacheLatencyOptimizer:
def __init__(self, cache_system):
self.cache = cache_system
self.hit_times = []
self.miss_times = []
def optimize_cache_configuration(self, workload_patterns: List[Dict]):
"""Optimize cache configuration based on workload patterns"""
configurations = [
{'ttl': 300, 'max_size': 1000, 'eviction': 'lru'},
{'ttl': 600, 'max_size': 2000, 'eviction': 'lru'},
{'ttl': 1200, 'max_size': 5000, 'eviction': 'lru'},
{'ttl': 300, 'max_size': 1000, 'eviction': 'lfu'},
{'ttl': 600, 'max_size': 2000, 'eviction': 'lfu'},
]
results = {}
for config in configurations:
config_name = f"ttl{config['ttl']}_size{config['max_size']}_{config['eviction']}"
print(f"Testing configuration: {config_name}")
# Apply configuration
self.cache.configure(**config)
self.cache.clear()
# Test with workload patterns
config_results = self._test_cache_with_workload(workload_patterns)
results[config_name] = config_results
return results
def _test_cache_with_workload(self, workload_patterns: List[Dict]):
"""Test cache performance with specific workload"""
hit_times = []
miss_times = []
hit_count = 0
miss_count = 0
for pattern in workload_patterns:
operation = pattern.get('operation', 'get')
key = pattern.get('key', 'default_key')
value = pattern.get('value', 'default_value')
if operation == 'get':
start_time = time.time()
result = self.cache.get(key)
latency = time.time() - start_time
if result is not None:
hit_times.append(latency)
hit_count += 1
else:
miss_times.append(latency)
miss_count += 1
elif operation == 'set':
start_time = time.time()
self.cache.set(key, value)
latency = time.time() - start_time
total_operations = hit_count + miss_count
hit_rate = hit_count / total_operations if total_operations > 0 else 0
return {
'hit_rate': hit_rate,
'avg_hit_latency': np.mean(hit_times) if hit_times else 0,
'avg_miss_latency': np.mean(miss_times) if miss_times else 0,
'p95_hit_latency': np.percentile(hit_times, 95) if hit_times else 0,
'p95_miss_latency': np.percentile(miss_times, 95) if miss_times else 0,
'total_operations': total_operations
}
def analyze_cache_effectiveness(self, access_patterns: List[str]):
"""Analyze cache effectiveness for different access patterns"""
cache_analysis = {}
# Test with different cache sizes
cache_sizes = [100, 500, 1000, 2000, 5000]
for size in cache_sizes:
self.cache.configure(max_size=size)
self.cache.clear()
hit_count = 0
total_accesses = 0
latencies = []
for key in access_patterns:
start_time = time.time()
result = self.cache.get(key)
latency = time.time() - start_time
latencies.append(latency)
if result is not None:
hit_count += 1
else:
# Simulate cache miss penalty
self.cache.set(key, f"value_for_{key}")
total_accesses += 1
hit_rate = hit_count / total_accesses
avg_latency = np.mean(latencies)
cache_analysis[size] = {
'hit_rate': hit_rate,
'avg_latency': avg_latency,
'effective_latency': avg_latency * hit_rate + (1 - hit_rate) * 0.1 # weighted average: hits at measured latency, misses assumed to cost ~100ms end-to-end
}
return cache_analysis
# Example usage
cache_optimizer = CacheLatencyOptimizer(your_cache_system)
# Generate workload patterns
workload = []
for i in range(1000):
# Zipf distribution for realistic access patterns
key_id = int(np.random.zipf(1.2)) % 100
workload.append({
'operation': 'get',
'key': f'key_{key_id}',
'value': f'value_{key_id}'
})
# Add some set operations
for i in range(100):
workload.append({
'operation': 'set',
'key': f'key_{i}',
'value': f'updated_value_{i}'
})
cache_results = cache_optimizer.optimize_cache_configuration(workload)
# Find optimal configuration
best_config = min(cache_results.items(),
key=lambda x: x[1]['avg_hit_latency'] + x[1]['avg_miss_latency'])
print(f"Optimal cache configuration: {best_config[0]}")
print(f"Hit rate: {best_config[1]['hit_rate']:.2%}")
print(f"Avg hit latency: {best_config[1]['avg_hit_latency']*1000:.2f}ms")Database Query Optimization
class DatabaseLatencyOptimizer:
def __init__(self, database_connection):
self.db = database_connection
def optimize_query_performance(self, query_patterns: List[Dict]):
"""Optimize database queries for minimal latency"""
optimization_strategies = [
'add_indexes',
'query_rewriting',
'connection_pooling',
'prepared_statements',
'batch_operations'
]
results = {}
for strategy in optimization_strategies:
print(f"Testing {strategy}...")
strategy_results = self._test_optimization_strategy(strategy, query_patterns)
results[strategy] = strategy_results
return results
def _test_optimization_strategy(self, strategy: str, query_patterns: List[Dict]):
"""Test specific optimization strategy"""
if strategy == 'add_indexes':
return self._test_index_optimization(query_patterns)
elif strategy == 'query_rewriting':
return self._test_query_rewriting(query_patterns)
elif strategy == 'connection_pooling':
return self._test_connection_pooling(query_patterns)
elif strategy == 'prepared_statements':
return self._test_prepared_statements(query_patterns)
elif strategy == 'batch_operations':
return self._test_batch_operations(query_patterns)
def _test_index_optimization(self, query_patterns: List[Dict]):
"""Test impact of adding indexes"""
# Test queries without indexes
baseline_times = []
for pattern in query_patterns[:50]: # Test subset
query = pattern['query']
start_time = time.time()
self.db.execute(query)
baseline_times.append(time.time() - start_time)
baseline_avg = np.mean(baseline_times)
# Add suggested indexes
suggested_indexes = self._analyze_query_patterns(query_patterns)
for index_sql in suggested_indexes:
try:
self.db.execute(index_sql)
except Exception:
pass # Index might already exist
# Test queries with indexes
optimized_times = []
for pattern in query_patterns[:50]:
query = pattern['query']
start_time = time.time()
self.db.execute(query)
optimized_times.append(time.time() - start_time)
optimized_avg = np.mean(optimized_times)
return {
'baseline_avg_latency': baseline_avg,
'optimized_avg_latency': optimized_avg,
'improvement_factor': baseline_avg / optimized_avg if optimized_avg > 0 else 1,
'indexes_added': len(suggested_indexes)
}
def _test_query_rewriting(self, query_patterns: List[Dict]):
"""Test query rewriting optimizations"""
optimization_results = []
for pattern in query_patterns[:20]: # Test subset
original_query = pattern['query']
# Time original query
start_time = time.time()
self.db.execute(original_query)
original_time = time.time() - start_time
# Generate optimized version
optimized_query = self._optimize_query(original_query)
# Time optimized query
start_time = time.time()
self.db.execute(optimized_query)
optimized_time = time.time() - start_time
optimization_results.append({
'original_time': original_time,
'optimized_time': optimized_time,
'improvement': original_time / optimized_time if optimized_time > 0 else 1
})
avg_improvement = np.mean([r['improvement'] for r in optimization_results])
return {
'avg_improvement_factor': avg_improvement,
'queries_optimized': len(optimization_results),
'avg_original_latency': np.mean([r['original_time'] for r in optimization_results]),
'avg_optimized_latency': np.mean([r['optimized_time'] for r in optimization_results])
}
def _test_connection_pooling(self, query_patterns: List[Dict]):
"""Test impact of connection pooling on latency"""
# Test without connection pooling (new connection per query)
single_connection_times = []
for pattern in query_patterns[:30]:
start_time = time.time()
# Simulate new connection overhead
time.sleep(0.01) # 10ms connection overhead
self.db.execute(pattern['query'])
single_connection_times.append(time.time() - start_time)
# Test with connection pooling (reuse connections)
pooled_connection_times = []
for pattern in query_patterns[:30]:
start_time = time.time()
self.db.execute(pattern['query'])
pooled_connection_times.append(time.time() - start_time)
return {
'single_connection_avg': np.mean(single_connection_times),
'pooled_connection_avg': np.mean(pooled_connection_times),
'improvement_factor': (
np.mean(single_connection_times) / np.mean(pooled_connection_times)
if np.mean(pooled_connection_times) > 0 else 1
)
}
def _test_prepared_statements(self, query_patterns: List[Dict]):
"""Test prepared statements vs regular queries"""
# Group patterns by query template
query_templates = {}
for pattern in query_patterns:
template = self._extract_query_template(pattern['query'])
if template not in query_templates:
query_templates[template] = []
query_templates[template].append(pattern)
prepared_statement_results = []
for template, patterns in query_templates.items():
if len(patterns) < 5: # Skip templates with few instances
continue
# Test regular queries
regular_times = []
for pattern in patterns[:10]:
start_time = time.time()
self.db.execute(pattern['query'])
regular_times.append(time.time() - start_time)
# Test prepared statements
prepared_times = []
prepared_query = self.db.prepare(template)
for pattern in patterns[:10]:
start_time = time.time()
prepared_query.execute(pattern.get('params', []))
prepared_times.append(time.time() - start_time)
prepared_statement_results.append({
'regular_avg': np.mean(regular_times),
'prepared_avg': np.mean(prepared_times),
'improvement': np.mean(regular_times) / np.mean(prepared_times) if np.mean(prepared_times) > 0 else 1
})
if prepared_statement_results:
return {
'avg_improvement_factor': np.mean([r['improvement'] for r in prepared_statement_results]),
'templates_tested': len(prepared_statement_results)
}
else:
return {'avg_improvement_factor': 1, 'templates_tested': 0}
def _test_batch_operations(self, query_patterns: List[Dict]):
"""Test batch operations vs individual operations"""
insert_patterns = [p for p in query_patterns if 'INSERT' in p['query'].upper()]
if len(insert_patterns) < 10:
return {'improvement_factor': 1, 'note': 'Insufficient INSERT patterns for testing'}
# Test individual inserts
individual_times = []
for pattern in insert_patterns[:50]:
start_time = time.time()
self.db.execute(pattern['query'])
individual_times.append(time.time() - start_time)
# Test batch insert
batch_queries = [p['query'] for p in insert_patterns[:50]]
start_time = time.time()
self.db.execute_batch(batch_queries)
batch_time = time.time() - start_time
avg_individual_time = np.mean(individual_times)
avg_batch_time = batch_time / len(batch_queries)
return {
'individual_avg_latency': avg_individual_time,
'batch_avg_latency': avg_batch_time,
'improvement_factor': avg_individual_time / avg_batch_time if avg_batch_time > 0 else 1,
'operations_tested': len(batch_queries)
}
def _analyze_query_patterns(self, query_patterns: List[Dict]):
"""Analyze query patterns and suggest indexes"""
suggested_indexes = []
# Simple pattern analysis for common cases
for pattern in query_patterns:
query = pattern['query'].upper()
# Look for WHERE clauses
if 'WHERE' in query and 'document_id' in query.lower():
suggested_indexes.append("CREATE INDEX idx_document_id ON documents(document_id)")
if 'WHERE' in query and 'created_at' in query.lower():
suggested_indexes.append("CREATE INDEX idx_created_at ON documents(created_at)")
if 'WHERE' in query and 'user_id' in query.lower():
suggested_indexes.append("CREATE INDEX idx_user_id ON documents(user_id)")
# Remove duplicates
return list(set(suggested_indexes))
def _optimize_query(self, query: str):
"""Apply basic query optimizations"""
optimized = query
# Add LIMIT if not present for large result sets
if 'SELECT' in query.upper() and 'LIMIT' not in query.upper():
optimized += ' LIMIT 1000'
# Other optimizations can be added here
return optimized
def _extract_query_template(self, query: str):
"""Extract query template by replacing literals with placeholders"""
import re
# Replace string literals
template = re.sub(r"'[^']*'", '?', query)
# Replace numeric literals
template = re.sub(r'\b\d+\b', '?', template)
return template
# Example usage
db_optimizer = DatabaseLatencyOptimizer(your_database_connection)
# Generate sample query patterns
query_patterns = [
{'query': "SELECT * FROM documents WHERE document_id = 'doc_123'"},
{'query': "SELECT content FROM documents WHERE user_id = 'user_456' ORDER BY created_at DESC"},
{'query': "INSERT INTO documents (id, content) VALUES ('new_doc', 'content')"},
{'query': "UPDATE documents SET content = 'updated' WHERE id = 'doc_789'"},
{'query': "SELECT COUNT(*) FROM documents WHERE created_at > '2024-01-01'"}
] * 20 # Repeat for testing
db_optimization_results = db_optimizer.optimize_query_performance(query_patterns)
for strategy, results in db_optimization_results.items():
if 'improvement_factor' in results:
print(f"{strategy}: {results['improvement_factor']:.2f}x improvement")Production Latency Monitoring
Real-Time Latency Tracking
class ProductionLatencyMonitor:
def __init__(self, alert_thresholds: Dict[str, float]):
self.alert_thresholds = alert_thresholds
self.latency_buffer = {}
self.monitoring_active = False
def start_monitoring(self):
"""Start real-time latency monitoring"""
self.monitoring_active = True
# Start monitoring thread
monitoring_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
monitoring_thread.start()
def _monitoring_loop(self):
"""Main monitoring loop"""
while self.monitoring_active:
# Analyze recent latencies
self._analyze_recent_latencies()
# Check for alerts
self._check_latency_alerts()
time.sleep(10) # Check every 10 seconds
def record_latency(self, operation: str, latency: float, metadata: Dict = None):
"""Record latency measurement"""
timestamp = time.time()
if operation not in self.latency_buffer:
self.latency_buffer[operation] = []
self.latency_buffer[operation].append({
'latency': latency,
'timestamp': timestamp,
'metadata': metadata or {}
})
# Keep only recent measurements (last hour)
cutoff_time = timestamp - 3600
self.latency_buffer[operation] = [
m for m in self.latency_buffer[operation]
if m['timestamp'] > cutoff_time
]
def _analyze_recent_latencies(self):
"""Analyze recent latency trends"""
for operation, measurements in self.latency_buffer.items():
if len(measurements) < 10: # Need enough data
continue
recent_latencies = [m['latency'] for m in measurements[-100:]]
# Calculate statistics
current_p95 = np.percentile(recent_latencies, 95)
current_mean = np.mean(recent_latencies)
# Check for degradation
if len(measurements) > 200:
previous_latencies = [m['latency'] for m in measurements[-200:-100]]
previous_p95 = np.percentile(previous_latencies, 95)
# Alert if P95 increased significantly
if current_p95 > previous_p95 * 1.5: # 50% increase
self._send_alert(f"P95 latency degradation in {operation}: "
f"{previous_p95*1000:.1f}ms -> {current_p95*1000:.1f}ms")
def _check_latency_alerts(self):
"""Check latency against thresholds"""
for operation, measurements in self.latency_buffer.items():
if not measurements:
continue
recent_latencies = [m['latency'] for m in measurements[-50:]]
if operation in self.alert_thresholds:
threshold = self.alert_thresholds[operation]
p95_latency = np.percentile(recent_latencies, 95)
if p95_latency > threshold:
self._send_alert(f"High latency alert for {operation}: "
f"P95 = {p95_latency*1000:.1f}ms (threshold: {threshold*1000:.1f}ms)")
def _send_alert(self, message: str):
"""Send latency alert"""
print(f"LATENCY ALERT: {message}")
# Integrate with your alerting system here
def get_latency_dashboard_data(self):
"""Get data for latency dashboard"""
dashboard_data = {}
for operation, measurements in self.latency_buffer.items():
if not measurements:
continue
latencies = [m['latency'] for m in measurements]
dashboard_data[operation] = {
'count': len(latencies),
'mean': np.mean(latencies),
'p50': np.percentile(latencies, 50),
'p95': np.percentile(latencies, 95),
'p99': np.percentile(latencies, 99),
'max': np.max(latencies),
'recent_trend': self._calculate_trend(measurements)
}
return dashboard_data
def _calculate_trend(self, measurements: List[Dict]):
"""Calculate latency trend (improving/degrading)"""
if len(measurements) < 20:
return 'insufficient_data'
# Compare first half vs second half of recent measurements
mid_point = len(measurements) // 2
first_half = [m['latency'] for m in measurements[:mid_point]]
second_half = [m['latency'] for m in measurements[mid_point:]]
first_avg = np.mean(first_half)
second_avg = np.mean(second_half)
if second_avg > first_avg * 1.1: # 10% worse
return 'degrading'
elif second_avg < first_avg * 0.9: # 10% better
return 'improving'
else:
return 'stable'
# Example usage
latency_thresholds = {
'store_document': 0.1, # 100ms
'retrieve_similar': 0.2, # 200ms
'search_text': 0.15, # 150ms
'generate_embedding': 0.5 # 500ms
}
monitor = ProductionLatencyMonitor(latency_thresholds)
monitor.start_monitoring()
# In your application code, record latencies:
# monitor.record_latency('store_document', 0.085)
# monitor.record_latency('retrieve_similar', 0.156, {'query_length': 50})
# Get dashboard data
dashboard_data = monitor.get_latency_dashboard_data()
for operation, stats in dashboard_data.items():
print(f"{operation}: P95={stats['p95']*1000:.1f}ms, Trend={stats['recent_trend']}")Best Practices
Latency Optimization Guidelines
- Measure First: Always establish latency baselines before optimization
- Target User-Facing Operations: Prioritize optimizations that directly impact user experience
- Consider P95/P99: Optimize for tail latencies, not just averages (see the sketch after this list)
- Monitor Continuously: Set up real-time latency monitoring in production
- Test Under Load: Measure latency under realistic concurrent load
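To make the tail-latency point concrete, the sketch below uses a synthetic latency sample (the distributions are illustrative, not measured) to show how the mean can look acceptable while P95/P99 reveal a slow tail:

```python
import numpy as np

# Synthetic sample: mostly fast requests plus a slow tail (cache misses, retries, GC pauses).
rng = np.random.default_rng(0)
fast = rng.normal(0.020, 0.005, 900).clip(min=0.001)   # ~20ms typical requests
slow = rng.normal(0.400, 0.050, 100).clip(min=0.001)   # ~400ms outliers
latencies = np.concatenate([fast, slow])

print(f"mean: {latencies.mean() * 1000:.1f}ms")                 # looks healthy (~58ms)
print(f"p50:  {np.percentile(latencies, 50) * 1000:.1f}ms")
print(f"p95:  {np.percentile(latencies, 95) * 1000:.1f}ms")     # exposes the slow tail
print(f"p99:  {np.percentile(latencies, 99) * 1000:.1f}ms")
```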
Common Optimization Strategies
- Caching: Implement multi-level caching for frequently accessed data
- Connection Pooling: Reuse database connections to reduce overhead
- Async Operations: Use asynchronous processing for non-blocking operations (sketched after this list)
- Batch Operations: Group multiple operations to reduce round-trip latency
- Index Optimization: Create indexes based on actual query patterns
- CDN Usage: Use content delivery networks for geographically distributed users
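As one example of the async point above, independent memory lookups can be overlapped rather than awaited serially. This is a sketch that assumes the memory system exposes async `retrieve_similar` and `search_text` methods; adapt the names to your client.

```python
import asyncio
import time

async def gather_context(memory, query: str):
    """Overlap independent memory lookups so total latency tracks the slowest call,
    not the sum of all calls (sketch; assumes async client methods)."""
    start = time.perf_counter()
    similar, keyword_hits = await asyncio.gather(
        memory.retrieve_similar(query, top_k=5),
        memory.search_text(query),
    )
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"combined retrieval latency: {elapsed_ms:.1f}ms")
    return similar, keyword_hits

# Hypothetical usage:
# asyncio.run(gather_context(your_async_memory_system, "open issues about billing"))
```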
Latency SLA Management
- Set Realistic Targets: Base SLAs on actual user requirements and system capabilities
- Monitor Compliance: Track SLA compliance and alert on violations (see the sketch after this list)
- Capacity Planning: Plan infrastructure capacity based on latency requirements
- Graceful Degradation: Implement fallback mechanisms for high-latency scenarios
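A minimal compliance tracker might look like the sketch below; the 200ms target, 95% threshold, and window size are illustrative assumptions, not recommendations.

```python
from collections import deque

class SLATracker:
    """Track the fraction of recent requests meeting a latency target (sketch)."""

    def __init__(self, target_seconds: float, target_fraction: float = 0.95, window: int = 1000):
        self.target_seconds = target_seconds
        self.target_fraction = target_fraction
        self.samples = deque(maxlen=window)

    def record(self, latency_seconds: float):
        self.samples.append(latency_seconds)

    def compliance(self) -> float:
        if not self.samples:
            return 1.0
        within_target = sum(1 for s in self.samples if s <= self.target_seconds)
        return within_target / len(self.samples)

    def in_violation(self) -> bool:
        # Require a minimum sample count before alerting to avoid noisy startup alarms.
        return len(self.samples) >= 100 and self.compliance() < self.target_fraction

# Hypothetical usage: "95% of retrievals under 200ms"
# sla = SLATracker(target_seconds=0.2)
# sla.record(0.15)
# if sla.in_violation():
#     alert_on_call("retrieve_similar SLA violation")  # or shed load / fall back to cached answers
```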
Together, the benchmarks, profilers, optimizers, and monitors in this guide provide a systematic path to measuring, reducing, and continuously tracking latency in agent memory systems.