Skip to Content
Documentation / Implementation / Production Patterns

Production Patterns for Agent Memory

Building production-ready agent memory systems requires careful attention to scalability, reliability, monitoring, and operational excellence. This guide provides battle-tested patterns and implementations for enterprise deployments.

Scalability Patterns

Horizontal Scaling Architecture

import asyncio
import hashlib
import json
import logging
import random
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple

import redis
import redis.asyncio as aioredis


class ShardingStrategy(Enum):
    """Which attribute of a memory decides the shard it is written to."""

    USER_BASED = "user_based"
    CONTENT_HASH = "content_hash"
    TEMPORAL = "temporal"
    MEMORY_TYPE = "memory_type"


@dataclass
class ShardConfig:
    """Connection settings and routing status for a single shard."""

    shard_id: str
    database_config: Dict[str, Any]
    # NOTE(review): `weight` is not consulted by the routing code below.
    weight: int = 1
    status: str = "active"  # active, readonly, maintenance


class MemoryShardManager:
    """Manages memory distribution across multiple database shards.

    Writes are routed by hashing a strategy-dependent key modulo the number
    of *active* shards, so the key -> shard mapping changes whenever a shard
    changes status. Reads therefore rely on a Redis location cache and fall
    back to scanning every readable shard.
    """

    # How long a memory_id -> shard_id mapping is kept in Redis.
    LOCATION_CACHE_TTL = timedelta(hours=24)

    # Shard statuses that may serve reads (writes go to "active" only).
    READABLE_STATUSES = ("active", "readonly")

    def __init__(self,
                 shard_configs: List[ShardConfig],
                 sharding_strategy: ShardingStrategy = ShardingStrategy.USER_BASED,
                 redis_client: Optional[aioredis.Redis] = None):
        """Create the manager and open a store for every configured shard.

        Args:
            shard_configs: One entry per shard.
            sharding_strategy: How the routing key is derived from a memory.
            redis_client: Async Redis client used for the location cache.
                Every call on it is awaited, so it must be a
                ``redis.asyncio`` client; one is created when omitted.
        """
        self.shard_configs = {config.shard_id: config for config in shard_configs}
        self.sharding_strategy = sharding_strategy
        self.redis_client = redis_client if redis_client is not None else aioredis.Redis()
        self.shard_stores: Dict[str, Any] = {}
        self._initialize_shards()

    def _initialize_shards(self):
        """Initialize connections to all shards."""
        for shard_id, config in self.shard_configs.items():
            self.shard_stores[shard_id] = self._create_store_instance(config)

    def _create_store_instance(self, config: ShardConfig):
        """Create the database store described by ``config``.

        Raises:
            ValueError: If ``database_config['type']`` is not supported.
                Failing here avoids registering ``None`` as a store and
                crashing later on first use.
        """
        db_type = config.database_config.get('type', 'postgresql')

        if db_type == 'postgresql':
            from .database_implementations import PostgreSQLMemoryStore
            return PostgreSQLMemoryStore(config.database_config['connection_string'])
        if db_type == 'mongodb':
            from .database_implementations import MongoMemoryStore
            return MongoMemoryStore(config.database_config['connection_string'])
        # Add other database types as needed
        raise ValueError(
            f"Unsupported database type {db_type!r} for shard {config.shard_id}"
        )

    def get_shard_for_memory(self, memory_data: Dict[str, Any]) -> str:
        """Determine which shard should store this memory.

        Raises:
            RuntimeError: If no shard is currently active.
        """
        strategy = self.sharding_strategy

        if strategy == ShardingStrategy.USER_BASED:
            metadata = memory_data.get('metadata') or {}
            return self._hash_to_shard(metadata.get('user_id', 'unknown'))

        if strategy == ShardingStrategy.CONTENT_HASH:
            return self._hash_to_shard(memory_data.get('content', ''))

        if strategy == ShardingStrategy.TEMPORAL:
            timestamp = memory_data.get('timestamp') or datetime.now()
            if isinstance(timestamp, str):
                # Timestamps that went through JSON arrive as ISO strings.
                timestamp = datetime.fromisoformat(timestamp)
            # Shard by month/year
            return self._hash_to_shard(timestamp.strftime('%Y-%m'))

        if strategy == ShardingStrategy.MEMORY_TYPE:
            return self._hash_to_shard(memory_data.get('memory_type', 'episodic'))

        # Unknown strategy: default to the first active shard.
        active_shards = self._get_active_shards()
        if not active_shards:
            raise RuntimeError("No active shards available")
        return active_shards[0]

    def _hash_to_shard(self, key: str) -> str:
        """Map ``key`` to an active shard id via MD5 modulo the shard count.

        MD5 is used only for stable distribution, not for security.

        Raises:
            RuntimeError: If no shard is currently active.
        """
        active_shards = self._get_active_shards()
        if not active_shards:
            raise RuntimeError("No active shards available")

        digest = hashlib.md5(str(key).encode()).hexdigest()
        return active_shards[int(digest, 16) % len(active_shards)]

    def _get_active_shards(self) -> List[str]:
        """Return the ids of shards that accept writes."""
        return [
            shard_id
            for shard_id, config in self.shard_configs.items()
            if config.status == "active"
        ]

    def _get_readable_shards(self) -> List[str]:
        """Return the ids of shards that can serve reads (active or readonly)."""
        return [
            shard_id
            for shard_id, config in self.shard_configs.items()
            if config.status in self.READABLE_STATUSES
        ]

    async def store_memory(self, memory_data: Dict[str, Any]) -> Tuple[str, str]:
        """Store a memory in its shard and return ``(memory_id, shard_id)``."""
        shard_id = self.get_shard_for_memory(memory_data)
        memory_id = await self.shard_stores[shard_id].store_memory(**memory_data)

        # Cache shard location for quick retrieval
        await self._cache_memory_location(memory_id, shard_id)
        return memory_id, shard_id

    async def retrieve_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a memory, using the cached location when it is usable."""
        shard_id = await self._get_memory_location(memory_id)
        store = self.shard_stores.get(shard_id) if shard_id else None

        if store is None:
            # No cached location, or it names a shard we no longer have:
            # fall back to scanning every readable shard.
            return await self._search_all_shards_for_memory(memory_id)
        return await store.retrieve_memory(memory_id)

    async def search_memories(self,
                              query: str,
                              user_id: Optional[str] = None,
                              **kwargs) -> List[Dict[str, Any]]:
        """Search memories across the relevant shards.

        With user-based sharding and a ``user_id`` only that user's shard is
        queried; otherwise every readable shard is queried and merged.
        """
        if user_id and self.sharding_strategy == ShardingStrategy.USER_BASED:
            # NOTE(review): user_id selects the shard but is not forwarded to
            # the store, so the store is presumably expected to filter via
            # kwargs - confirm against the store implementation.
            store = self.shard_stores[self._hash_to_shard(user_id)]
            return await store.search_memories(query, **kwargs)
        return await self._search_across_shards(query, **kwargs)

    async def _cache_memory_location(self, memory_id: str, shard_id: str):
        """Cache the memory's shard in Redis (best effort, errors are logged)."""
        try:
            # setex needs an int or timedelta; a float TTL is rejected by Redis.
            await self.redis_client.setex(
                f"memory_location:{memory_id}",
                self.LOCATION_CACHE_TTL,
                shard_id,
            )
        except Exception as e:
            logging.warning(f"Failed to cache memory location: {e}")

    async def _get_memory_location(self, memory_id: str) -> Optional[str]:
        """Return the cached shard id, or ``None`` on a miss or Redis error."""
        try:
            shard_id = await self.redis_client.get(f"memory_location:{memory_id}")
        except Exception as e:
            logging.warning(f"Failed to read memory location: {e}")
            return None

        if not shard_id:
            return None
        # Clients created with decode_responses=True already return str.
        return shard_id.decode() if isinstance(shard_id, bytes) else shard_id

    async def _search_all_shards_for_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Look a memory up on every readable shard (expensive fallback)."""
        shard_ids = self._get_readable_shards()
        results = await asyncio.gather(
            *(self.shard_stores[shard_id].retrieve_memory(memory_id)
              for shard_id in shard_ids),
            return_exceptions=True,
        )

        for shard_id, result in zip(shard_ids, results):
            # BaseException also covers CancelledError, which must never be
            # handed back to the caller as if it were a memory.
            if isinstance(result, BaseException):
                logging.warning(f"Lookup failed on shard {shard_id}: {result}")
                continue
            if result is not None:
                # Cache the found location
                await self._cache_memory_location(memory_id, shard_id)
                return result
        return None

    async def _search_across_shards(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """Query every readable shard and merge the hits by descending score."""
        limit = kwargs.pop('limit', 10)
        shard_ids = self._get_readable_shards()
        if not shard_ids:
            return []

        # NOTE: asking each shard for fewer than `limit` hits is an
        # approximation; the global top results may be missed when they are
        # concentrated on one shard.
        shard_limit = max(limit // len(shard_ids) + 1, 5)

        shard_results = await asyncio.gather(
            *(self.shard_stores[shard_id].search_memories(query, limit=shard_limit, **kwargs)
              for shard_id in shard_ids),
            return_exceptions=True,
        )

        all_results: List[Dict[str, Any]] = []
        for shard_id, results in zip(shard_ids, shard_results):
            if isinstance(results, BaseException):
                logging.warning(f"Search failed on shard {shard_id}: {results}")
                continue
            all_results.extend(results)

        # Sort by relevance/importance and limit
        all_results.sort(key=lambda item: item.get('score', 0), reverse=True)
        return all_results[:limit]


class LoadBalancedMemoryCluster:
    """Load-balanced cluster with read replicas and write distribution.

    NOTE(review): each replica manager holds the replicas of ONE primary
    shard, so a search served by a randomly chosen replica manager appears
    to cover only that shard's data; replicas also assign their own memory
    ids on replication. Confirm both are acceptable before production use.
    """

    def __init__(self,
                 primary_shards: List[ShardConfig],
                 read_replicas: Dict[str, List[ShardConfig]]):
        """Build the primary shard manager and one manager per replica set.

        Args:
            primary_shards: Writable shards.
            read_replicas: Replica configs keyed by primary shard id.
        """
        self.primary_shards = {config.shard_id: config for config in primary_shards}
        self.read_replicas = read_replicas
        self.shard_manager = MemoryShardManager(primary_shards)
        self.replica_managers: Dict[str, MemoryShardManager] = {
            shard_id: MemoryShardManager(replica_configs)
            for shard_id, replica_configs in read_replicas.items()
        }
        # The event loop keeps only weak references to tasks; hold strong
        # ones so pending replications are not garbage-collected mid-flight.
        self._background_tasks: Set["asyncio.Task[Any]"] = set()

    async def store_memory(self, memory_data: Dict[str, Any]) -> str:
        """Store in the primary shard, then replicate in the background."""
        memory_id, shard_id = await self.shard_manager.store_memory(memory_data)

        # Asynchronously replicate to read replicas
        task = asyncio.create_task(self._replicate_to_replicas(memory_data, shard_id))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

        return memory_id

    async def search_memories(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """Search via a read replica, falling back to the primary on error."""
        replica_manager = self._choose_replica_manager()

        if replica_manager:
            try:
                return await replica_manager.search_memories(query, **kwargs)
            except Exception as e:
                logging.warning(f"Replica search failed, falling back to primary: {e}")

        # Fallback to primary
        return await self.shard_manager.search_memories(query, **kwargs)

    def _choose_replica_manager(self) -> Optional[MemoryShardManager]:
        """Pick a replica manager at random, or ``None`` when there are none."""
        if not self.replica_managers:
            return None
        # For production, implement proper load balancing
        return random.choice(list(self.replica_managers.values()))

    async def _replicate_to_replicas(self, memory_data: Dict[str, Any], shard_id: str):
        """Copy a memory to the replica set of ``shard_id`` (errors are logged)."""
        replica_manager = self.replica_managers.get(shard_id)
        if not replica_manager:
            return

        try:
            await replica_manager.store_memory(memory_data)
        except Exception as e:
            logging.error(f"Replication failed for shard {shard_id}: {e}")

Caching Layer Implementation

import hashlib
import json
import logging
import pickle
import zlib
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

if TYPE_CHECKING:
    # Needed for annotations only; every client call below is awaited, so an
    # asyncio Redis client is expected.
    import redis.asyncio as aioredis


class MemoryCacheLayer:
    """Two-level cache for memory operations: in-process dict, then Redis."""

    def __init__(self,
                 redis_client: "aioredis.Redis",
                 local_cache_size: int = 1000,
                 default_ttl: int = 3600,
                 local_ttl: int = 300):
        """Configure the cache.

        Args:
            redis_client: Async Redis client (its methods are awaited).
            local_cache_size: Maximum number of entries in the local cache;
                0 or less disables the local level.
            default_ttl: Redis TTL in seconds when none is given per call.
            local_ttl: Seconds a local entry stays valid (was a fixed 300).
        """
        self.redis_client = redis_client
        # memory_id -> (data, time written); dict order == write order.
        self.local_cache: Dict[str, Tuple[Any, datetime]] = {}
        self.local_cache_size = local_cache_size
        self.default_ttl = default_ttl
        self.local_ttl = local_ttl
        self.stats = {
            'local_hits': 0,
            'redis_hits': 0,
            'misses': 0,
            'stores': 0
        }

    async def get_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Return the cached memory, or ``None`` on a miss or Redis error."""
        # Level 1: Local cache
        entry = self.local_cache.get(memory_id)
        if entry is not None:
            data, written_at = entry
            if datetime.now() - written_at < timedelta(seconds=self.local_ttl):
                self.stats['local_hits'] += 1
                return data
            # Expired: drop it so stale entries do not occupy cache slots.
            self.local_cache.pop(memory_id, None)

        # Level 2: Redis cache
        try:
            cached_data = await self.redis_client.get(f"memory:{memory_id}")
            if cached_data:
                # SECURITY: pickle.loads executes arbitrary code from its
                # input; only safe while Redis is trusted and not writable by
                # untrusted parties.
                memory_data = pickle.loads(zlib.decompress(cached_data))

                self._update_local_cache(memory_id, memory_data)
                self.stats['redis_hits'] += 1
                return memory_data
        except Exception as e:
            logging.warning(f"Redis cache error: {e}")

        self.stats['misses'] += 1
        return None

    async def store_memory(self,
                           memory_id: str,
                           memory_data: Dict[str, Any],
                           ttl: Optional[int] = None):
        """Store a memory locally and, compressed, in Redis (best effort)."""
        self._update_local_cache(memory_id, memory_data)

        try:
            compressed = zlib.compress(pickle.dumps(memory_data), level=6)
            await self.redis_client.setex(
                f"memory:{memory_id}",
                ttl or self.default_ttl,
                compressed
            )
            self.stats['stores'] += 1
        except Exception as e:
            logging.error(f"Failed to cache memory {memory_id}: {e}")

    async def store_search_results(self,
                                   query_hash: str,
                                   results: List[Dict[str, Any]],
                                   ttl: int = 300):
        """Cache search results in Redis under ``search:<query_hash>``."""
        try:
            compressed = zlib.compress(pickle.dumps(results))
            await self.redis_client.setex(f"search:{query_hash}", ttl, compressed)
        except Exception as e:
            logging.error(f"Failed to cache search results: {e}")

    async def get_search_results(self, query_hash: str) -> Optional[List[Dict[str, Any]]]:
        """Return cached search results, or ``None`` on a miss or error."""
        try:
            cached_data = await self.redis_client.get(f"search:{query_hash}")
            if cached_data:
                # SECURITY: see get_memory - pickle requires a trusted Redis.
                return pickle.loads(zlib.decompress(cached_data))
        except Exception as e:
            logging.warning(f"Search cache error: {e}")

        return None

    def _update_local_cache(self, memory_id: str, memory_data: Dict[str, Any]):
        """Insert or refresh a local entry, evicting the oldest-written one.

        Eviction is by write time (reads do not refresh an entry), which the
        dict's insertion order tracks directly.
        """
        if self.local_cache_size <= 0:
            return  # local level disabled

        # Re-inserting moves a refreshed key to the newest position and means
        # a refresh never evicts an unrelated entry.
        self.local_cache.pop(memory_id, None)

        while len(self.local_cache) >= self.local_cache_size:
            del self.local_cache[next(iter(self.local_cache))]

        self.local_cache[memory_id] = (memory_data, datetime.now())

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return raw counters plus hit rates and the local cache size."""
        local_hits = self.stats['local_hits']
        redis_hits = self.stats['redis_hits']
        # max(..., 1) avoids dividing by zero before the first lookup.
        total_requests = max(local_hits + redis_hits + self.stats['misses'], 1)

        return {
            **self.stats,
            'local_hit_rate': local_hits / total_requests,
            'redis_hit_rate': redis_hits / total_requests,
            'overall_hit_rate': (local_hits + redis_hits) / total_requests,
            'local_cache_size': len(self.local_cache)
        }


class SearchQueryOptimizer:
    """Optimize and cache search queries."""

    STOP_WORDS = frozenset({
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on',
        'at', 'to', 'for', 'of', 'with', 'by',
    })

    def __init__(self, redis_client: "aioredis.Redis"):
        self.redis_client = redis_client

    def generate_query_hash(self, query: str, filters: Dict[str, Any]) -> str:
        """Return a deterministic MD5 hex digest for a query and its filters.

        The query is lower-cased and stripped first. Filter values that JSON
        cannot encode (e.g. ``datetime``) are stringified instead of raising.
        MD5 serves as a cache key only, not as a security measure.
        """
        query_data = {
            'query': query.lower().strip(),
            'filters': filters
        }
        query_string = json.dumps(query_data, sort_keys=True, default=str)
        return hashlib.md5(query_string.encode()).hexdigest()

    async def optimize_search_query(self, query: str) -> str:
        """Normalize a query and strip stop words, memoizing the result.

        Redis is used purely as a memo; when it is unavailable the result is
        computed directly. A query consisting only of stop words is returned
        normalized rather than emptied.
        """
        cache_key = f"query_opt:{hashlib.md5(query.encode()).hexdigest()}"

        try:
            cached_optimization = await self.redis_client.get(cache_key)
        except Exception as e:
            logging.warning(f"Query optimization cache read failed: {e}")
            cached_optimization = None

        if cached_optimization:
            if isinstance(cached_optimization, bytes):
                return cached_optimization.decode()
            return cached_optimization

        normalized = query.strip().lower()
        kept_words = [word for word in normalized.split() if word not in self.STOP_WORDS]
        # Keep the normalized query when stop-word removal would leave nothing.
        optimized = ' '.join(kept_words) if kept_words else normalized

        try:
            await self.redis_client.setex(cache_key, 3600, optimized)
        except Exception as e:
            logging.warning(f"Query optimization cache write failed: {e}")

        return optimized

Reliability and Resilience

Circuit Breaker Pattern

import asyncio
import inspect
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreakerConfig:
    """Tunable thresholds for a :class:`CircuitBreaker`."""

    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 expected_exception: tuple = (Exception,)):
        """
        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to stay open before a trial call.
            expected_exception: Exception types counted as failures; any
                other exception propagates without affecting the breaker.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception


