Storage Performance Benchmarks
Optimizing storage performance is crucial for scalable agent memory systems. This guide provides comprehensive benchmarking methodologies for measuring and improving storage throughput, latency, and efficiency across different storage backends.
Overview
Storage performance directly impacts agent response times, system scalability, and operational costs. Poor storage performance leads to query timeouts, degraded user experience, and infrastructure scaling challenges.
Core Performance Metrics
Throughput Metrics
Write Throughput
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import numpy as np
class StorageThroughputBenchmark:
def __init__(self, storage_backend):
self.storage = storage_backend
self.results = []
def benchmark_write_throughput(self, documents, num_threads=10, duration_seconds=60):
"""
Benchmark write throughput with concurrent operations
Args:
documents: List of documents to write
num_threads: Number of concurrent write threads
duration_seconds: Duration to run the benchmark
"""
start_time = time.time()
write_count = 0
write_times = []
errors = 0
        def write_worker(doc_batch):
            # Each worker keeps cycling through its batch until the benchmark deadline,
            # so total work is bounded by duration_seconds rather than by queue growth.
            nonlocal write_count, errors
            deadline = start_time + duration_seconds
            while time.time() < deadline:
                for doc in doc_batch:
                    if time.time() >= deadline:
                        break
                    try:
                        write_start = time.time()
                        self.storage.store_document(doc)
                        write_times.append(time.time() - write_start)
                        write_count += 1
                    except Exception:
                        errors += 1
        # Split documents into one batch per thread (guard against a zero batch size)
        batch_size = max(1, len(documents) // num_threads)
        batches = [documents[i:i+batch_size] for i in range(0, len(documents), batch_size)]
        # Run concurrent writes: one worker per batch, each looping until the deadline
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [executor.submit(write_worker, batch) for batch in batches]
            for future in futures:
                future.result()
total_time = time.time() - start_time
return {
'writes_per_second': write_count / total_time,
'total_writes': write_count,
'total_time': total_time,
            'avg_write_latency': np.mean(write_times) if write_times else 0,
            'p95_write_latency': np.percentile(write_times, 95) if write_times else 0,
            'p99_write_latency': np.percentile(write_times, 99) if write_times else 0,
'error_rate': errors / (write_count + errors) if (write_count + errors) > 0 else 0,
'concurrent_threads': num_threads
}
# Example usage
benchmark = StorageThroughputBenchmark(your_storage_backend)
# Generate test documents
test_docs = [
{"id": f"doc_{i}", "content": f"Test document content {i}" * 100}
for i in range(10000)
]
results = benchmark.benchmark_write_throughput(test_docs, num_threads=20)
print(f"Write Throughput: {results['writes_per_second']:.1f} docs/sec")
print(f"P95 Latency: {results['p95_write_latency']*1000:.1f}ms")Read Throughput
def benchmark_read_throughput(storage_backend, document_ids, num_threads=10, duration_seconds=60):
"""Benchmark read throughput with concurrent operations"""
start_time = time.time()
read_count = 0
read_times = []
errors = 0
def read_worker():
nonlocal read_count, errors
while time.time() - start_time < duration_seconds:
try:
doc_id = np.random.choice(document_ids)
read_start = time.time()
document = storage_backend.get_document(doc_id)
read_end = time.time()
if document:
read_times.append(read_end - read_start)
read_count += 1
else:
errors += 1
except Exception as e:
errors += 1
# Run concurrent reads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(read_worker) for _ in range(num_threads)]
for future in futures:
future.result()
total_time = time.time() - start_time
return {
'reads_per_second': read_count / total_time,
'total_reads': read_count,
'avg_read_latency': np.mean(read_times) if read_times else 0,
'p95_read_latency': np.percentile(read_times, 95) if read_times else 0,
'p99_read_latency': np.percentile(read_times, 99) if read_times else 0,
'error_rate': errors / (read_count + errors) if (read_count + errors) > 0 else 0
}
# Example usage
doc_ids = [f"doc_{i}" for i in range(10000)]
read_results = benchmark_read_throughput(your_storage_backend, doc_ids)
print(f"Read Throughput: {read_results['reads_per_second']:.1f} docs/sec")Query Performance Benchmarks
Vector Search Performance
class VectorSearchBenchmark:
def __init__(self, vector_store):
self.vector_store = vector_store
def benchmark_vector_search(self, query_vectors, k_values=[1, 5, 10, 20, 50], num_threads=5):
"""
Benchmark vector search performance across different k values
Args:
query_vectors: List of query embedding vectors
k_values: List of k values to test
num_threads: Number of concurrent query threads
"""
results = {}
for k in k_values:
print(f"Benchmarking k={k}")
search_times = []
num_queries = 0
def search_worker(query_batch):
nonlocal num_queries
for query_vector in query_batch:
start_time = time.time()
                    self.vector_store.search(query_vector, k=k)  # result contents are ignored; only the call is timed
end_time = time.time()
search_times.append(end_time - start_time)
num_queries += 1
            # Split queries into one batch per thread (guard against a zero batch size)
            batch_size = max(1, len(query_vectors) // num_threads)
            batches = [query_vectors[i:i+batch_size] for i in range(0, len(query_vectors), batch_size)]
# Execute concurrent searches
start_total = time.time()
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(search_worker, batch) for batch in batches]
for future in futures:
future.result()
end_total = time.time()
results[k] = {
'avg_latency': np.mean(search_times),
'p50_latency': np.percentile(search_times, 50),
'p95_latency': np.percentile(search_times, 95),
'p99_latency': np.percentile(search_times, 99),
'queries_per_second': num_queries / (end_total - start_total),
'total_queries': num_queries
}
return results
# Example usage
import numpy as np
# Generate random query vectors (replace with actual embeddings)
query_vectors = [np.random.rand(384) for _ in range(1000)]
vector_benchmark = VectorSearchBenchmark(your_vector_store)
search_results = vector_benchmark.benchmark_vector_search(query_vectors)
for k, metrics in search_results.items():
print(f"k={k}: {metrics['queries_per_second']:.1f} QPS, "
f"P95: {metrics['p95_latency']*1000:.1f}ms")Full-Text Search Performance
def benchmark_text_search(text_search_engine, queries, concurrent_users=10):
"""Benchmark full-text search performance"""
search_times = []
total_results = 0
errors = 0
def search_worker():
nonlocal total_results, errors
for query in queries:
try:
start_time = time.time()
results = text_search_engine.search(query)
end_time = time.time()
search_times.append(end_time - start_time)
total_results += len(results)
except Exception as e:
errors += 1
start_total = time.time()
with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [executor.submit(search_worker) for _ in range(concurrent_users)]
for future in futures:
future.result()
end_total = time.time()
total_time = end_total - start_total
total_queries = len(queries) * concurrent_users
return {
'queries_per_second': total_queries / total_time,
'avg_latency': np.mean(search_times),
'p95_latency': np.percentile(search_times, 95),
'p99_latency': np.percentile(search_times, 99),
'avg_results_per_query': total_results / total_queries,
'error_rate': errors / total_queries,
'total_queries': total_queries
}
# Example queries
test_queries = [
"machine learning algorithms",
"deep neural networks",
"natural language processing",
"computer vision techniques",
"reinforcement learning"
] * 200 # Repeat for more comprehensive testing
text_search_results = benchmark_text_search(your_text_search, test_queries)
print(f"Text Search QPS: {text_search_results['queries_per_second']:.1f}")Storage Backend Comparisons
Database Performance Comparison
class StorageBackendComparison:
def __init__(self):
self.backends = {}
def add_backend(self, name, backend):
self.backends[name] = backend
def run_comprehensive_benchmark(self, test_documents, test_queries):
"""Run comprehensive benchmark across all storage backends"""
results = {}
for backend_name, backend in self.backends.items():
print(f"Benchmarking {backend_name}...")
# Write performance
write_benchmark = StorageThroughputBenchmark(backend)
write_results = write_benchmark.benchmark_write_throughput(
test_documents, num_threads=10, duration_seconds=30
)
# Read performance
doc_ids = [doc['id'] for doc in test_documents]
read_results = benchmark_read_throughput(backend, doc_ids, duration_seconds=30)
# Query performance (if backend supports search)
query_results = None
if hasattr(backend, 'search'):
try:
query_results = self._benchmark_query_performance(backend, test_queries)
except Exception as e:
print(f"Query benchmark failed for {backend_name}: {e}")
results[backend_name] = {
'write_performance': write_results,
'read_performance': read_results,
'query_performance': query_results
}
return results
def _benchmark_query_performance(self, backend, queries):
"""Benchmark query performance for backends that support search"""
search_times = []
for query in queries:
start_time = time.time()
results = backend.search(query, limit=10)
end_time = time.time()
search_times.append(end_time - start_time)
return {
'avg_query_latency': np.mean(search_times),
'p95_query_latency': np.percentile(search_times, 95),
'queries_per_second': len(queries) / sum(search_times)
}
def generate_comparison_report(self, results):
"""Generate detailed comparison report"""
report = []
report.append("# Storage Backend Performance Comparison\n")
# Write performance comparison
report.append("## Write Performance")
report.append("| Backend | Writes/sec | Avg Latency | P95 Latency | Error Rate |")
report.append("|---------|------------|-------------|-------------|------------|")
for backend_name, data in results.items():
write_perf = data['write_performance']
report.append(
f"| {backend_name} | {write_perf['writes_per_second']:.1f} | "
f"{write_perf['avg_write_latency']*1000:.1f}ms | "
f"{write_perf['p95_write_latency']*1000:.1f}ms | "
f"{write_perf['error_rate']:.3f} |"
)
report.append("\n## Read Performance")
report.append("| Backend | Reads/sec | Avg Latency | P95 Latency | Error Rate |")
report.append("|---------|-----------|-------------|-------------|------------|")
for backend_name, data in results.items():
read_perf = data['read_performance']
report.append(
f"| {backend_name} | {read_perf['reads_per_second']:.1f} | "
f"{read_perf['avg_read_latency']*1000:.1f}ms | "
f"{read_perf['p95_read_latency']*1000:.1f}ms | "
f"{read_perf['error_rate']:.3f} |"
)
return "\n".join(report)
# Example usage
comparison = StorageBackendComparison()
comparison.add_backend("PostgreSQL", postgresql_backend)
comparison.add_backend("Elasticsearch", elasticsearch_backend)
comparison.add_backend("Pinecone", pinecone_backend)
comparison.add_backend("Chroma", chroma_backend)
# Generate test data
test_docs = [
{"id": f"doc_{i}", "content": f"Sample document content {i}" * 50}
for i in range(5000)
]
test_queries = ["search query"] * 100
# Run benchmarks
results = comparison.run_comprehensive_benchmark(test_docs, test_queries)
# Generate report
report = comparison.generate_comparison_report(results)
print(report)
Scalability Testing
Load Testing Framework
class ScalabilityTest:
def __init__(self, storage_backend):
self.storage = storage_backend
    def test_concurrent_load(self, max_concurrent_users=100):
        """
        Test system performance under increasing concurrent load
        Args:
            max_concurrent_users: Maximum number of concurrent operations
        """
        results = []
user_increments = [10, 20, 50, 100, 200, 500, 1000]
if max_concurrent_users not in user_increments:
user_increments.append(max_concurrent_users)
user_increments = [u for u in user_increments if u <= max_concurrent_users]
user_increments.sort()
for concurrent_users in user_increments:
print(f"Testing with {concurrent_users} concurrent users...")
# Run load test for this concurrency level
metrics = self._run_load_test_iteration(concurrent_users, duration=60)
metrics['concurrent_users'] = concurrent_users
results.append(metrics)
# Stop if error rate gets too high
if metrics['error_rate'] > 0.1: # 10% error rate
print(f"Stopping at {concurrent_users} users due to high error rate")
break
return results
def _run_load_test_iteration(self, concurrent_users, duration):
"""Run single load test iteration"""
operations = ['read', 'write', 'search'] # Mix of operations
operation_times = {op: [] for op in operations}
error_counts = {op: 0 for op in operations}
total_operations = 0
def worker():
nonlocal total_operations
end_time = time.time() + duration
while time.time() < end_time:
# Random operation selection
operation = np.random.choice(operations)
total_operations += 1
try:
start_time = time.time()
if operation == 'read':
self.storage.get_document(f"doc_{np.random.randint(1000)}")
elif operation == 'write':
self.storage.store_document({
"id": f"temp_{total_operations}",
"content": "Temporary test document"
})
elif operation == 'search':
self.storage.search("test query", limit=10)
end_time_op = time.time()
operation_times[operation].append(end_time_op - start_time)
except Exception as e:
error_counts[operation] += 1
# Run concurrent workers
start_total = time.time()
with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [executor.submit(worker) for _ in range(concurrent_users)]
for future in futures:
future.result()
end_total = time.time()
# Calculate aggregated metrics
all_times = []
total_errors = 0
for op in operations:
all_times.extend(operation_times[op])
total_errors += error_counts[op]
return {
'throughput': total_operations / (end_total - start_total),
'avg_latency': np.mean(all_times) if all_times else 0,
'p95_latency': np.percentile(all_times, 95) if all_times else 0,
'p99_latency': np.percentile(all_times, 99) if all_times else 0,
'error_rate': total_errors / total_operations if total_operations > 0 else 0,
'total_operations': total_operations,
'operation_breakdown': {
op: {
'count': len(times),
'avg_latency': np.mean(times) if times else 0,
'error_count': error_counts[op]
}
for op, times in operation_times.items()
}
}
def find_performance_breaking_point(self, results):
"""Analyze results to find performance breaking point"""
breaking_point = None
for i, result in enumerate(results):
# Define breaking point criteria
latency_degraded = result['p95_latency'] > 1.0 # > 1 second P95
high_error_rate = result['error_rate'] > 0.05 # > 5% errors
throughput_plateau = False
if i > 0:
prev_throughput = results[i-1]['throughput']
current_throughput = result['throughput']
throughput_plateau = current_throughput < prev_throughput * 0.9 # 10% drop
if latency_degraded or high_error_rate or throughput_plateau:
breaking_point = result['concurrent_users']
break
return breaking_point
# Example usage
scalability_test = ScalabilityTest(your_storage_backend)
load_results = scalability_test.test_concurrent_load(max_concurrent_users=1000)
breaking_point = scalability_test.find_performance_breaking_point(load_results)
print(f"Performance breaking point: {breaking_point} concurrent users")
# Plot results
import matplotlib.pyplot as plt
users = [r['concurrent_users'] for r in load_results]
throughput = [r['throughput'] for r in load_results]
p95_latency = [r['p95_latency'] * 1000 for r in load_results] # Convert to ms
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
ax1.plot(users, throughput, 'b-o')
ax1.set_ylabel('Operations/sec')
ax1.set_title('Throughput vs Concurrent Users')
ax1.grid(True)
ax2.plot(users, p95_latency, 'r-o')
ax2.set_ylabel('P95 Latency (ms)')
ax2.set_xlabel('Concurrent Users')
ax2.set_title('P95 Latency vs Concurrent Users')
ax2.grid(True)
plt.tight_layout()
plt.savefig('scalability_results.png')
Storage Optimization Strategies
Index Performance Optimization
class IndexPerformanceOptimizer:
def __init__(self, database):
self.db = database
def benchmark_index_configurations(self, table_name, query_patterns):
"""Test different index configurations for optimal performance"""
index_configs = [
{'type': 'btree', 'columns': ['created_at']},
{'type': 'btree', 'columns': ['document_id']},
{'type': 'gin', 'columns': ['content_vector']}, # For JSONB/arrays
{'type': 'gist', 'columns': ['embedding']}, # For vector similarity
{'type': 'composite', 'columns': ['user_id', 'created_at']}
]
results = {}
for config in index_configs:
index_name = f"test_idx_{config['type']}_{'_'.join(config['columns'])}"
try:
# Create index
self._create_index(index_name, table_name, config)
# Benchmark queries with this index
perf_metrics = self._benchmark_queries_with_index(query_patterns, index_name)
results[index_name] = perf_metrics
# Clean up
self._drop_index(index_name)
except Exception as e:
print(f"Failed to test index {index_name}: {e}")
return results
def _create_index(self, index_name, table_name, config):
"""Create index based on configuration"""
columns_str = ', '.join(config['columns'])
if config['type'] == 'btree':
sql = f"CREATE INDEX {index_name} ON {table_name} ({columns_str})"
elif config['type'] == 'gin':
sql = f"CREATE INDEX {index_name} ON {table_name} USING GIN ({columns_str})"
elif config['type'] == 'gist':
sql = f"CREATE INDEX {index_name} ON {table_name} USING GIST ({columns_str})"
        elif config['type'] == 'composite':
            sql = f"CREATE INDEX {index_name} ON {table_name} ({columns_str})"
        else:
            raise ValueError(f"Unsupported index type: {config['type']}")
        self.db.execute(sql)
def _benchmark_queries_with_index(self, query_patterns, index_name):
"""Benchmark query performance with given index"""
query_times = []
for query in query_patterns:
# Warm up
for _ in range(3):
self.db.execute(query)
# Measure
start_time = time.time()
for _ in range(10): # Multiple iterations for stable measurement
self.db.execute(query)
end_time = time.time()
avg_time = (end_time - start_time) / 10
query_times.append(avg_time)
return {
'avg_query_time': np.mean(query_times),
'p95_query_time': np.percentile(query_times, 95),
'index_name': index_name
}
def _drop_index(self, index_name):
"""Drop test index"""
try:
self.db.execute(f"DROP INDEX {index_name}")
except Exception:
pass # Index might not exist
# Example usage
optimizer = IndexPerformanceOptimizer(your_database)
test_queries = [
"SELECT * FROM documents WHERE created_at > '2024-01-01'",
"SELECT * FROM documents WHERE document_id = 'doc_123'",
"SELECT * FROM documents WHERE user_id = 'user_456' ORDER BY created_at DESC LIMIT 10"
]
index_results = optimizer.benchmark_index_configurations('documents', test_queries)
for index_name, metrics in index_results.items():
print(f"{index_name}: Avg Query Time: {metrics['avg_query_time']*1000:.2f}ms")Caching Strategy Evaluation
class CachePerformanceBenchmark:
def __init__(self, cache_systems):
self.cache_systems = cache_systems # Dict of cache name -> cache instance
def benchmark_cache_strategies(self, workload_patterns):
"""
Benchmark different caching strategies
workload_patterns: Dict with patterns like:
{
'read_heavy': {'reads': 0.8, 'writes': 0.2},
'write_heavy': {'reads': 0.3, 'writes': 0.7},
'mixed': {'reads': 0.5, 'writes': 0.5}
}
"""
results = {}
for pattern_name, pattern in workload_patterns.items():
print(f"Testing workload pattern: {pattern_name}")
pattern_results = {}
for cache_name, cache in self.cache_systems.items():
print(f" Testing cache: {cache_name}")
# Reset cache state
cache.clear()
# Generate workload
operations = self._generate_workload(pattern, num_operations=10000)
# Benchmark
metrics = self._run_cache_benchmark(cache, operations)
pattern_results[cache_name] = metrics
results[pattern_name] = pattern_results
return results
def _generate_workload(self, pattern, num_operations):
"""Generate realistic workload operations"""
operations = []
# Create pool of keys with Zipf distribution (realistic access pattern)
num_unique_keys = 1000
zipf_param = 1.2 # Higher values = more skewed towards popular keys
key_weights = [1.0 / (i ** zipf_param) for i in range(1, num_unique_keys + 1)]
key_weights = np.array(key_weights) / sum(key_weights)
for _ in range(num_operations):
# Choose operation type based on pattern
if np.random.random() < pattern['reads']:
operation_type = 'read'
else:
operation_type = 'write'
# Choose key with Zipf distribution
key_idx = np.random.choice(num_unique_keys, p=key_weights)
key = f"key_{key_idx}"
operations.append({
'type': operation_type,
'key': key,
'value': f"value_for_{key}" if operation_type == 'write' else None
})
return operations
def _run_cache_benchmark(self, cache, operations):
"""Run benchmark operations on cache system"""
start_time = time.time()
hit_count = 0
miss_count = 0
write_count = 0
operation_times = []
for op in operations:
op_start = time.time()
if op['type'] == 'read':
result = cache.get(op['key'])
if result is not None:
hit_count += 1
else:
miss_count += 1
else: # write
cache.set(op['key'], op['value'])
write_count += 1
op_end = time.time()
operation_times.append(op_end - op_start)
end_time = time.time()
total_reads = hit_count + miss_count
hit_rate = hit_count / total_reads if total_reads > 0 else 0
return {
'total_time': end_time - start_time,
'operations_per_second': len(operations) / (end_time - start_time),
'hit_rate': hit_rate,
'miss_rate': 1 - hit_rate,
'avg_operation_latency': np.mean(operation_times),
'p95_operation_latency': np.percentile(operation_times, 95),
'total_operations': len(operations),
'reads': total_reads,
'writes': write_count
}
# Example usage with different cache systems
cache_systems = {
'redis': redis_cache_instance,
'memcached': memcached_cache_instance,
'in_memory': in_memory_cache_instance
}
workload_patterns = {
'read_heavy': {'reads': 0.8, 'writes': 0.2},
'write_heavy': {'reads': 0.3, 'writes': 0.7},
'balanced': {'reads': 0.5, 'writes': 0.5}
}
cache_benchmark = CachePerformanceBenchmark(cache_systems)
cache_results = cache_benchmark.benchmark_cache_strategies(workload_patterns)
# Analyze results
for pattern, systems in cache_results.items():
print(f"\n{pattern.upper()} WORKLOAD:")
for system, metrics in systems.items():
print(f" {system}: Hit Rate: {metrics['hit_rate']:.3f}, "
f"OPS: {metrics['operations_per_second']:.0f}, "
f"P95 Latency: {metrics['p95_operation_latency']*1000:.2f}ms")Production Monitoring
Real-Time Performance Metrics
class ProductionPerformanceMonitor:
def __init__(self, storage_backend):
self.storage = storage_backend
self.metrics_buffer = []
self.alert_thresholds = {
'write_latency_p95': 1.0, # seconds
'read_latency_p95': 0.5, # seconds
'error_rate': 0.05, # 5%
'throughput_drop': 0.3 # 30% drop from baseline
}
def start_monitoring(self, collection_interval=60):
"""Start continuous performance monitoring"""
def collect_metrics():
while True:
metrics = self._collect_current_metrics()
self.metrics_buffer.append(metrics)
                # Keep only recent samples (roughly the last 24 hours at the configured interval)
                max_samples = int(24 * 3600 // collection_interval)
                if len(self.metrics_buffer) > max_samples:
                    self.metrics_buffer.pop(0)
# Check for alerts
self._check_alerts(metrics)
time.sleep(collection_interval)
monitoring_thread = threading.Thread(target=collect_metrics, daemon=True)
monitoring_thread.start()
def _collect_current_metrics(self):
"""Collect current performance metrics"""
# This would integrate with your storage system's metrics
# Here's a simplified example
timestamp = time.time()
# Sample metrics collection (adapt to your storage system)
metrics = {
'timestamp': timestamp,
'write_ops_per_second': self._get_write_ops_rate(),
'read_ops_per_second': self._get_read_ops_rate(),
'write_latency_avg': self._get_avg_write_latency(),
'write_latency_p95': self._get_p95_write_latency(),
'read_latency_avg': self._get_avg_read_latency(),
'read_latency_p95': self._get_p95_read_latency(),
'error_rate': self._get_current_error_rate(),
'active_connections': self._get_active_connections(),
'memory_usage_percent': self._get_memory_usage(),
'disk_usage_percent': self._get_disk_usage()
}
return metrics
def _check_alerts(self, current_metrics):
"""Check metrics against alert thresholds"""
alerts = []
if current_metrics['write_latency_p95'] > self.alert_thresholds['write_latency_p95']:
alerts.append(f"High write latency: {current_metrics['write_latency_p95']:.3f}s")
if current_metrics['read_latency_p95'] > self.alert_thresholds['read_latency_p95']:
alerts.append(f"High read latency: {current_metrics['read_latency_p95']:.3f}s")
if current_metrics['error_rate'] > self.alert_thresholds['error_rate']:
alerts.append(f"High error rate: {current_metrics['error_rate']:.3%}")
# Check for throughput drops
if len(self.metrics_buffer) > 10:
recent_throughput = current_metrics['read_ops_per_second'] + current_metrics['write_ops_per_second']
            baseline_metrics = self.metrics_buffer[-11:-1]  # previous 10 samples
baseline_throughput = np.mean([
m['read_ops_per_second'] + m['write_ops_per_second']
for m in baseline_metrics
])
if baseline_throughput > 0:
throughput_ratio = recent_throughput / baseline_throughput
if throughput_ratio < (1 - self.alert_thresholds['throughput_drop']):
alerts.append(f"Throughput drop: {throughput_ratio:.2%} of baseline")
if alerts:
self._send_alerts(alerts)
def _send_alerts(self, alerts):
"""Send performance alerts"""
for alert in alerts:
print(f"ALERT: {alert}")
# Integrate with your alerting system (PagerDuty, Slack, etc.)
def get_performance_report(self, hours=24):
"""Generate performance report for specified time period"""
if not self.metrics_buffer:
return "No metrics available"
cutoff_time = time.time() - (hours * 3600)
recent_metrics = [m for m in self.metrics_buffer if m['timestamp'] > cutoff_time]
if not recent_metrics:
return f"No metrics available for the last {hours} hours"
# Calculate aggregated statistics
write_latencies = [m['write_latency_p95'] for m in recent_metrics]
read_latencies = [m['read_latency_p95'] for m in recent_metrics]
error_rates = [m['error_rate'] for m in recent_metrics]
throughputs = [m['read_ops_per_second'] + m['write_ops_per_second'] for m in recent_metrics]
report = f"""
Performance Report - Last {hours} Hours
=====================================
Latency Metrics:
- Write P95 Latency: Avg {np.mean(write_latencies)*1000:.1f}ms, Max {np.max(write_latencies)*1000:.1f}ms
- Read P95 Latency: Avg {np.mean(read_latencies)*1000:.1f}ms, Max {np.max(read_latencies)*1000:.1f}ms
Throughput Metrics:
- Average Throughput: {np.mean(throughputs):.1f} ops/sec
- Peak Throughput: {np.max(throughputs):.1f} ops/sec
- Minimum Throughput: {np.min(throughputs):.1f} ops/sec
Reliability Metrics:
- Average Error Rate: {np.mean(error_rates):.3%}
- Peak Error Rate: {np.max(error_rates):.3%}
Metric samples collected: {len(recent_metrics)}
"""
return report.strip()
# Usage
monitor = ProductionPerformanceMonitor(your_storage_backend)
monitor.start_monitoring(collection_interval=60) # Collect metrics every minute
# Later, generate a report
print(monitor.get_performance_report(hours=6))
Best Practices
Performance Testing Guidelines
- Baseline Establishment: Always establish performance baselines before optimizations
- Realistic Data: Use production-like data volumes and distributions
- Environment Consistency: Test in environments similar to production
- Warmup Periods: Allow for JIT compilation and cache warming
- Statistical Significance: Run multiple iterations and calculate confidence intervals
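To make the warmup and statistical-significance points concrete, here is a minimal sketch of a repeatable measurement harness: it runs a warmup pass, repeats the benchmark several times, and reports the mean with an approximate 95% confidence interval. `run_benchmark_once` is a placeholder for any benchmark callable from this guide that returns a single throughput number, and the 1.96 factor is a normal approximation (a t-distribution is more accurate for very small sample counts).
import numpy as np

def summarize_benchmark(run_benchmark_once, iterations=5, warmup_runs=1):
    """Repeat a benchmark and report mean throughput with an approximate 95% CI."""
    for _ in range(warmup_runs):
        run_benchmark_once()  # warm caches / connection pools before measuring
    samples = np.array([run_benchmark_once() for _ in range(iterations)], dtype=float)
    mean = samples.mean()
    # Normal-approximation 95% confidence interval (use a t-distribution for very small n)
    half_width = 1.96 * samples.std(ddof=1) / np.sqrt(len(samples))
    return {'mean': mean, 'ci_95': (mean - half_width, mean + half_width), 'samples': samples.tolist()}

# Hypothetical usage: wrap the write benchmark so each run returns writes/sec
# summary = summarize_benchmark(
#     lambda: benchmark.benchmark_write_throughput(test_docs, duration_seconds=30)['writes_per_second']
# )
# print(f"Writes/sec: {summary['mean']:.1f} +/- {summary['ci_95'][1] - summary['mean']:.1f}")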
Optimization Strategies
- Index Strategy: Create indexes based on actual query patterns
- Batch Operations: Batch small operations to reduce overhead (see the batching sketch after this list)
- Connection Pooling: Optimize connection pool sizes for your workload
- Async Operations: Use asynchronous operations where possible
- Caching Layers: Implement multi-level caching strategies
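As a sketch of the batch-operations point above, the helper below groups single-document writes into fixed-size batches. The bulk `store_documents(batch)` method is hypothetical; when the backend only exposes the per-document `store_document` call used throughout this guide, the helper falls back to that.
def write_in_batches(storage_backend, documents, batch_size=500):
    """Write documents in fixed-size batches to amortize per-call overhead."""
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        if hasattr(storage_backend, 'store_documents'):
            storage_backend.store_documents(batch)  # hypothetical bulk API
        else:
            for doc in batch:
                storage_backend.store_document(doc)  # per-document fallback
Comparing the batched and per-document paths with StorageThroughputBenchmark is a quick way to quantify how much overhead batching actually removes for a given backend.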
Monitoring and Alerting
- Key Metrics: Monitor latency percentiles, throughput, and error rates
- Trending: Track performance trends over time to identify degradation (see the trend sketch after this list)
- Capacity Planning: Monitor resource utilization for capacity planning
- Alert Fatigue: Set appropriate alert thresholds to avoid noise
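One lightweight way to implement the trending point, assuming one P95 latency sample per collection interval (for example, drawn from ProductionPerformanceMonitor.metrics_buffer above), is to fit a least-squares slope over recent samples and flag sustained upward drift. The slope threshold below is illustrative, not a recommendation.
import numpy as np

def latency_trend(latency_samples, slope_threshold=0.005):
    """Return the per-interval latency slope and whether it suggests degradation."""
    if len(latency_samples) < 2:
        return {'slope': 0.0, 'degrading': False}
    x = np.arange(len(latency_samples))
    slope, _intercept = np.polyfit(x, np.asarray(latency_samples, dtype=float), 1)  # linear fit
    return {'slope': slope, 'degrading': slope > slope_threshold}

# Example: check the last hour of P95 read latency samples (one per minute)
# recent = [m['read_latency_p95'] for m in monitor.metrics_buffer[-60:]]
# print(latency_trend(recent))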
This comprehensive storage performance benchmarking guide provides the tools and methodologies needed to measure, optimize, and maintain high-performance agent memory systems at scale.