Production Patterns for Agent Memory
Building production-ready agent memory systems requires careful attention to scalability, reliability, monitoring, and operational excellence. This guide provides battle-tested patterns and implementations for enterprise deployments.
Scalability Patterns
Horizontal Scaling Architecture
import asyncio
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import hashlib
import redis.asyncio as redis  # async client; the Redis calls below are awaited
import logging
import json
import time
from datetime import datetime, timedelta
class ShardingStrategy(Enum):
USER_BASED = "user_based"
CONTENT_HASH = "content_hash"
TEMPORAL = "temporal"
MEMORY_TYPE = "memory_type"
@dataclass
class ShardConfig:
shard_id: str
database_config: Dict[str, Any]
weight: int = 1
status: str = "active" # active, readonly, maintenance
class MemoryShardManager:
"""Manages memory distribution across multiple database shards"""
def __init__(self, shard_configs: List[ShardConfig],
sharding_strategy: ShardingStrategy = ShardingStrategy.USER_BASED,
redis_client: Optional[redis.Redis] = None):
self.shard_configs = {config.shard_id: config for config in shard_configs}
self.sharding_strategy = sharding_strategy
self.redis_client = redis_client or redis.Redis()
self.shard_stores: Dict[str, Any] = {}
self._initialize_shards()
def _initialize_shards(self):
"""Initialize connections to all shards"""
for shard_id, config in self.shard_configs.items():
# Initialize database connection based on config
# This would use your chosen database implementation
self.shard_stores[shard_id] = self._create_store_instance(config)
def _create_store_instance(self, config: ShardConfig):
"""Create database store instance from config"""
# Factory method to create appropriate store type
db_type = config.database_config.get('type', 'postgresql')
if db_type == 'postgresql':
from .database_implementations import PostgreSQLMemoryStore
return PostgreSQLMemoryStore(config.database_config['connection_string'])
elif db_type == 'mongodb':
from .database_implementations import MongoMemoryStore
return MongoMemoryStore(config.database_config['connection_string'])
# Add other database types as needed
raise ValueError(f"Unsupported database type: {db_type}")
def get_shard_for_memory(self, memory_data: Dict[str, Any]) -> str:
"""Determine which shard should store this memory"""
if self.sharding_strategy == ShardingStrategy.USER_BASED:
user_id = memory_data.get('metadata', {}).get('user_id', 'unknown')
return self._hash_to_shard(user_id)
elif self.sharding_strategy == ShardingStrategy.CONTENT_HASH:
content = memory_data.get('content', '')
return self._hash_to_shard(content)
elif self.sharding_strategy == ShardingStrategy.TEMPORAL:
timestamp = memory_data.get('timestamp', datetime.now())
# Shard by month/year
period_key = timestamp.strftime('%Y-%m')
return self._hash_to_shard(period_key)
elif self.sharding_strategy == ShardingStrategy.MEMORY_TYPE:
memory_type = memory_data.get('memory_type', 'episodic')
return self._hash_to_shard(memory_type)
else:
# Default to first active shard
return self._get_active_shards()[0]
def _hash_to_shard(self, key: str) -> str:
"""Hash key to determine shard"""
active_shards = self._get_active_shards()
if not active_shards:
raise Exception("No active shards available")
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
shard_index = hash_value % len(active_shards)
return active_shards[shard_index]
def _get_active_shards(self) -> List[str]:
"""Get list of active shard IDs"""
return [
shard_id for shard_id, config in self.shard_configs.items()
if config.status == "active"
]
async def store_memory(self, memory_data: Dict[str, Any]) -> Tuple[str, str]:
"""Store memory in appropriate shard"""
shard_id = self.get_shard_for_memory(memory_data)
store = self.shard_stores[shard_id]
memory_id = await store.store_memory(**memory_data)
# Cache shard location for quick retrieval
await self._cache_memory_location(memory_id, shard_id)
return memory_id, shard_id
async def retrieve_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve memory from appropriate shard"""
shard_id = await self._get_memory_location(memory_id)
if not shard_id:
# Fallback: search all shards
return await self._search_all_shards_for_memory(memory_id)
store = self.shard_stores.get(shard_id)
if not store:
return None
return await store.retrieve_memory(memory_id)
async def search_memories(self, query: str, user_id: Optional[str] = None,
**kwargs) -> List[Dict[str, Any]]:
"""Search memories across relevant shards"""
if user_id and self.sharding_strategy == ShardingStrategy.USER_BASED:
# Search only user's shard
shard_id = self._hash_to_shard(user_id)
store = self.shard_stores[shard_id]
return await store.search_memories(query, **kwargs)
else:
# Search across all active shards and merge results
return await self._search_across_shards(query, **kwargs)
async def _cache_memory_location(self, memory_id: str, shard_id: str):
"""Cache memory location in Redis for fast retrieval"""
try:
await self.redis_client.setex(
f"memory_location:{memory_id}",
timedelta(hours=24),  # redis-py accepts a timedelta expiry directly; avoids passing a float
shard_id
)
except Exception as e:
logging.warning(f"Failed to cache memory location: {e}")
async def _get_memory_location(self, memory_id: str) -> Optional[str]:
"""Get cached memory location"""
try:
shard_id = await self.redis_client.get(f"memory_location:{memory_id}")
return shard_id.decode() if shard_id else None
except Exception:
return None
async def _search_all_shards_for_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
"""Search all shards for a memory (expensive fallback)"""
tasks = []
for shard_id in self._get_active_shards():
store = self.shard_stores[shard_id]
task = asyncio.create_task(store.retrieve_memory(memory_id))
tasks.append((shard_id, task))
results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True)
for (shard_id, _), result in zip(tasks, results):
if isinstance(result, Exception):
continue
if result is not None:
# Cache the found location
await self._cache_memory_location(memory_id, shard_id)
return result
return None
async def _search_across_shards(self, query: str, **kwargs) -> List[Dict[str, Any]]:
"""Search across all shards and merge results"""
limit = kwargs.pop('limit', 10)
shard_limit = max(limit // max(len(self._get_active_shards()), 1) + 1, 5)  # guard against an empty shard list
tasks = []
for shard_id in self._get_active_shards():
store = self.shard_stores[shard_id]
task = asyncio.create_task(
store.search_memories(query, limit=shard_limit, **kwargs)
)
tasks.append(task)
shard_results = await asyncio.gather(*tasks, return_exceptions=True)
# Merge and sort results
all_results = []
for results in shard_results:
if isinstance(results, Exception):
continue
all_results.extend(results)
# Sort by relevance/importance and limit
all_results.sort(key=lambda x: x.get('score', 0), reverse=True)
return all_results[:limit]
class LoadBalancedMemoryCluster:
"""Load-balanced cluster with read replicas and write distribution"""
def __init__(self, primary_shards: List[ShardConfig],
read_replicas: Dict[str, List[ShardConfig]]):
self.primary_shards = {config.shard_id: config for config in primary_shards}
self.read_replicas = read_replicas
self.shard_manager = MemoryShardManager(primary_shards)
self.replica_managers: Dict[str, MemoryShardManager] = {}
# Initialize replica managers
for shard_id, replica_configs in read_replicas.items():
self.replica_managers[shard_id] = MemoryShardManager(replica_configs)
async def store_memory(self, memory_data: Dict[str, Any]) -> str:
"""Store memory in primary shard"""
memory_id, shard_id = await self.shard_manager.store_memory(memory_data)
# Asynchronously replicate to read replicas
asyncio.create_task(self._replicate_to_replicas(memory_data, shard_id))
return memory_id
async def search_memories(self, query: str, **kwargs) -> List[Dict[str, Any]]:
"""Search using read replicas for better performance"""
# Choose replica based on load balancing
replica_manager = self._choose_replica_manager()
if replica_manager:
try:
return await replica_manager.search_memories(query, **kwargs)
except Exception as e:
logging.warning(f"Replica search failed, falling back to primary: {e}")
# Fallback to primary
return await self.shard_manager.search_memories(query, **kwargs)
def _choose_replica_manager(self) -> Optional[MemoryShardManager]:
"""Choose replica manager based on load balancing"""
# Simple round-robin or random selection
if not self.replica_managers:
return None
# For production, implement proper load balancing
import random
return random.choice(list(self.replica_managers.values()))
async def _replicate_to_replicas(self, memory_data: Dict[str, Any], shard_id: str):
"""Replicate memory to read replicas"""
replica_manager = self.replica_managers.get(shard_id)
if not replica_manager:
return
try:
await replica_manager.store_memory(memory_data)
except Exception as e:
logging.error(f"Replication failed for shard {shard_id}: {e}")
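A minimal wiring sketch for the shard manager is shown below; the shard count, connection strings, and memory fields are illustrative assumptions rather than a required schema.

shard_configs = [
    ShardConfig(
        shard_id=f"shard_{i}",
        database_config={
            'type': 'postgresql',
            # hypothetical DSNs; replace with real connection strings
            'connection_string': f"postgresql://memory:secret@db-{i}.internal/memories"
        }
    )
    for i in range(4)
]

shard_manager = MemoryShardManager(shard_configs, sharding_strategy=ShardingStrategy.USER_BASED)

async def example_usage():
    # Memories for the same user hash to the same shard under USER_BASED sharding
    memory_id, shard_id = await shard_manager.store_memory({
        'content': 'User prefers concise answers',
        'memory_type': 'semantic',
        'metadata': {'user_id': 'user-123'}
    })
    return await shard_manager.search_memories('preferences', user_id='user-123', limit=5)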
Caching Layer Implementation
from typing import Optional, Dict, List, Any, Tuple
import pickle
import zlib
import json
import logging
import hashlib
from datetime import datetime, timedelta
import redis.asyncio as redis  # async client; the awaited Redis calls below require it
class MemoryCacheLayer:
"""Multi-level caching for memory operations"""
def __init__(self, redis_client: redis.Redis,
local_cache_size: int = 1000,
default_ttl: int = 3600):
self.redis_client = redis_client
self.local_cache: Dict[str, Tuple[Any, datetime]] = {}
self.local_cache_size = local_cache_size
self.default_ttl = default_ttl
self.stats = {
'local_hits': 0,
'redis_hits': 0,
'misses': 0,
'stores': 0
}
async def get_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
"""Get memory with multi-level caching"""
# Level 1: Local cache
if memory_id in self.local_cache:
data, timestamp = self.local_cache[memory_id]
if datetime.now() - timestamp < timedelta(seconds=300): # 5 min TTL
self.stats['local_hits'] += 1
return data
# Level 2: Redis cache
try:
cached_data = await self.redis_client.get(f"memory:{memory_id}")
if cached_data:
# Decompress and deserialize
decompressed = zlib.decompress(cached_data)
memory_data = pickle.loads(decompressed)
# Update local cache
self._update_local_cache(memory_id, memory_data)
self.stats['redis_hits'] += 1
return memory_data
except Exception as e:
logging.warning(f"Redis cache error: {e}")
self.stats['misses'] += 1
return None
async def store_memory(self, memory_id: str, memory_data: Dict[str, Any],
ttl: Optional[int] = None):
"""Store memory in cache with compression"""
# Store in local cache
self._update_local_cache(memory_id, memory_data)
# Store in Redis with compression
try:
# Serialize and compress
serialized = pickle.dumps(memory_data)
compressed = zlib.compress(serialized, level=6)
ttl = ttl or self.default_ttl
await self.redis_client.setex(
f"memory:{memory_id}",
ttl,
compressed
)
self.stats['stores'] += 1
except Exception as e:
logging.error(f"Failed to cache memory {memory_id}: {e}")
async def store_search_results(self, query_hash: str,
results: List[Dict[str, Any]],
ttl: int = 300):
"""Cache search results"""
try:
serialized = pickle.dumps(results)
compressed = zlib.compress(serialized)
await self.redis_client.setex(
f"search:{query_hash}",
ttl,
compressed
)
except Exception as e:
logging.error(f"Failed to cache search results: {e}")
async def get_search_results(self, query_hash: str) -> Optional[List[Dict[str, Any]]]:
"""Get cached search results"""
try:
cached_data = await self.redis_client.get(f"search:{query_hash}")
if cached_data:
decompressed = zlib.decompress(cached_data)
return pickle.loads(decompressed)
except Exception as e:
logging.warning(f"Search cache error: {e}")
return None
def _update_local_cache(self, memory_id: str, memory_data: Dict[str, Any]):
"""Update local cache with LRU eviction"""
# Remove oldest entries if cache is full
if len(self.local_cache) >= self.local_cache_size:
# Simple LRU: remove oldest entry
oldest_key = min(
self.local_cache.keys(),
key=lambda k: self.local_cache[k][1]
)
del self.local_cache[oldest_key]
self.local_cache[memory_id] = (memory_data, datetime.now())
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics"""
total_requests = sum([
self.stats['local_hits'],
self.stats['redis_hits'],
self.stats['misses']
])
return {
**self.stats,
'local_hit_rate': self.stats['local_hits'] / max(total_requests, 1),
'redis_hit_rate': self.stats['redis_hits'] / max(total_requests, 1),
'overall_hit_rate': (self.stats['local_hits'] + self.stats['redis_hits']) / max(total_requests, 1),
'local_cache_size': len(self.local_cache)
}
class SearchQueryOptimizer:
"""Optimize and cache search queries"""
def __init__(self, redis_client: redis.Redis):
self.redis_client = redis_client
def generate_query_hash(self, query: str, filters: Dict[str, Any]) -> str:
"""Generate deterministic hash for query and filters"""
query_data = {
'query': query.lower().strip(),
'filters': filters
}
query_string = json.dumps(query_data, sort_keys=True)
return hashlib.md5(query_string.encode()).hexdigest()
async def optimize_search_query(self, query: str) -> str:
"""Optimize search query using common patterns"""
# Check for cached optimizations
query_hash = hashlib.md5(query.encode()).hexdigest()
cached_optimization = await self.redis_client.get(f"query_opt:{query_hash}")
if cached_optimization:
return cached_optimization.decode()
# Simple optimizations
optimized = query.strip().lower()
# Remove common stop words for better semantic search
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
words = optimized.split()
optimized_words = [w for w in words if w not in stop_words]
optimized = ' '.join(optimized_words)
# Cache the optimization
await self.redis_client.setex(f"query_opt:{query_hash}", 3600, optimized)
return optimized
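A sketch of how the cache layer and query optimizer can be combined in front of any store that exposes search_memories; the store object itself is assumed.

cache = MemoryCacheLayer(redis_client=redis.Redis(), default_ttl=3600)
optimizer = SearchQueryOptimizer(cache.redis_client)

async def cached_search(store, query: str, **filters):
    # Normalize the query, then consult the search-result cache before hitting the store
    optimized = await optimizer.optimize_search_query(query)
    query_hash = optimizer.generate_query_hash(optimized, filters)
    cached = await cache.get_search_results(query_hash)
    if cached is not None:
        return cached
    results = await store.search_memories(optimized, **filters)
    await cache.store_search_results(query_hash, results)
    return results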
Reliability and Resilience
Circuit Breaker Pattern
from typing import Callable, Any, Optional, Dict, List
from enum import Enum
from datetime import datetime, timedelta
import asyncio
import logging
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreakerConfig:
def __init__(self, failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: tuple = (Exception,)):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
class CircuitBreaker:
"""Circuit breaker for memory operations"""
def __init__(self, name: str, config: CircuitBreakerConfig):
self.name = name
self.config = config
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time: Optional[datetime] = None
self.next_attempt_time: Optional[datetime] = None
self.logger = logging.getLogger(f"CircuitBreaker.{name}")
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
self.logger.info(f"Circuit breaker {self.name} transitioning to HALF_OPEN")
else:
raise Exception(f"Circuit breaker {self.name} is OPEN")
try:
result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
self._on_success()
return result
except self.config.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt reset"""
if self.next_attempt_time is None:
return True
return datetime.now() >= self.next_attempt_time
def _on_success(self):
"""Handle successful operation"""
if self.state == CircuitState.HALF_OPEN:
self.logger.info(f"Circuit breaker {self.name} recovered, transitioning to CLOSED")
self.failure_count = 0
self.last_failure_time = None
self.next_attempt_time = None
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failed operation"""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.config.failure_threshold:
self.state = CircuitState.OPEN
self.next_attempt_time = datetime.now() + timedelta(seconds=self.config.recovery_timeout)
self.logger.warning(f"Circuit breaker {self.name} opened due to {self.failure_count} failures")
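# Illustrative usage sketch: wrap any flaky coroutine with a breaker. The
# fetch_profile coroutine below is a hypothetical dependency, not part of this module.
profile_breaker = CircuitBreaker(
    'profile_lookup',
    CircuitBreakerConfig(failure_threshold=3, recovery_timeout=30)
)

async def get_profile_safely(fetch_profile: Callable, user_id: str) -> Optional[Dict[str, Any]]:
    try:
        return await profile_breaker.call(fetch_profile, user_id)
    except Exception:
        # Breaker is OPEN or the underlying call failed; degrade to a cached/default value
        return None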
class ResilientMemoryManager:
"""Memory manager with circuit breaker protection"""
def __init__(self, memory_store, cache_layer: Optional[MemoryCacheLayer] = None):
self.memory_store = memory_store
self.cache_layer = cache_layer
# Circuit breakers for different operations
self.circuit_breakers = {
'store': CircuitBreaker('memory_store', CircuitBreakerConfig(failure_threshold=3)),
'retrieve': CircuitBreaker('memory_retrieve', CircuitBreakerConfig(failure_threshold=5)),
'search': CircuitBreaker('memory_search', CircuitBreakerConfig(failure_threshold=5)),
}
async def store_memory(self, memory_data: Dict[str, Any]) -> str:
"""Store memory with circuit breaker protection"""
async def _store_operation():
memory_id = await self.memory_store.store_memory(**memory_data)
# Update cache asynchronously
if self.cache_layer:
asyncio.create_task(
self.cache_layer.store_memory(memory_id, memory_data)
)
return memory_id
return await self.circuit_breakers['store'].call(_store_operation)
async def retrieve_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve memory with caching and fallback"""
# Try cache first
if self.cache_layer:
cached_result = await self.cache_layer.get_memory(memory_id)
if cached_result:
return cached_result
# Try database with circuit breaker
async def _retrieve_operation():
result = await self.memory_store.retrieve_memory(memory_id)
# Cache the result
if result and self.cache_layer:
asyncio.create_task(
self.cache_layer.store_memory(memory_id, result)
)
return result
try:
return await self.circuit_breakers['retrieve'].call(_retrieve_operation)
except Exception as e:
logging.error(f"Failed to retrieve memory {memory_id}: {e}")
return None
async def search_memories(self, query: str, **kwargs) -> List[Dict[str, Any]]:
"""Search memories with caching and circuit breaker"""
# Generate cache key
if self.cache_layer:
optimizer = SearchQueryOptimizer(self.cache_layer.redis_client)
query_hash = optimizer.generate_query_hash(query, kwargs)
# Try cache first
cached_results = await self.cache_layer.get_search_results(query_hash)
if cached_results:
return cached_results
# Try database with circuit breaker
async def _search_operation():
results = await self.memory_store.search_memories(query, **kwargs)
# Cache the results
if self.cache_layer:
asyncio.create_task(
self.cache_layer.store_search_results(query_hash, results)
)
return results
try:
return await self.circuit_breakers['search'].call(_search_operation)
except Exception as e:
logging.error(f"Search failed for query '{query}': {e}")
return []
def get_health_status(self) -> Dict[str, Any]:
"""Get health status of circuit breakers"""
return {
name: {
'state': breaker.state.value,
'failure_count': breaker.failure_count,
'last_failure': breaker.last_failure_time.isoformat() if breaker.last_failure_time else None
}
for name, breaker in self.circuit_breakers.items()
}
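A usage sketch for the resilient manager, assuming postgres_store is a concrete store from the database layer and reusing the async Redis client and cache layer from the caching section.

manager = ResilientMemoryManager(
    memory_store=postgres_store,  # placeholder for a concrete store implementation
    cache_layer=MemoryCacheLayer(redis_client=redis.Redis())
)

async def handle_user_turn(user_id: str, message: str):
    # Recall relevant context first, then persist the new observation; both calls
    # degrade gracefully (empty results / logged error) when a breaker is open.
    context = await manager.search_memories(message, limit=5)
    await manager.store_memory({
        'content': message,
        'memory_type': 'episodic',
        'metadata': {'user_id': user_id}
    })
    return context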
Data Backup and Recovery
import asyncio
import json
import gzip
from typing import AsyncGenerator, Dict, Any, List, Optional
from datetime import datetime, timedelta
import logging
import boto3
from botocore.exceptions import ClientError
class MemoryBackupManager:
"""Manage memory data backups and recovery"""
def __init__(self, memory_store,
s3_bucket: Optional[str] = None,
backup_interval_hours: int = 24,
retention_days: int = 30):
self.memory_store = memory_store
self.s3_bucket = s3_bucket
self.backup_interval = timedelta(hours=backup_interval_hours)
self.retention_days = retention_days
self.logger = logging.getLogger(__name__)
# Initialize S3 client if bucket specified
self.s3_client = boto3.client('s3') if s3_bucket else None
async def create_backup(self, backup_name: Optional[str] = None) -> str:
"""Create a full backup of memory data"""
if not backup_name:
backup_name = f"memory_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_data = {
'metadata': {
'backup_name': backup_name,
'created_at': datetime.now().isoformat(),
'total_memories': 0,
'backup_version': '1.0'
},
'memories': []
}
total_memories = 0
# Stream memories for large datasets
async for batch in self._get_memory_batches():
backup_data['memories'].extend(batch)
total_memories += len(batch)
# Log progress for large backups
if total_memories % 10000 == 0:
self.logger.info(f"Backup progress: {total_memories} memories processed")
backup_data['metadata']['total_memories'] = total_memories
# Compress and save backup
backup_content = json.dumps(backup_data, default=str)
compressed_content = gzip.compress(backup_content.encode('utf-8'))
# Save locally
local_path = f"backups/{backup_name}.json.gz"
await self._save_backup_locally(local_path, compressed_content)
# Save to S3 if configured
if self.s3_client:
await self._save_backup_to_s3(backup_name, compressed_content)
self.logger.info(f"Backup {backup_name} completed: {total_memories} memories")
return backup_name
async def restore_backup(self, backup_name: str,
target_store=None,
dry_run: bool = True) -> Dict[str, Any]:
"""Restore from backup"""
target = target_store or self.memory_store
# Load backup data
backup_data = await self._load_backup(backup_name)
if not backup_data:
raise Exception(f"Backup {backup_name} not found")
restoration_stats = {
'total_memories': len(backup_data['memories']),
'restored_memories': 0,
'failed_memories': 0,
'errors': []
}
if dry_run:
self.logger.info(f"DRY RUN: Would restore {restoration_stats['total_memories']} memories")
return restoration_stats
# Restore memories in batches
batch_size = 100
for i in range(0, len(backup_data['memories']), batch_size):
batch = backup_data['memories'][i:i + batch_size]
batch_results = await self._restore_memory_batch(batch, target)
restoration_stats['restored_memories'] += batch_results['successful']
restoration_stats['failed_memories'] += batch_results['failed']
restoration_stats['errors'].extend(batch_results['errors'])
# Progress logging
if i % 1000 == 0:
self.logger.info(f"Restoration progress: {i} memories processed")
self.logger.info(f"Restoration completed: {restoration_stats}")
return restoration_stats
async def _get_memory_batches(self, batch_size: int = 1000) -> AsyncGenerator[List[Dict], None]:
"""Stream memory data in batches"""
# This depends on your memory store implementation
# Example for PostgreSQL with offset/limit
offset = 0
while True:
if hasattr(self.memory_store, 'get_batch'):
batch = await self.memory_store.get_batch(offset=offset, limit=batch_size)
else:
# Fallback implementation
all_memories = await self.memory_store.get_all_memories()
batch = all_memories[offset:offset + batch_size]
if not batch:
break
yield batch
offset += len(batch)
async def _save_backup_locally(self, path: str, content: bytes):
"""Save backup to local filesystem"""
import os
import aiofiles
os.makedirs(os.path.dirname(path), exist_ok=True)
async with aiofiles.open(path, 'wb') as f:
await f.write(content)
async def _save_backup_to_s3(self, backup_name: str, content: bytes):
"""Save backup to S3"""
try:
s3_key = f"memory-backups/{backup_name}.json.gz"
loop = asyncio.get_running_loop()
await loop.run_in_executor(
    None,
    lambda: self.s3_client.put_object(
        Bucket=self.s3_bucket,
        Key=s3_key,
        Body=content,
        ContentType='application/gzip',
        Metadata={
            'backup_name': backup_name,
            'created_at': datetime.now().isoformat()
        }
    )
)
self.logger.info(f"Backup saved to S3: s3://{self.s3_bucket}/{s3_key}")
except ClientError as e:
self.logger.error(f"Failed to save backup to S3: {e}")
raise
async def _load_backup(self, backup_name: str) -> Optional[Dict[str, Any]]:
"""Load backup data"""
# Try S3 first if available
if self.s3_client:
try:
s3_data = await self._load_backup_from_s3(backup_name)
if s3_data:
return s3_data
except Exception as e:
self.logger.warning(f"Failed to load from S3, trying local: {e}")
# Try local backup
return await self._load_backup_locally(backup_name)
async def _load_backup_from_s3(self, backup_name: str) -> Optional[Dict[str, Any]]:
"""Load backup from S3"""
try:
s3_key = f"memory-backups/{backup_name}.json.gz"
loop = asyncio.get_running_loop()
response = await loop.run_in_executor(
    None,
    lambda: self.s3_client.get_object(Bucket=self.s3_bucket, Key=s3_key)
)
compressed_data = response['Body'].read()
json_data = gzip.decompress(compressed_data).decode('utf-8')
return json.loads(json_data)
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
return None
raise
async def _load_backup_locally(self, backup_name: str) -> Optional[Dict[str, Any]]:
"""Load backup from local filesystem"""
import aiofiles
path = f"backups/{backup_name}.json.gz"
try:
async with aiofiles.open(path, 'rb') as f:
compressed_data = await f.read()
json_data = gzip.decompress(compressed_data).decode('utf-8')
return json.loads(json_data)
except FileNotFoundError:
return None
async def _restore_memory_batch(self, batch: List[Dict], target_store) -> Dict[str, Any]:
"""Restore a batch of memories"""
results = {'successful': 0, 'failed': 0, 'errors': []}
tasks = []
for memory_data in batch:
task = asyncio.create_task(
self._restore_single_memory(memory_data, target_store)
)
tasks.append(task)
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for result in batch_results:
if isinstance(result, Exception):
results['failed'] += 1
results['errors'].append(str(result))
elif result:
results['successful'] += 1
else:
results['failed'] += 1
results['errors'].append("Unknown restore failure")
return results
async def _restore_single_memory(self, memory_data: Dict, target_store) -> bool:
"""Restore a single memory"""
try:
await target_store.store_memory(**memory_data)
return True
except Exception as e:
self.logger.error(f"Failed to restore memory {memory_data.get('id', 'unknown')}: {e}")
return False
async def cleanup_old_backups(self):
"""Remove old backups based on retention policy"""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
# Cleanup local backups
await self._cleanup_local_backups(cutoff_date)
# Cleanup S3 backups
if self.s3_client:
await self._cleanup_s3_backups(cutoff_date)
async def _cleanup_local_backups(self, cutoff_date: datetime):
"""Cleanup old local backup files"""
import os
import aiofiles.os
backup_dir = "backups"
if not os.path.exists(backup_dir):
return
for filename in os.listdir(backup_dir):
if not filename.endswith('.json.gz'):
continue
file_path = os.path.join(backup_dir, filename)
stat_result = await aiofiles.os.stat(file_path)
file_date = datetime.fromtimestamp(stat_result.st_mtime)
if file_date < cutoff_date:
await aiofiles.os.remove(file_path)
self.logger.info(f"Deleted old local backup: {filename}")
async def _cleanup_s3_backups(self, cutoff_date: datetime):
"""Cleanup old S3 backup files"""
try:
loop = asyncio.get_running_loop()
response = await loop.run_in_executor(
    None,
    lambda: self.s3_client.list_objects_v2(
        Bucket=self.s3_bucket,
        Prefix='memory-backups/'
    )
)
if 'Contents' not in response:
return
delete_keys = []
for obj in response['Contents']:
if obj['LastModified'].replace(tzinfo=None) < cutoff_date:
delete_keys.append({'Key': obj['Key']})
if delete_keys:
await loop.run_in_executor(
    None,
    lambda: self.s3_client.delete_objects(
        Bucket=self.s3_bucket,
        Delete={'Objects': delete_keys}
    )
)
self.logger.info(f"Deleted {len(delete_keys)} old S3 backups")
except ClientError as e:
self.logger.error(f"Failed to cleanup S3 backups: {e}")
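A background backup loop might be scheduled as in the sketch below; the bucket name and store are placeholders, and the loop simply reuses the manager's configured interval and retention policy.

backup_manager = MemoryBackupManager(
    memory_store=postgres_store,    # placeholder store
    s3_bucket='my-memory-backups',  # illustrative bucket name
    backup_interval_hours=24,
    retention_days=30
)

async def backup_loop():
    # Take a full backup each interval, then prune archives past the retention window
    while True:
        try:
            name = await backup_manager.create_backup()
            await backup_manager.cleanup_old_backups()
            logging.info(f"Backup cycle complete: {name}")
        except Exception as e:
            logging.error(f"Backup cycle failed: {e}")
        await asyncio.sleep(backup_manager.backup_interval.total_seconds())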
Monitoring and Observability
Comprehensive Monitoring System
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import asyncio
import json
import logging
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import psutil
import time
# Prometheus metrics
memory_operations_total = Counter('memory_operations_total', 'Total memory operations', ['operation', 'status'])
memory_operation_duration = Histogram('memory_operation_duration_seconds', 'Memory operation duration', ['operation'])
memory_store_size = Gauge('memory_store_size_bytes', 'Total memory store size')
active_memory_connections = Gauge('active_memory_connections', 'Active database connections')
memory_cache_hit_rate = Gauge('memory_cache_hit_rate', 'Cache hit rate')
@dataclass
class MemoryMetrics:
"""Memory system metrics"""
timestamp: datetime
total_memories: int
memories_by_type: Dict[str, int]
operation_counts: Dict[str, int]
average_response_times: Dict[str, float]
cache_stats: Dict[str, Any]
error_rates: Dict[str, float]
system_health: Dict[str, Any]
class MetricsCollector:
"""Collect and aggregate memory system metrics"""
def __init__(self, memory_manager: ResilientMemoryManager):
self.memory_manager = memory_manager
self.operation_counts = {}
self.response_times = {}
self.error_counts = {}
self.start_time = time.time()
def record_operation(self, operation: str, duration: float, success: bool = True):
"""Record operation metrics"""
# Prometheus metrics
status = 'success' if success else 'error'
memory_operations_total.labels(operation=operation, status=status).inc()
memory_operation_duration.labels(operation=operation).observe(duration)
# Internal tracking
if operation not in self.operation_counts:
self.operation_counts[operation] = {'success': 0, 'error': 0}
self.response_times[operation] = []
self.operation_counts[operation][status] += 1
self.response_times[operation].append(duration)
# Keep only recent response times (last 1000 operations)
if len(self.response_times[operation]) > 1000:
self.response_times[operation] = self.response_times[operation][-1000:]
async def collect_metrics(self) -> MemoryMetrics:
"""Collect current metrics snapshot"""
# Get cache stats
cache_stats = {}
if hasattr(self.memory_manager, 'cache_layer') and self.memory_manager.cache_layer:
cache_stats = self.memory_manager.cache_layer.get_cache_stats()
memory_cache_hit_rate.set(cache_stats.get('overall_hit_rate', 0))
# Calculate error rates
error_rates = {}
for operation, counts in self.operation_counts.items():
total = counts['success'] + counts['error']
error_rates[operation] = counts['error'] / total if total > 0 else 0
# Calculate average response times
avg_response_times = {}
for operation, times in self.response_times.items():
avg_response_times[operation] = sum(times) / len(times) if times else 0
# System health
system_health = await self._collect_system_health()
return MemoryMetrics(
timestamp=datetime.now(),
total_memories=await self._get_total_memory_count(),
memories_by_type=await self._get_memories_by_type(),
operation_counts=self.operation_counts.copy(),
average_response_times=avg_response_times,
cache_stats=cache_stats,
error_rates=error_rates,
system_health=system_health
)
async def _get_total_memory_count(self) -> int:
"""Get total number of memories"""
try:
# This depends on your store implementation
if hasattr(self.memory_manager.memory_store, 'get_total_count'):
return await self.memory_manager.memory_store.get_total_count()
return 0
except Exception:
return 0
async def _get_memories_by_type(self) -> Dict[str, int]:
"""Get memory counts by type"""
try:
if hasattr(self.memory_manager.memory_store, 'get_counts_by_type'):
return await self.memory_manager.memory_store.get_counts_by_type()
return {}
except Exception:
return {}
async def _collect_system_health(self) -> Dict[str, Any]:
"""Collect system health metrics"""
# CPU and memory usage
cpu_percent = psutil.cpu_percent(interval=1)  # note: blocks the event loop for ~1s; consider an executor in hot paths
memory_info = psutil.virtual_memory()
# Database connection health
circuit_breaker_health = self.memory_manager.get_health_status()
# Update Prometheus gauges (system memory in use serves as a rough proxy for store footprint here)
memory_store_size.set(memory_info.used)
return {
'cpu_usage_percent': cpu_percent,
'memory_usage_percent': memory_info.percent,
'disk_usage_percent': psutil.disk_usage('/').percent,
'circuit_breakers': circuit_breaker_health,
'uptime_seconds': time.time() - self.start_time
}
class HealthChecker:
"""Health check system for memory services"""
def __init__(self, memory_manager: ResilientMemoryManager,
metrics_collector: MetricsCollector):
self.memory_manager = memory_manager
self.metrics_collector = metrics_collector
self.health_checks = {}
async def perform_health_check(self) -> Dict[str, Any]:
"""Perform comprehensive health check"""
health_status = {
'overall_status': 'healthy',
'timestamp': datetime.now().isoformat(),
'checks': {}
}
# Database connectivity check
db_health = await self._check_database_health()
health_status['checks']['database'] = db_health
# Cache health check
cache_health = await self._check_cache_health()
health_status['checks']['cache'] = cache_health
# Performance health check
performance_health = await self._check_performance_health()
health_status['checks']['performance'] = performance_health
# System resources health check
system_health = await self._check_system_resources()
health_status['checks']['system'] = system_health
# Circuit breaker health
cb_health = self._check_circuit_breakers()
health_status['checks']['circuit_breakers'] = cb_health
# Determine overall status
failed_checks = [
name for name, check in health_status['checks'].items()
if check.get('status') != 'healthy'
]
if failed_checks:
if any(check.get('severity') == 'critical' for check in health_status['checks'].values()):
health_status['overall_status'] = 'unhealthy'
else:
health_status['overall_status'] = 'degraded'
return health_status
async def _check_database_health(self) -> Dict[str, Any]:
"""Check database connectivity and performance"""
try:
start_time = time.time()
# Test basic operations
test_memory = {
'id': f'health_check_{int(time.time())}',
'content': 'Health check memory',
'memory_type': 'working',
'importance': 0.5,
'timestamp': datetime.now()
}
# Test store operation
memory_id = await self.memory_manager.store_memory(test_memory)
# Test retrieve operation
retrieved = await self.memory_manager.retrieve_memory(memory_id)
# Test search operation
await self.memory_manager.search_memories('health check', limit=1)
# Cleanup
if hasattr(self.memory_manager.memory_store, 'delete_memory'):
await self.memory_manager.memory_store.delete_memory(memory_id)
duration = time.time() - start_time
return {
'status': 'healthy',
'response_time_ms': duration * 1000,
'operations_tested': ['store', 'retrieve', 'search']
}
except Exception as e:
return {
'status': 'unhealthy',
'severity': 'critical',
'error': str(e),
'operations_tested': ['store', 'retrieve', 'search']
}
async def _check_cache_health(self) -> Dict[str, Any]:
"""Check cache layer health"""
if not hasattr(self.memory_manager, 'cache_layer') or not self.memory_manager.cache_layer:
return {'status': 'not_configured'}
try:
cache_stats = self.memory_manager.cache_layer.get_cache_stats()
hit_rate = cache_stats.get('overall_hit_rate', 0)
status = 'healthy' if hit_rate > 0.5 else 'degraded'
return {
'status': status,
'hit_rate': hit_rate,
'cache_size': cache_stats.get('local_cache_size', 0),
'stats': cache_stats
}
except Exception as e:
return {
'status': 'unhealthy',
'severity': 'warning',
'error': str(e)
}
async def _check_performance_health(self) -> Dict[str, Any]:
"""Check performance metrics"""
metrics = await self.metrics_collector.collect_metrics()
# Check average response times
slow_operations = []
for operation, avg_time in metrics.average_response_times.items():
if avg_time > 1.0: # More than 1 second
slow_operations.append({
'operation': operation,
'avg_time_ms': avg_time * 1000
})
# Check error rates
high_error_operations = []
for operation, error_rate in metrics.error_rates.items():
if error_rate > 0.05: # More than 5% error rate
high_error_operations.append({
'operation': operation,
'error_rate': error_rate
})
status = 'healthy'
if slow_operations or high_error_operations:
status = 'degraded' if not high_error_operations else 'unhealthy'
return {
'status': status,
'slow_operations': slow_operations,
'high_error_operations': high_error_operations,
'metrics_summary': {
'total_memories': metrics.total_memories,
'avg_response_times': metrics.average_response_times
}
}
async def _check_system_resources(self) -> Dict[str, Any]:
"""Check system resource usage"""
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
disk_percent = psutil.disk_usage('/').percent
warnings = []
if cpu_percent > 80:
warnings.append(f"High CPU usage: {cpu_percent}%")
if memory_percent > 85:
warnings.append(f"High memory usage: {memory_percent}%")
if disk_percent > 90:
warnings.append(f"High disk usage: {disk_percent}%")
status = 'healthy'
if warnings:
status = 'degraded' if cpu_percent < 95 and memory_percent < 95 else 'unhealthy'
return {
'status': status,
'cpu_usage_percent': cpu_percent,
'memory_usage_percent': memory_percent,
'disk_usage_percent': disk_percent,
'warnings': warnings
}
def _check_circuit_breakers(self) -> Dict[str, Any]:
"""Check circuit breaker status"""
cb_status = self.memory_manager.get_health_status()
failed_circuits = [
name for name, status in cb_status.items()
if status['state'] != 'closed'
]
overall_status = 'healthy' if not failed_circuits else 'unhealthy'
return {
'status': overall_status,
'severity': 'critical' if failed_circuits else 'info',
'failed_circuits': failed_circuits,
'circuit_details': cb_status
}
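# A possible liveness endpoint (sketch): expose perform_health_check over HTTP for
# load balancers, assuming aiohttp is available (it is already used for webhook alerts below).
from aiohttp import web

def create_health_app(health_checker: 'HealthChecker') -> web.Application:
    async def health_handler(request: web.Request) -> web.Response:
        status = await health_checker.perform_health_check()
        http_status = 200 if status['overall_status'] == 'healthy' else 503
        return web.json_response(status, status=http_status)

    app = web.Application()
    app.router.add_get('/healthz', health_handler)
    return app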
class AlertManager:
"""Manage alerts based on metrics and health checks"""
def __init__(self, webhook_url: Optional[str] = None,
email_config: Optional[Dict[str, str]] = None):
self.webhook_url = webhook_url
self.email_config = email_config
self.alert_history = []
self.alert_thresholds = {
'error_rate': 0.10,
'response_time': 2.0,
'cpu_usage': 90,
'memory_usage': 90,
'disk_usage': 95
}
async def check_and_send_alerts(self, metrics: MemoryMetrics,
health_status: Dict[str, Any]):
"""Check metrics and send alerts if thresholds are exceeded"""
alerts = []
# Check error rates
for operation, error_rate in metrics.error_rates.items():
if error_rate > self.alert_thresholds['error_rate']:
alerts.append({
'type': 'error_rate',
'operation': operation,
'value': error_rate,
'threshold': self.alert_thresholds['error_rate'],
'severity': 'warning'
})
# Check response times
for operation, response_time in metrics.average_response_times.items():
if response_time > self.alert_thresholds['response_time']:
alerts.append({
'type': 'slow_response',
'operation': operation,
'value': response_time,
'threshold': self.alert_thresholds['response_time'],
'severity': 'warning'
})
# Check system resources
cpu_usage = metrics.system_health.get('cpu_usage_percent', 0)
if cpu_usage > self.alert_thresholds['cpu_usage']:
alerts.append({
'type': 'high_cpu',
'value': cpu_usage,
'threshold': self.alert_thresholds['cpu_usage'],
'severity': 'critical'
})
# Check overall health
if health_status['overall_status'] == 'unhealthy':
alerts.append({
'type': 'system_unhealthy',
'details': health_status,
'severity': 'critical'
})
# Send alerts
for alert in alerts:
await self._send_alert(alert)
async def _send_alert(self, alert: Dict[str, Any]):
"""Send individual alert"""
# Prevent duplicate alerts (simple deduplication)
alert_key = f"{alert['type']}_{alert.get('operation', 'system')}"
recent_alerts = [
    a for a in self.alert_history[-10:]  # Check last 10 alerts
    if a.get('key') == alert_key and
    (datetime.now() - a['timestamp']).total_seconds() < 300  # Within 5 minutes
]
if recent_alerts:
return # Skip duplicate alert
# Add to history
alert['key'] = alert_key
alert['timestamp'] = datetime.now()
self.alert_history.append(alert)
# Keep only last 100 alerts in history
if len(self.alert_history) > 100:
self.alert_history = self.alert_history[-100:]
# Send alert via configured channels
if self.webhook_url:
await self._send_webhook_alert(alert)
if self.email_config:
await self._send_email_alert(alert)
logging.warning(f"ALERT: {alert}")
async def _send_webhook_alert(self, alert: Dict[str, Any]):
"""Send alert via webhook"""
try:
import aiohttp
payload = {
'alert': alert,
'timestamp': alert['timestamp'].isoformat(),
'service': 'memory_system'
}
async with aiohttp.ClientSession() as session:
async with session.post(self.webhook_url, json=payload) as response:
if response.status != 200:
logging.error(f"Failed to send webhook alert: {response.status}")
except Exception as e:
logging.error(f"Webhook alert failed: {e}")
async def _send_email_alert(self, alert: Dict[str, Any]):
"""Send alert via email"""
# Implementation would depend on your email service
# This is a placeholder
logging.info(f"Would send email alert: {alert}")
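# A minimal SMTP sketch for email alerting (illustrative). The smtp_server, port,
# and address keys read from email_config are assumptions, not a fixed schema.
import smtplib
from email.message import EmailMessage

def send_email_alert_sync(email_config: Dict[str, str], alert: Dict[str, Any]):
    msg = EmailMessage()
    msg['Subject'] = f"Memory system alert: {alert['type']} ({alert.get('severity', 'info')})"
    msg['From'] = email_config.get('from_address', 'alerts@example.com')
    msg['To'] = email_config.get('to_address', 'oncall@example.com')
    msg.set_content(json.dumps(alert, default=str, indent=2))
    with smtplib.SMTP(email_config['smtp_server'], int(email_config.get('smtp_port', 25))) as smtp:
        smtp.send_message(msg)

# _send_email_alert could offload this blocking call with:
#     await asyncio.get_running_loop().run_in_executor(
#         None, send_email_alert_sync, self.email_config, alert)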
# Example production setup
async def setup_production_monitoring():
"""Setup complete production monitoring"""
# Initialize memory system (your_memory_store is a placeholder for a concrete store implementation)
memory_manager = ResilientMemoryManager(memory_store=your_memory_store)
# Start metrics collection
metrics_collector = MetricsCollector(memory_manager)
# Start Prometheus metrics server
start_http_server(8000)
# Setup health checking
health_checker = HealthChecker(memory_manager, metrics_collector)
# Setup alerting
alert_manager = AlertManager(
webhook_url="https://your-webhook-endpoint.com/alerts",
email_config={'smtp_server': 'smtp.example.com'}
)
# Background monitoring loop
async def monitoring_loop():
while True:
try:
# Collect metrics
metrics = await metrics_collector.collect_metrics()
# Perform health check
health_status = await health_checker.perform_health_check()
# Check for alerts
await alert_manager.check_and_send_alerts(metrics, health_status)
# Log summary
logging.info(f"Monitoring cycle: {health_status['overall_status']}, "
f"Total memories: {metrics.total_memories}")
await asyncio.sleep(60) # Check every minute
except Exception as e:
logging.error(f"Monitoring loop error: {e}")
await asyncio.sleep(30) # Shorter sleep on error
# Start monitoring
asyncio.create_task(monitoring_loop())
return {
'memory_manager': memory_manager,
'metrics_collector': metrics_collector,
'health_checker': health_checker,
'alert_manager': alert_manager
}

This production patterns guide provides enterprise-grade implementations for scaling, reliability, backup and recovery, and monitoring of agent memory systems. All patterns are designed for high-throughput production environments with proper error handling, observability, and operational excellence.