class CircuitBreaker:
    """Circuit breaker for memory operations."""

    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.next_attempt_time: Optional[datetime] = None
        self.logger = logging.getLogger(f"CircuitBreaker.{name}")

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Run ``func`` under circuit breaker protection.

        ``func`` may be a coroutine function, a plain function, or a plain
        callable that returns an awaitable (``functools.partial``, lambda);
        awaitable results are awaited before success is recorded.

        Raises:
            CircuitBreakerOpenError: The circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Whatever ``func`` raises, re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise CircuitBreakerOpenError(f"Circuit breaker {self.name} is OPEN")
            self.state = CircuitState.HALF_OPEN
            self.logger.info(f"Circuit breaker {self.name} transitioning to HALF_OPEN")

        try:
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result
        except self.config.expected_exception:
            self._on_failure()
            raise  # bare raise keeps the original traceback intact

        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """Return True once the recovery timeout has elapsed."""
        if self.next_attempt_time is None:
            return True
        return datetime.now() >= self.next_attempt_time

    def _on_success(self):
        """Reset failure tracking and close the circuit."""
        if self.state == CircuitState.HALF_OPEN:
            self.logger.info(f"Circuit breaker {self.name} recovered, transitioning to CLOSED")

        self.failure_count = 0
        self.last_failure_time = None
        self.next_attempt_time = None
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record a failure; open the circuit when warranted.

        A failed trial call in HALF_OPEN re-opens immediately, independent
        of the current failure count.
        """
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        trial_failed = self.state == CircuitState.HALF_OPEN
        if trial_failed or self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN
            self.next_attempt_time = datetime.now() + timedelta(seconds=self.config.recovery_timeout)
            self.logger.warning(f"Circuit breaker {self.name} opened due to {self.failure_count} failures")


class ResilientMemoryManager:
    """Memory manager with circuit breaker protection."""

    def __init__(self, memory_store, cache_layer: Optional["MemoryCacheLayer"] = None):
        """
        Args:
            memory_store: Backing store exposing async ``store_memory``,
                ``retrieve_memory`` and ``search_memories``.
            cache_layer: Optional cache consulted before the store.
        """
        self.memory_store = memory_store
        self.cache_layer = cache_layer

        # Circuit breakers for different operations
        self.circuit_breakers = {
            'store': CircuitBreaker('memory_store', CircuitBreakerConfig(failure_threshold=3)),
            'retrieve': CircuitBreaker('memory_retrieve', CircuitBreakerConfig(failure_threshold=5)),
            'search': CircuitBreaker('memory_search', CircuitBreakerConfig(failure_threshold=5)),
        }
        # The event loop keeps only weak references to tasks; hold strong
        # ones so background cache writes are not garbage-collected early.
        self._background_tasks: Set["asyncio.Task[Any]"] = set()

    def _run_in_background(self, coro) -> None:
        """Schedule ``coro`` as a task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def store_memory(self, memory_data: Dict[str, Any]) -> str:
        """Store a memory through the 'store' breaker; errors propagate."""
        async def _store_operation():
            memory_id = await self.memory_store.store_memory(**memory_data)

            # Update cache asynchronously
            if self.cache_layer:
                self._run_in_background(
                    self.cache_layer.store_memory(memory_id, memory_data)
                )
            return memory_id

        return await self.circuit_breakers['store'].call(_store_operation)

    async def retrieve_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a memory, cache first; returns ``None`` on any failure."""
        if self.cache_layer:
            cached_result = await self.cache_layer.get_memory(memory_id)
            if cached_result is not None:
                return cached_result

        async def _retrieve_operation():
            result = await self.memory_store.retrieve_memory(memory_id)

            # Cache the result
            if result and self.cache_layer:
                self._run_in_background(
                    self.cache_layer.store_memory(memory_id, result)
                )
            return result

        try:
            return await self.circuit_breakers['retrieve'].call(_retrieve_operation)
        except Exception as e:
            logging.error(f"Failed to retrieve memory {memory_id}: {e}")
            return None

    async def search_memories(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """Search memories, cache first; returns ``[]`` on any failure."""
        query_hash: Optional[str] = None
        if self.cache_layer:
            optimizer = SearchQueryOptimizer(self.cache_layer.redis_client)
            query_hash = optimizer.generate_query_hash(query, kwargs)

            # `is not None` so a cached empty result list is a hit as well.
            cached_results = await self.cache_layer.get_search_results(query_hash)
            if cached_results is not None:
                return cached_results

        async def _search_operation():
            results = await self.memory_store.search_memories(query, **kwargs)

            # Cache the results
            if self.cache_layer:
                self._run_in_background(
                    self.cache_layer.store_search_results(query_hash, results)
                )
            return results

        try:
            return await self.circuit_breakers['search'].call(_search_operation)
        except Exception as e:
            logging.error(f"Search failed for query '{query}': {e}")
            return []

    def get_health_status(self) -> Dict[str, Any]:
        """Return state, failure count and last failure time per breaker."""
        return {
            name: {
                'state': breaker.state.value,
                'failure_count': breaker.failure_count,
                'last_failure': breaker.last_failure_time.isoformat() if breaker.last_failure_time else None
            }
            for name, breaker in self.circuit_breakers.items()
        }

Data Backup and Recovery

import asyncio
import functools
import gzip
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, AsyncGenerator, Dict, List, Optional

import boto3
from botocore.exceptions import ClientError


class MemoryBackupManager:
    """Manage memory data backups and recovery (local disk and optional S3)."""

    # S3 DeleteObjects accepts at most this many keys per request.
    S3_DELETE_CHUNK = 1000

    def __init__(self,
                 memory_store,
                 s3_bucket: Optional[str] = None,
                 backup_interval_hours: int = 24,
                 retention_days: int = 30):
        """
        Args:
            memory_store: Store to back up; ``get_batch(offset, limit)`` is
                used when present, otherwise ``get_all_memories()``.
            s3_bucket: Bucket for off-site copies; S3 is skipped when None.
            backup_interval_hours: Intended interval between backups (kept
                as ``self.backup_interval``; no scheduling happens here).
            retention_days: Age after which ``cleanup_old_backups`` deletes.
        """
        self.memory_store = memory_store
        self.s3_bucket = s3_bucket
        self.backup_interval = timedelta(hours=backup_interval_hours)
        self.retention_days = retention_days
        self.logger = logging.getLogger(__name__)

        # Initialize S3 client if bucket specified
        self.s3_client = boto3.client('s3') if s3_bucket else None

    async def _call_s3(self, operation: str, **params) -> Any:
        """Run a blocking boto3 client call in the default executor.

        boto3 client methods accept keyword arguments only, so the call is
        bound with ``functools.partial`` rather than handing a dict to
        ``run_in_executor`` as a positional argument.
        """
        bound_call = functools.partial(getattr(self.s3_client, operation), **params)
        return await asyncio.get_running_loop().run_in_executor(None, bound_call)

    async def create_backup(self, backup_name: Optional[str] = None) -> str:
        """Create a full, gzip-compressed JSON backup and return its name.

        NOTE: batches are read incrementally but the whole backup is still
        assembled in memory before it is written.
        """
        if not backup_name:
            backup_name = f"memory_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        backup_data = {
            'metadata': {
                'backup_name': backup_name,
                'created_at': datetime.now().isoformat(),
                'total_memories': 0,
                'backup_version': '1.0'
            },
            'memories': []
        }

        total_memories = 0
        async for batch in self._get_memory_batches():
            backup_data['memories'].extend(batch)
            total_memories += len(batch)

            # Log progress for large backups
            if total_memories % 10000 == 0:
                self.logger.info(f"Backup progress: {total_memories} memories processed")

        backup_data['metadata']['total_memories'] = total_memories

        # default=str stringifies values JSON cannot encode (e.g. datetime).
        backup_content = json.dumps(backup_data, default=str)
        compressed_content = gzip.compress(backup_content.encode('utf-8'))

        await self._save_backup_locally(f"backups/{backup_name}.json.gz", compressed_content)

        # Save to S3 if configured
        if self.s3_client:
            await self._save_backup_to_s3(backup_name, compressed_content)

        self.logger.info(f"Backup {backup_name} completed: {total_memories} memories")
        return backup_name

    async def restore_backup(self,
                             backup_name: str,
                             target_store=None,
                             dry_run: bool = True) -> Dict[str, Any]:
        """Restore a backup into ``target_store`` (default: the own store).

        With ``dry_run`` (the default) nothing is written and only the
        number of memories that would be restored is reported.

        Raises:
            LookupError: The backup exists neither in S3 nor locally.
        """
        target = target_store or self.memory_store

        backup_data = await self._load_backup(backup_name)
        if not backup_data:
            raise LookupError(f"Backup {backup_name} not found")

        memories = backup_data['memories']
        restoration_stats = {
            'total_memories': len(memories),
            'restored_memories': 0,
            'failed_memories': 0,
            'errors': []
        }

        if dry_run:
            self.logger.info(f"DRY RUN: Would restore {restoration_stats['total_memories']} memories")
            return restoration_stats

        # Restore memories in batches
        batch_size = 100
        for start in range(0, len(memories), batch_size):
            batch_results = await self._restore_memory_batch(
                memories[start:start + batch_size], target
            )

            restoration_stats['restored_memories'] += batch_results['successful']
            restoration_stats['failed_memories'] += batch_results['failed']
            restoration_stats['errors'].extend(batch_results['errors'])

            # Progress logging
            if start % 1000 == 0:
                self.logger.info(f"Restoration progress: {start} memories processed")

        self.logger.info(f"Restoration completed: {restoration_stats}")
        return restoration_stats

    async def _get_memory_batches(self, batch_size: int = 1000) -> AsyncGenerator[List[Dict], None]:
        """Yield the store's memories in lists of up to ``batch_size``."""
        if hasattr(self.memory_store, 'get_batch'):
            offset = 0
            while True:
                batch = await self.memory_store.get_batch(offset=offset, limit=batch_size)
                if not batch:
                    break
                yield batch
                offset += len(batch)
            return

        # Fallback: load everything once (not once per batch) and slice it.
        all_memories = await self.memory_store.get_all_memories()
        for offset in range(0, len(all_memories), batch_size):
            yield all_memories[offset:offset + batch_size]

    async def _save_backup_locally(self, path: str, content: bytes):
        """Write ``content`` to ``path``, creating parent directories."""
        import os
        import aiofiles

        os.makedirs(os.path.dirname(path), exist_ok=True)

        async with aiofiles.open(path, 'wb') as f:
            await f.write(content)

    async def _save_backup_to_s3(self, backup_name: str, content: bytes):
        """Upload a backup to S3; ``ClientError`` is logged and re-raised."""
        s3_key = f"memory-backups/{backup_name}.json.gz"
        try:
            await self._call_s3(
                'put_object',
                Bucket=self.s3_bucket,
                Key=s3_key,
                Body=content,
                ContentType='application/gzip',
                Metadata={
                    'backup_name': backup_name,
                    'created_at': datetime.now().isoformat()
                },
            )
        except ClientError as e:
            self.logger.error(f"Failed to save backup to S3: {e}")
            raise

        self.logger.info(f"Backup saved to S3: s3://{self.s3_bucket}/{s3_key}")

    async def _load_backup(self, backup_name: str) -> Optional[Dict[str, Any]]:
        """Load a backup, preferring S3 and falling back to local disk."""
        if self.s3_client:
            try:
                s3_data = await self._load_backup_from_s3(backup_name)
                if s3_data:
                    return s3_data
            except Exception as e:
                self.logger.warning(f"Failed to load from S3, trying local: {e}")

        # Try local backup
        return await self._load_backup_locally(backup_name)

    async def _load_backup_from_s3(self, backup_name: str) -> Optional[Dict[str, Any]]:
        """Download and decode a backup from S3; ``None`` if the key is absent."""
        s3_key = f"memory-backups/{backup_name}.json.gz"

        def _download() -> bytes:
            # Reading the streaming body blocks as well, so it is done in the
            # worker thread together with the request itself.
            response = self.s3_client.get_object(Bucket=self.s3_bucket, Key=s3_key)
            return response['Body'].read()

        try:
            compressed_data = await asyncio.get_running_loop().run_in_executor(None, _download)
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                return None
            raise

        return json.loads(gzip.decompress(compressed_data).decode('utf-8'))

    async def _load_backup_locally(self, backup_name: str) -> Optional[Dict[str, Any]]:
        """Load a backup from ``backups/``; ``None`` if the file is absent."""
        import aiofiles

        path = f"backups/{backup_name}.json.gz"

        try:
            async with aiofiles.open(path, 'rb') as f:
                compressed_data = await f.read()
        except FileNotFoundError:
            return None

        return json.loads(gzip.decompress(compressed_data).decode('utf-8'))

    async def _restore_memory_batch(self, batch: List[Dict], target_store) -> Dict[str, Any]:
        """Restore a batch concurrently and tally successes and failures."""
        results = {'successful': 0, 'failed': 0, 'errors': []}

        batch_results = await asyncio.gather(
            *(self._restore_single_memory(memory_data, target_store) for memory_data in batch),
            return_exceptions=True,
        )

        for result in batch_results:
            if isinstance(result, BaseException):
                results['failed'] += 1
                results['errors'].append(str(result))
            elif result:
                results['successful'] += 1
            else:
                results['failed'] += 1
                results['errors'].append("Unknown restore failure")

        return results

    async def _restore_single_memory(self, memory_data: Dict, target_store) -> bool:
        """Restore one memory; returns False (after logging) on failure."""
        try:
            await target_store.store_memory(**memory_data)
            return True
        except Exception as e:
            self.logger.error(f"Failed to restore memory {memory_data.get('id', 'unknown')}: {e}")
            return False

    async def cleanup_old_backups(self):
        """Remove local and S3 backups older than ``retention_days``."""
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)

        await self._cleanup_local_backups(cutoff_date)

        if self.s3_client:
            await self._cleanup_s3_backups(cutoff_date)

    async def _cleanup_local_backups(self, cutoff_date: datetime):
        """Delete ``backups/*.json.gz`` files modified before ``cutoff_date``."""
        import os
        import aiofiles.os

        backup_dir = "backups"
        if not os.path.exists(backup_dir):
            return

        for filename in os.listdir(backup_dir):
            if not filename.endswith('.json.gz'):
                continue

            file_path = os.path.join(backup_dir, filename)
            stat_result = await aiofiles.os.stat(file_path)

            if datetime.fromtimestamp(stat_result.st_mtime) < cutoff_date:
                await aiofiles.os.remove(file_path)
                self.logger.info(f"Deleted old local backup: {filename}")

    async def _cleanup_s3_backups(self, cutoff_date: datetime):
        """Delete S3 backups last modified before ``cutoff_date``.

        S3 reports ``LastModified`` in UTC, so the cutoff (naive values are
        taken as local time) is converted to UTC before comparing. Listing
        follows continuation tokens and deletions are sent in chunks of
        ``S3_DELETE_CHUNK`` keys.
        """
        cutoff_utc = cutoff_date.astimezone(timezone.utc)

        try:
            stale_keys: List[Dict[str, str]] = []
            list_params: Dict[str, Any] = {
                'Bucket': self.s3_bucket,
                'Prefix': 'memory-backups/'
            }

            while True:
                response = await self._call_s3('list_objects_v2', **list_params)

                for obj in response.get('Contents', []):
                    last_modified = obj['LastModified']
                    if last_modified.tzinfo is None:
                        last_modified = last_modified.replace(tzinfo=timezone.utc)
                    if last_modified < cutoff_utc:
                        stale_keys.append({'Key': obj['Key']})

                if not response.get('IsTruncated'):
                    break
                list_params['ContinuationToken'] = response['NextContinuationToken']

            for start in range(0, len(stale_keys), self.S3_DELETE_CHUNK):
                await self._call_s3(
                    'delete_objects',
                    Bucket=self.s3_bucket,
                    Delete={'Objects': stale_keys[start:start + self.S3_DELETE_CHUNK]},
                )

            if stale_keys:
                self.logger.info(f"Deleted {len(stale_keys)} old S3 backups")

        except ClientError as e:
            self.logger.error(f"Failed to cleanup S3 backups: {e}")

Monitoring and Observability

Comprehensive Monitoring System

from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import asyncio
import json
import logging
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import psutil
import time

# Prometheus metrics
memory_operations_total = Counter('memory_operations_total', 'Total memory operations', ['operation', 'status'])
memory_operation_duration = Histogram('memory_operation_duration_seconds', 'Memory operation duration', ['operation'])
memory_store_size = Gauge('memory_store_size_bytes', 'Total memory store size')
active_memory_connections = Gauge('active_memory_connections', 'Active database connections')
memory_cache_hit_rate = Gauge('memory_cache_hit_rate', 'Cache hit rate')


def _json_default(value: Any) -> Any:
    """Fallback serializer for ``json.dumps``: ISO-format datetimes, ``str()`` the rest."""
    if isinstance(value, datetime):
        return value.isoformat()
    return str(value)


@dataclass
class MemoryMetrics:
    """Point-in-time snapshot of memory system metrics."""
    timestamp: datetime
    total_memories: int
    memories_by_type: Dict[str, int]
    operation_counts: Dict[str, int]
    average_response_times: Dict[str, float]
    cache_stats: Dict[str, Any]
    error_rates: Dict[str, float]
    system_health: Dict[str, Any]


class MetricsCollector:
    """Collect and aggregate memory system metrics"""

    # Number of most recent durations kept per operation for averaging.
    MAX_RESPONSE_SAMPLES = 1000

    # ResilientMemoryManager is neither defined nor imported in this snippet,
    # so the annotation is a string to avoid a NameError at definition time.
    def __init__(self, memory_manager: "ResilientMemoryManager"):
        self.memory_manager = memory_manager
        self.operation_counts = {}
        self.response_times = {}
        self.error_counts = {}
        self.start_time = time.time()

    def record_operation(self, operation: str, duration: float, success: bool = True):
        """Record one operation in Prometheus and in the in-process counters.

        Args:
            operation: Operation name, used as the metric label.
            duration: Elapsed time in seconds.
            success: Whether the operation succeeded.
        """
        # Prometheus metrics
        status = 'success' if success else 'error'
        memory_operations_total.labels(operation=operation, status=status).inc()
        memory_operation_duration.labels(operation=operation).observe(duration)

        # Internal tracking
        counts = self.operation_counts.setdefault(operation, {'success': 0, 'error': 0})
        times = self.response_times.setdefault(operation, [])
        counts[status] += 1
        times.append(duration)

        # Keep only recent response times (last 1000 operations)
        if len(times) > self.MAX_RESPONSE_SAMPLES:
            del times[:-self.MAX_RESPONSE_SAMPLES]

    async def collect_metrics(self) -> MemoryMetrics:
        """Collect current metrics snapshot.

        Returns:
            A ``MemoryMetrics`` whose ``operation_counts`` is an independent
            copy, so later ``record_operation`` calls do not alter it.
        """
        # Get cache stats
        cache_stats = {}
        cache_layer = getattr(self.memory_manager, 'cache_layer', None)
        if cache_layer:
            cache_stats = cache_layer.get_cache_stats()
            memory_cache_hit_rate.set(cache_stats.get('overall_hit_rate', 0))

        # Calculate error rates
        error_rates = {}
        for operation, counts in self.operation_counts.items():
            total = counts['success'] + counts['error']
            error_rates[operation] = counts['error'] / total if total > 0 else 0

        # Calculate average response times
        avg_response_times = {
            operation: sum(times) / len(times) if times else 0
            for operation, times in self.response_times.items()
        }

        # System health
        system_health = await self._collect_system_health()

        return MemoryMetrics(
            timestamp=datetime.now(),
            total_memories=await self._get_total_memory_count(),
            memories_by_type=await self._get_memories_by_type(),
            # Copy the per-operation dicts too; a shallow .copy() would share
            # the live counters with the snapshot.
            operation_counts={
                operation: dict(counts)
                for operation, counts in self.operation_counts.items()
            },
            average_response_times=avg_response_times,
            cache_stats=cache_stats,
            error_rates=error_rates,
            system_health=system_health
        )

    async def _get_total_memory_count(self) -> int:
        """Return the store's total memory count, or 0 if unsupported or on error."""
        try:
            # This depends on your store implementation
            if hasattr(self.memory_manager.memory_store, 'get_total_count'):
                return await self.memory_manager.memory_store.get_total_count()
            return 0
        except Exception:
            return 0

    async def _get_memories_by_type(self) -> Dict[str, int]:
        """Return memory counts by type, or an empty dict if unsupported or on error."""
        try:
            if hasattr(self.memory_manager.memory_store, 'get_counts_by_type'):
                return await self.memory_manager.memory_store.get_counts_by_type()
            return {}
        except Exception:
            return {}

    async def _collect_system_health(self) -> Dict[str, Any]:
        """Collect CPU, memory, disk, circuit-breaker and uptime figures."""
        # cpu_percent(interval=1) sleeps for a second; run it in the default
        # executor so the event loop is not blocked meanwhile.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
        memory_info = psutil.virtual_memory()

        # Database connection health
        circuit_breaker_health = self.memory_manager.get_health_status()

        # Update Prometheus gauges
        # NOTE(review): this publishes *system* RAM in use under a gauge
        # described as "Total memory store size" — confirm that is intended.
        memory_store_size.set(memory_info.used)

        return {
            'cpu_usage_percent': cpu_percent,
            'memory_usage_percent': memory_info.percent,
            'disk_usage_percent': psutil.disk_usage('/').percent,
            'circuit_breakers': circuit_breaker_health,
            'uptime_seconds': time.time() - self.start_time
        }


class HealthChecker:
    """Health check system for memory services"""

    # Check statuses that must not degrade the overall status.
    NON_FAILURE_STATUSES = ('healthy', 'not_configured')

    def __init__(self, memory_manager: "ResilientMemoryManager", metrics_collector: MetricsCollector):
        self.memory_manager = memory_manager
        self.metrics_collector = metrics_collector
        self.health_checks = {}

    async def perform_health_check(self) -> Dict[str, Any]:
        """Run every check and derive an overall status.

        Returns:
            Dict with ``overall_status`` (``healthy``, ``degraded`` or
            ``unhealthy``), an ISO ``timestamp`` and per-check results under
            ``checks``. The overall status is ``unhealthy`` when a failing
            check carries ``severity == 'critical'``, ``degraded`` when any
            other check fails.
        """
        health_status = {
            'overall_status': 'healthy',
            'timestamp': datetime.now().isoformat(),
            'checks': {}
        }
        checks = health_status['checks']

        # Database connectivity check
        checks['database'] = await self._check_database_health()

        # Cache health check
        checks['cache'] = await self._check_cache_health()

        # Performance health check
        checks['performance'] = await self._check_performance_health()

        # System resources health check
        checks['system'] = await self._check_system_resources()

        # Circuit breaker health
        checks['circuit_breakers'] = self._check_circuit_breakers()

        # Determine overall status. An absent (not configured) cache is not a
        # failure; counting it would pin the status at "degraded".
        failed_checks = [
            check for check in checks.values()
            if check.get('status') not in self.NON_FAILURE_STATUSES
        ]

        if failed_checks:
            if any(check.get('severity') == 'critical' for check in failed_checks):
                health_status['overall_status'] = 'unhealthy'
            else:
                health_status['overall_status'] = 'degraded'

        return health_status

    async def _check_database_health(self) -> Dict[str, Any]:
        """Exercise store, retrieve and search with a throwaway memory.

        Returns:
            ``healthy`` with ``response_time_ms`` on success, otherwise
            ``unhealthy`` with ``severity: critical`` and the error text.
        """
        operations_tested = ['store', 'retrieve', 'search']
        store = self.memory_manager.memory_store
        memory_id = None
        cleaned_up = False

        try:
            start_time = time.perf_counter()

            # Test basic operations
            test_memory = {
                'id': f'health_check_{int(time.time())}',
                'content': 'Health check memory',
                'memory_type': 'working',
                'importance': 0.5,
                'timestamp': datetime.now()
            }

            # Test store operation
            memory_id = await self.memory_manager.store_memory(test_memory)

            # Test retrieve operation
            await self.memory_manager.retrieve_memory(memory_id)

            # Test search operation
            await self.memory_manager.search_memories('health check', limit=1)

            # Cleanup
            if hasattr(store, 'delete_memory'):
                cleaned_up = True  # set first so a failing delete is not retried
                await store.delete_memory(memory_id)

            duration = time.perf_counter() - start_time

            return {
                'status': 'healthy',
                'response_time_ms': duration * 1000,
                'operations_tested': operations_tested
            }

        except Exception as e:
            # Best effort: do not leave the probe record behind when retrieve
            # or search failed after a successful store.
            if memory_id is not None and not cleaned_up and hasattr(store, 'delete_memory'):
                try:
                    await store.delete_memory(memory_id)
                except Exception as cleanup_error:
                    logging.warning(f"Health check cleanup failed: {cleanup_error}")

            return {
                'status': 'unhealthy',
                'severity': 'critical',
                'error': str(e),
                'operations_tested': operations_tested
            }

    async def _check_cache_health(self) -> Dict[str, Any]:
        """Report cache health from the cache layer's hit rate.

        Returns ``not_configured`` when the manager has no cache layer,
        ``healthy`` when the hit rate exceeds 0.5, ``degraded`` otherwise.
        """
        cache_layer = getattr(self.memory_manager, 'cache_layer', None)
        if not cache_layer:
            return {'status': 'not_configured'}

        try:
            cache_stats = cache_layer.get_cache_stats()
            hit_rate = cache_stats.get('overall_hit_rate', 0)

            status = 'healthy' if hit_rate > 0.5 else 'degraded'

            return {
                'status': status,
                'hit_rate': hit_rate,
                'cache_size': cache_stats.get('local_cache_size', 0),
                'stats': cache_stats
            }

        except Exception as e:
            return {
                'status': 'unhealthy',
                'severity': 'warning',
                'error': str(e)
            }

    async def _check_performance_health(self) -> Dict[str, Any]:
        """Flag operations averaging over 1 second or with over 5% errors."""
        metrics = await self.metrics_collector.collect_metrics()

        # Check average response times
        slow_operations = [
            {'operation': operation, 'avg_time_ms': avg_time * 1000}
            for operation, avg_time in metrics.average_response_times.items()
            if avg_time > 1.0  # More than 1 second
        ]

        # Check error rates
        high_error_operations = [
            {'operation': operation, 'error_rate': error_rate}
            for operation, error_rate in metrics.error_rates.items()
            if error_rate > 0.05  # More than 5% error rate
        ]

        if high_error_operations:
            status = 'unhealthy'
        elif slow_operations:
            status = 'degraded'
        else:
            status = 'healthy'

        return {
            'status': status,
            'slow_operations': slow_operations,
            'high_error_operations': high_error_operations,
            'metrics_summary': {
                'total_memories': metrics.total_memories,
                'avg_response_times': metrics.average_response_times
            }
        }

    async def _check_system_resources(self) -> Dict[str, Any]:
        """Check CPU, memory and disk usage against warning thresholds."""
        # cpu_percent(interval=1) sleeps for a second; keep it off the event loop.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
        memory_percent = psutil.virtual_memory().percent
        disk_percent = psutil.disk_usage('/').percent

        warnings = []
        if cpu_percent > 80:
            warnings.append(f"High CPU usage: {cpu_percent}%")
        if memory_percent > 85:
            warnings.append(f"High memory usage: {memory_percent}%")
        if disk_percent > 90:
            warnings.append(f"High disk usage: {disk_percent}%")

        status = 'healthy'
        if warnings:
            status = 'degraded' if cpu_percent < 95 and memory_percent < 95 else 'unhealthy'

        return {
            'status': status,
            'cpu_usage_percent': cpu_percent,
            'memory_usage_percent': memory_percent,
            'disk_usage_percent': disk_percent,
            'warnings': warnings
        }

    def _check_circuit_breakers(self) -> Dict[str, Any]:
        """Report every circuit whose state is not ``closed`` as failed."""
        cb_status = self.memory_manager.get_health_status()

        failed_circuits = [
            name for name, details in cb_status.items()
            if details['state'] != 'closed'
        ]

        overall_status = 'healthy' if not failed_circuits else 'unhealthy'

        return {
            'status': overall_status,
            'severity': 'critical' if failed_circuits else 'info',
            'failed_circuits': failed_circuits,
            'circuit_details': cb_status
        }


class AlertManager:
    """Manage alerts based on metrics and health checks"""

    # An alert with the same key is suppressed for this many seconds.
    DEDUP_WINDOW_SECONDS = 300
    # Maximum number of alerts retained in ``alert_history``.
    MAX_HISTORY = 100

    def __init__(self, webhook_url: Optional[str] = None,
                 email_config: Optional[Dict[str, str]] = None):
        self.webhook_url = webhook_url
        self.email_config = email_config
        self.alert_history = []
        self.alert_thresholds = {
            'error_rate': 0.10,
            'response_time': 2.0,
            'cpu_usage': 90,
            'memory_usage': 90,
            'disk_usage': 95
        }

    async def check_and_send_alerts(self, metrics: MemoryMetrics, health_status: Dict[str, Any]):
        """Compare metrics with ``alert_thresholds`` and send resulting alerts.

        Args:
            metrics: Snapshot produced by ``MetricsCollector.collect_metrics``.
            health_status: Result of ``HealthChecker.perform_health_check``.
        """
        alerts = []
        thresholds = self.alert_thresholds

        # Check error rates
        for operation, error_rate in metrics.error_rates.items():
            if error_rate > thresholds['error_rate']:
                alerts.append({
                    'type': 'error_rate',
                    'operation': operation,
                    'value': error_rate,
                    'threshold': thresholds['error_rate'],
                    'severity': 'warning'
                })

        # Check response times
        for operation, response_time in metrics.average_response_times.items():
            if response_time > thresholds['response_time']:
                alerts.append({
                    'type': 'slow_response',
                    'operation': operation,
                    'value': response_time,
                    'threshold': thresholds['response_time'],
                    'severity': 'warning'
                })

        # Check system resources. Memory and disk thresholds are configured
        # alongside CPU, so all three are evaluated.
        resource_checks = (
            ('high_cpu', 'cpu_usage_percent', 'cpu_usage'),
            ('high_memory', 'memory_usage_percent', 'memory_usage'),
            ('high_disk', 'disk_usage_percent', 'disk_usage'),
        )
        for alert_type, metric_key, threshold_key in resource_checks:
            usage = metrics.system_health.get(metric_key, 0)
            if usage > thresholds[threshold_key]:
                alerts.append({
                    'type': alert_type,
                    'value': usage,
                    'threshold': thresholds[threshold_key],
                    'severity': 'critical'
                })

        # Check overall health
        if health_status.get('overall_status') == 'unhealthy':
            alerts.append({
                'type': 'system_unhealthy',
                'details': health_status,
                'severity': 'critical'
            })

        # Send alerts
        for alert in alerts:
            await self._send_alert(alert)

    async def _send_alert(self, alert: Dict[str, Any]):
        """Record and dispatch one alert unless an identical one was sent recently.

        The alert dict is annotated in place with ``key`` and ``timestamp``.
        """
        # Prevent duplicate alerts (simple deduplication)
        alert_key = f"{alert['type']}_{alert.get('operation', 'system')}"
        now = datetime.now()

        for previous in self.alert_history:
            # total_seconds() covers the whole interval; timedelta.seconds
            # drops the days component and would match day-old alerts.
            if (previous.get('key') == alert_key and
                    (now - previous['timestamp']).total_seconds() < self.DEDUP_WINDOW_SECONDS):
                return  # Skip duplicate alert

        # Add to history
        alert['key'] = alert_key
        alert['timestamp'] = now
        self.alert_history.append(alert)

        # Keep only last 100 alerts in history
        if len(self.alert_history) > self.MAX_HISTORY:
            self.alert_history = self.alert_history[-self.MAX_HISTORY:]

        # Send alert via configured channels
        if self.webhook_url:
            await self._send_webhook_alert(alert)

        if self.email_config:
            await self._send_email_alert(alert)

        logging.warning(f"ALERT: {alert}")

    async def _send_webhook_alert(self, alert: Dict[str, Any]):
        """POST the alert as JSON to ``self.webhook_url``; failures are logged only."""
        try:
            import aiohttp

            payload = {
                'alert': alert,
                'timestamp': alert['timestamp'].isoformat(),
                'service': 'memory_system'
            }
            # The alert carries a datetime under 'timestamp', which the
            # default JSON encoder rejects; serialize explicitly.
            body = json.dumps(payload, default=_json_default)

            # Bound the request so a stalled endpoint cannot hold up the caller.
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    self.webhook_url,
                    data=body,
                    headers={'Content-Type': 'application/json'},
                ) as response:
                    # Any 2xx response counts as delivered.
                    if not 200 <= response.status < 300:
                        logging.error(f"Failed to send webhook alert: {response.status}")

        except Exception as e:
            logging.error(f"Webhook alert failed: {e}")

    async def _send_email_alert(self, alert: Dict[str, Any]):
        """Send alert via email"""
        # Implementation would depend on your email service
        # This is a placeholder
        logging.info(f"Would send email alert: {alert}")


# Example production setup
async def setup_production_monitoring():
    """Setup complete production monitoring.

    Returns:
        Dict with the created ``memory_manager``, ``metrics_collector``,
        ``health_checker``, ``alert_manager`` and the background
        ``monitoring_task``.
    """
    # Initialize memory system
    # `your_memory_store` is a placeholder: supply your own store instance.
    memory_manager = ResilientMemoryManager(memory_store=your_memory_store)

    # Start metrics collection
    metrics_collector = MetricsCollector(memory_manager)

    # Start Prometheus metrics server
    start_http_server(8000)

    # Setup health checking
    health_checker = HealthChecker(memory_manager, metrics_collector)

    # Setup alerting
    alert_manager = AlertManager(
        webhook_url="https://your-webhook-endpoint.com/alerts",
        email_config={'smtp_server': 'smtp.example.com'}
    )

    # Background monitoring loop
    async def monitoring_loop():
        while True:
            try:
                # Collect metrics
                metrics = await metrics_collector.collect_metrics()

                # Perform health check
                health_status = await health_checker.perform_health_check()

                # Check for alerts
                await alert_manager.check_and_send_alerts(metrics, health_status)

                # Log summary
                logging.info(f"Monitoring cycle: {health_status['overall_status']}, "
                             f"Total memories: {metrics.total_memories}")

                await asyncio.sleep(60)  # Check every minute

            except Exception as e:
                logging.error(f"Monitoring loop error: {e}")
                await asyncio.sleep(30)  # Shorter sleep on error

    # Start monitoring. The event loop keeps only a weak reference to tasks,
    # so hold on to it (and hand it to the caller) to stop it being
    # garbage-collected mid-run.
    monitoring_task = asyncio.create_task(monitoring_loop())

    return {
        'memory_manager': memory_manager,
        'metrics_collector': metrics_collector,
        'health_checker': health_checker,
        'alert_manager': alert_manager,
        'monitoring_task': monitoring_task
    }

This comprehensive production patterns guide provides enterprise-grade implementations for scaling, reliability, backup/recovery, and monitoring of agent memory systems. All patterns are designed for high-throughput production environments with proper error handling, observability, and operational excellence.