Hierarchical Memory Pattern

The Hierarchical Memory pattern organizes agent memory into multiple levels or tiers, with different retention policies, access patterns, and detail levels at each tier. This approach mimics human memory systems and enables efficient storage and retrieval across different time scales and importance levels.

Overview

The Hierarchical Memory pattern structures memory as a multi-level system where information flows between levels based on importance, frequency of access, and temporal factors. Key components include:

  • Working Memory: Immediate, highly accessible recent context
  • Short-term Memory: Recent interactions with moderate detail
  • Long-term Memory: Compressed summaries and important information
  • Archival Memory: Permanent storage with minimal detail
  • Promotion/Demotion Rules: Logic for moving information between levels

This pattern balances comprehensive retention with performance efficiency by storing different levels of detail appropriate for each memory tier.

Architecture

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


@dataclass
class MemoryItem:
    content: str
    metadata: Dict[str, Any]
    importance_score: float
    access_count: int
    created_at: datetime
    last_accessed: datetime
    memory_level: str


class MemoryLevel(ABC):
    """Abstract base class for memory levels"""

    def __init__(self, name: str, capacity: int,
                 retention_time: Optional[timedelta] = None):
        self.name = name
        self.capacity = capacity
        self.retention_time = retention_time
        self.items: List[MemoryItem] = []

    @abstractmethod
    def add_item(self, item: MemoryItem) -> bool:
        """Add item to this memory level"""

    @abstractmethod
    def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
        """Retrieve relevant items from this level"""

    @abstractmethod
    def cleanup(self) -> List[MemoryItem]:
        """Clean up expired/old items; return items to be promoted/demoted"""


class WorkingMemory(MemoryLevel):
    """Immediate, high-access memory for current context"""

    def __init__(self, capacity: int = 20):
        super().__init__("working", capacity, timedelta(minutes=30))

    def add_item(self, item: MemoryItem) -> bool:
        item.memory_level = self.name
        self.items.append(item)
        # Maintain capacity by evicting the least important, oldest item
        if len(self.items) > self.capacity:
            self.items.sort(key=lambda x: (x.importance_score, x.last_accessed))
            self.items.pop(0)
            return False  # Indicate overflow
        return True

    def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
        # Working memory returns recent items prioritized by importance
        recent_items = [
            item for item in self.items
            if datetime.now() - item.created_at < timedelta(minutes=5)
        ]
        # Sort by importance and recency
        recent_items.sort(
            key=lambda x: (x.importance_score, x.last_accessed),
            reverse=True
        )
        for item in recent_items[:limit]:
            item.access_count += 1
            item.last_accessed = datetime.now()
        return recent_items[:limit]

    def cleanup(self) -> List[MemoryItem]:
        """Expire old items so they can move to short-term memory"""
        current_time = datetime.now()
        expired_items = [
            item for item in self.items
            if current_time - item.created_at > self.retention_time
        ]
        self.items = [
            item for item in self.items
            if current_time - item.created_at <= self.retention_time
        ]
        return expired_items


class ShortTermMemory(MemoryLevel):
    """Recent interactions with moderate detail retention"""

    def __init__(self, capacity: int = 100):
        super().__init__("short_term", capacity, timedelta(days=7))

    def add_item(self, item: MemoryItem) -> bool:
        item.memory_level = self.name
        # Merge with an existing similar item if one exists
        for existing in self.items:
            if self._are_similar(item, existing):
                self._merge_items(existing, item)
                return True
        self.items.append(item)
        # Handle overflow
        if len(self.items) > self.capacity:
            self._handle_overflow()
        return True

    def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
        # Simple keyword matching for short-term retrieval
        matching_items = []
        query_words = set(query.lower().split())
        for item in self.items:
            content_words = set(item.content.lower().split())
            overlap = len(query_words & content_words)
            if overlap > 0:
                # Combine relevance, importance, and recency into one score
                relevance = overlap / len(query_words)
                importance_factor = item.importance_score
                recency_factor = 1.0 / (1 + (datetime.now() - item.last_accessed).days)
                combined_score = relevance * importance_factor * recency_factor
                matching_items.append((combined_score, item))
        # Sort by combined score only; MemoryItem objects do not support comparison
        matching_items.sort(key=lambda pair: pair[0], reverse=True)
        results = []
        for _score, item in matching_items[:limit]:
            item.access_count += 1
            item.last_accessed = datetime.now()
            results.append(item)
        return results

    def cleanup(self) -> List[MemoryItem]:
        """Promote important expired items to long-term; discard the rest"""
        current_time = datetime.now()
        items_to_promote = []
        items_to_keep = []
        for item in self.items:
            age = current_time - item.created_at
            if age > self.retention_time:
                # Decide whether to promote to long-term or discard
                if item.importance_score > 0.7 or item.access_count > 3:
                    items_to_promote.append(item)
                # Otherwise discard (do not keep)
            else:
                items_to_keep.append(item)
        self.items = items_to_keep
        return items_to_promote

    def _are_similar(self, item1: MemoryItem, item2: MemoryItem) -> bool:
        """Check if two items are similar enough to merge"""
        # Simple similarity check based on content word overlap
        words1 = set(item1.content.lower().split())
        words2 = set(item2.content.lower().split())
        if not words1 or not words2:
            return False
        overlap = len(words1 & words2)
        similarity = overlap / max(len(words1), len(words2))
        return similarity > 0.6

    def _merge_items(self, existing: MemoryItem, new: MemoryItem):
        """Merge new item into existing item"""
        existing.access_count += new.access_count
        existing.importance_score = max(existing.importance_score, new.importance_score)
        existing.last_accessed = max(existing.last_accessed, new.last_accessed)
        # Merge metadata
        existing.metadata.update(new.metadata)

    def _handle_overflow(self):
        """Remove the least important 10% of items when capacity is exceeded"""
        self.items.sort(key=lambda x: (x.importance_score, x.access_count))
        remove_count = max(1, len(self.items) // 10)
        self.items = self.items[remove_count:]


class LongTermMemory(MemoryLevel):
    """Compressed summaries and important information for extended retention"""

    def __init__(self, capacity: int = 1000):
        super().__init__("long_term", capacity, timedelta(days=365))
        self.summaries = {}  # topic -> summary mapping

    def add_item(self, item: MemoryItem) -> bool:
        # Compress item content for long-term storage
        compressed_item = self._compress_item(item)
        compressed_item.memory_level = self.name
        # Group similar items into topic summaries
        topic = self._extract_topic(compressed_item)
        if topic in self.summaries:
            self._update_summary(topic, compressed_item)
        else:
            self.summaries[topic] = {
                'items': [compressed_item],
                'summary': compressed_item.content,
                'importance': compressed_item.importance_score,
                'last_updated': datetime.now()
            }
        self.items.append(compressed_item)
        # Handle capacity
        if len(self.items) > self.capacity:
            self._consolidate_memories()
        return True

    def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
        """Retrieve from summaries and individual important items"""
        query_words = set(query.lower().split())
        results = []
        # Search in summaries
        for topic, summary_data in self.summaries.items():
            topic_words = set(topic.lower().split())
            summary_words = set(summary_data['summary'].lower().split())
            overlap_topic = len(query_words & topic_words)
            overlap_summary = len(query_words & summary_words)
            if overlap_topic > 0 or overlap_summary > 0:
                relevance = (overlap_topic + overlap_summary) / len(query_words)
                score = relevance * summary_data['importance']
                # Create a synthetic item representing the summary
                summary_item = MemoryItem(
                    content=f"Summary for {topic}: {summary_data['summary']}",
                    metadata={'type': 'summary', 'topic': topic},
                    importance_score=summary_data['importance'],
                    access_count=len(summary_data['items']),
                    created_at=summary_data['last_updated'],
                    last_accessed=datetime.now(),
                    memory_level=self.name
                )
                results.append((score, summary_item))
        # Search in individual items not already covered by a summary
        summary_texts = [s['summary'] for s in self.summaries.values()]
        for item in self.items:
            if item.content not in summary_texts:
                content_words = set(item.content.lower().split())
                overlap = len(query_words & content_words)
                if overlap > 0:
                    relevance = overlap / len(query_words)
                    score = relevance * item.importance_score
                    results.append((score, item))
        # Sort by score only; MemoryItem objects do not support comparison
        results.sort(key=lambda pair: pair[0], reverse=True)
        return [item for _score, item in results[:limit]]

    def cleanup(self) -> List[MemoryItem]:
        """Archive very old, low-importance items; return them for archival"""
        current_time = datetime.now()
        items_to_archive = []
        for item in self.items[:]:  # Slice to avoid modification during iteration
            age = current_time - item.created_at
            if age > timedelta(days=180) and item.importance_score < 0.3:
                items_to_archive.append(item)
                self.items.remove(item)
        return items_to_archive

    def _compress_item(self, item: MemoryItem) -> MemoryItem:
        """Compress item content for long-term storage"""
        # Simple compression: keep the first 10 and last 10 words
        content_words = item.content.split()
        if len(content_words) > 20:
            compressed_content = (" ".join(content_words[:10]) + " ... " +
                                  " ".join(content_words[-10:]))
        else:
            compressed_content = item.content
        return MemoryItem(
            content=compressed_content,
            metadata=item.metadata,
            importance_score=item.importance_score,
            access_count=item.access_count,
            created_at=item.created_at,
            last_accessed=item.last_accessed,
            memory_level=item.memory_level
        )

    def _extract_topic(self, item: MemoryItem) -> str:
        """Extract a topic from an item for grouping"""
        # Simple topic extraction based on keywords
        content_words = item.content.lower().split()
        stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at',
                      'to', 'for', 'of', 'with', 'by'}
        keywords = [word for word in content_words
                    if word not in stop_words and len(word) > 2]
        if keywords:
            return " ".join(keywords[:3])  # Use the first 3 keywords as the topic
        return "general"

    def _update_summary(self, topic: str, new_item: MemoryItem):
        """Update a topic summary with a new item"""
        summary_data = self.summaries[topic]
        summary_data['items'].append(new_item)
        summary_data['importance'] = max(summary_data['importance'],
                                         new_item.importance_score)
        summary_data['last_updated'] = datetime.now()
        # Rebuild the summary text from recent items, limiting its length
        if len(summary_data['items']) > 5:
            all_content = " ".join(item.content for item in summary_data['items'][-5:])
            summary_data['summary'] = all_content[:200] + "..."

    def _consolidate_memories(self):
        """Consolidate similar memories when capacity is exceeded"""
        # Group items by topic
        topic_groups = {}
        for item in self.items:
            topic = self._extract_topic(item)
            topic_groups.setdefault(topic, []).append(item)
        # Consolidate each topic group
        consolidated_items = []
        for topic, items in topic_groups.items():
            if len(items) > 3:
                # Create a consolidated summary for this topic
                consolidated_items.append(self._create_topic_summary(topic, items))
            else:
                consolidated_items.extend(items)
        self.items = consolidated_items[:self.capacity]

    def _create_topic_summary(self, topic: str, items: List[MemoryItem]) -> MemoryItem:
        """Create a summary item from multiple related items"""
        combined_content = f"Summary of {len(items)} memories about {topic}: "
        important_items = sorted(items, key=lambda x: x.importance_score,
                                 reverse=True)[:3]
        combined_content += " | ".join(item.content[:50] for item in important_items)
        avg_importance = sum(item.importance_score for item in items) / len(items)
        total_access = sum(item.access_count for item in items)
        most_recent = max(item.last_accessed for item in items)
        return MemoryItem(
            content=combined_content,
            metadata={'type': 'consolidated_summary', 'topic': topic,
                      'source_count': len(items)},
            importance_score=avg_importance,
            access_count=total_access,
            created_at=datetime.now(),
            last_accessed=most_recent,
            memory_level=self.name
        )


class HierarchicalMemory:
    """Main hierarchical memory system coordinating all levels"""

    def __init__(self):
        self.working_memory = WorkingMemory()
        self.short_term_memory = ShortTermMemory()
        self.long_term_memory = LongTermMemory()
        self.importance_calculator = ImportanceCalculator()

    def add_interaction(self, user_input: str, agent_response: str,
                        context: Dict = None):
        """Add a new interaction to the memory hierarchy"""
        content = f"User: {user_input}\nAgent: {agent_response}"
        importance = self.importance_calculator.calculate_importance(
            user_input, agent_response, context or {}
        )
        item = MemoryItem(
            content=content,
            metadata={'user_input': user_input,
                      'agent_response': agent_response,
                      **(context or {})},
            importance_score=importance,
            access_count=1,
            created_at=datetime.now(),
            last_accessed=datetime.now(),
            memory_level="working"
        )
        # New interactions always enter through working memory
        self.working_memory.add_item(item)

    def retrieve_context(self, query: str, max_items: int = 10) -> Dict[str, List[MemoryItem]]:
        """Retrieve relevant context from all memory levels"""
        context = {
            'working': self.working_memory.retrieve(query, limit=3),
            'short_term': self.short_term_memory.retrieve(query, limit=4),
            'long_term': self.long_term_memory.retrieve(query, limit=3)
        }
        # Combine and rank all results by importance and recency
        all_items = [item for items in context.values() for item in items]
        all_items.sort(key=lambda x: (x.importance_score, x.last_accessed),
                       reverse=True)
        # Return top items while preserving level information
        result = {level: [] for level in context}
        for item in all_items[:max_items]:
            result[item.memory_level].append(item)
        return result

    def consolidate_memories(self):
        """Move items between levels based on age and importance"""
        # Working -> Short-term
        for item in self.working_memory.cleanup():
            self.short_term_memory.add_item(item)
        # Short-term -> Long-term
        for item in self.short_term_memory.cleanup():
            self.long_term_memory.add_item(item)
        # Long-term -> Archive (handled internally by long-term memory)
        self.long_term_memory.cleanup()

    def get_memory_stats(self) -> Dict:
        """Get statistics about memory usage"""
        return {
            'working_memory': {
                'count': len(self.working_memory.items),
                'capacity': self.working_memory.capacity
            },
            'short_term_memory': {
                'count': len(self.short_term_memory.items),
                'capacity': self.short_term_memory.capacity
            },
            'long_term_memory': {
                'count': len(self.long_term_memory.items),
                'summaries': len(self.long_term_memory.summaries),
                'capacity': self.long_term_memory.capacity
            }
        }


class ImportanceCalculator:
    """Calculate importance scores for memory items"""

    def __init__(self):
        self.emotion_keywords = {
            'positive': ['happy', 'excited', 'great', 'awesome', 'love', 'amazing'],
            'negative': ['angry', 'frustrated', 'hate', 'terrible', 'awful', 'bad'],
            'surprise': ['surprising', 'unexpected', 'wow', 'incredible', 'unbelievable']
        }
        self.entity_keywords = ['name', 'person', 'company', 'place', 'date', 'number']

    def calculate_importance(self, user_input: str, agent_response: str,
                             context: Dict) -> float:
        """Calculate an importance score from several weighted factors"""
        base_score = 0.5
        # Factor 1: Emotional content
        emotion_score = self._calculate_emotion_score(user_input + " " + agent_response)
        # Factor 2: Entity presence
        entity_score = self._calculate_entity_score(user_input + " " + agent_response)
        # Factor 3: Context markers
        context_score = self._calculate_context_score(context)
        # Factor 4: Length and complexity
        complexity_score = self._calculate_complexity_score(user_input, agent_response)
        # Combine scores with weights (weights sum to 1.0)
        final_score = (
            base_score * 0.2 +
            emotion_score * 0.3 +
            entity_score * 0.2 +
            context_score * 0.2 +
            complexity_score * 0.1
        )
        return min(1.0, final_score)

    def _calculate_emotion_score(self, text: str) -> float:
        """Score based on emotional content"""
        text_lower = text.lower()
        emotion_count = 0
        for keywords in self.emotion_keywords.values():
            for keyword in keywords:
                if keyword in text_lower:
                    emotion_count += 1
        return min(1.0, emotion_count * 0.2)

    def _calculate_entity_score(self, text: str) -> float:
        """Score based on named entities and important information"""
        # Simple heuristic: look for capitalized words and numbers
        entity_indicators = 0
        for word in text.split():
            if word[0].isupper() and len(word) > 2:  # Potential proper noun
                entity_indicators += 1
            elif any(char.isdigit() for char in word):  # Contains numbers
                entity_indicators += 1
        return min(1.0, entity_indicators * 0.1)

    def _calculate_context_score(self, context: Dict) -> float:
        """Score based on context metadata"""
        score = 0.5
        # Check for special context markers
        if context.get('user_first_time'):
            score += 0.3
        if context.get('error_occurred'):
            score += 0.2
        if context.get('task_completed'):
            score += 0.2
        if context.get('user_sentiment') == 'negative':
            score += 0.2
        return min(1.0, score)

    def _calculate_complexity_score(self, user_input: str, agent_response: str) -> float:
        """Score based on interaction complexity"""
        total_length = len(user_input) + len(agent_response)
        if total_length < 50:
            return 0.2
        elif total_length < 200:
            return 0.5
        elif total_length < 500:
            return 0.7
        return 1.0
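
To make the flow concrete, here is a minimal usage sketch built only on the classes defined above; the example inputs are illustrative.

# Minimal usage sketch for the classes defined above
memory = HierarchicalMemory()

memory.add_interaction(
    "My name is Dana and I need help with my order",
    "Thanks Dana, I can help with that order.",
    context={'user_first_time': True}
)

# Retrieve relevant context across all levels
context = memory.retrieve_context("order help", max_items=5)
for level, items in context.items():
    for item in items:
        print(level, round(item.importance_score, 2), item.content[:60])

# Run periodically (e.g., on a timer) to move items between levels
memory.consolidate_memories()
print(memory.get_memory_stats())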

Performance Characteristics

Pros

  • Efficient Resource Usage: Appropriate detail levels for each tier
  • Scalable Architecture: Handles growth through hierarchical organization
  • Human-like Memory: Mirrors natural memory consolidation processes
  • Flexible Retention: Different policies for different information types
  • Fast Access: Recent information readily available

Cons

  • Implementation Complexity: More complex than flat memory structures
  • Information Loss: Compression and consolidation may lose details
  • Tuning Required: Multiple parameters need careful adjustment
  • Processing Overhead: Memory consolidation requires background processing
  • Coordination Complexity: Managing transitions between levels

Performance Metrics

# Performance characteristics by level
WORKING_MEMORY = {
    "access_time": "O(1)",
    "capacity": "10-50 items",
    "retention": "minutes to hours",
    "detail_level": "complete"
}

SHORT_TERM_MEMORY = {
    "access_time": "O(n)",
    "capacity": "50-500 items",
    "retention": "hours to days",
    "detail_level": "high"
}

LONG_TERM_MEMORY = {
    "access_time": "O(log n)",
    "capacity": "1000+ items",
    "retention": "weeks to years",
    "detail_level": "summarized"
}

When to Use

Ideal Scenarios

  • Long-running agents with extensive interaction histories
  • Resource-constrained systems needing efficient memory management
  • Applications requiring both speed and comprehensiveness
  • Agents with varying information importance levels
  • Systems needing natural memory behavior patterns

When to Avoid

  • Simple, short-lived agents with minimal state
  • Applications requiring complete detail retention
  • Real-time systems intolerant of consolidation delays
  • Uniform importance scenarios where all data is equally valuable

Implementation Examples

Adaptive Hierarchical Memory

import time
from typing import Dict


class AdaptiveHierarchicalMemory(HierarchicalMemory):
    def __init__(self):
        super().__init__()
        self.performance_monitor = PerformanceMonitor()
        self.auto_tuner = AutoTuner()

    def add_interaction(self, user_input: str, agent_response: str,
                        context: Dict = None):
        # Monitor how long each add operation takes
        start_time = time.time()
        super().add_interaction(user_input, agent_response, context)
        operation_time = time.time() - start_time
        self.performance_monitor.record_operation('add_interaction', operation_time)
        # Auto-tune parameters when performance degrades
        if self.performance_monitor.should_tune():
            self.auto_tuner.adjust_parameters(self)

    def smart_consolidation(self):
        """Perform consolidation only when beneficial"""
        memory_pressure = self._calculate_memory_pressure()
        retrieval_patterns = self.performance_monitor.get_retrieval_patterns()
        if memory_pressure > 0.8:  # High memory usage
            self.consolidate_memories()
        elif self._should_proactive_consolidate(retrieval_patterns):
            self.consolidate_memories()

    def _calculate_memory_pressure(self) -> float:
        """Calculate overall memory pressure across levels"""
        working_pressure = len(self.working_memory.items) / self.working_memory.capacity
        short_term_pressure = len(self.short_term_memory.items) / self.short_term_memory.capacity
        long_term_pressure = len(self.long_term_memory.items) / self.long_term_memory.capacity
        return (working_pressure + short_term_pressure + long_term_pressure) / 3

    def _should_proactive_consolidate(self, retrieval_patterns: Dict) -> bool:
        """Consolidate proactively if older memories are accessed often"""
        old_access_ratio = retrieval_patterns.get('old_access_ratio', 0)
        return old_access_ratio > 0.3


class PerformanceMonitor:
    def __init__(self):
        self.operations = {}
        self.retrieval_stats = {}

    def record_operation(self, operation: str, duration: float):
        self.operations.setdefault(operation, []).append(duration)
        # Keep only recent measurements
        if len(self.operations[operation]) > 100:
            self.operations[operation] = self.operations[operation][-50:]

    def should_tune(self) -> bool:
        """Tune when the average recent add takes longer than 100 ms"""
        if 'add_interaction' in self.operations:
            recent_times = self.operations['add_interaction'][-10:]
            if recent_times and sum(recent_times) / len(recent_times) > 0.1:
                return True
        return False

    def get_retrieval_patterns(self) -> Dict:
        """Analyze retrieval patterns to guide consolidation"""
        return self.retrieval_stats  # Simplified for this example


class AutoTuner:
    def adjust_parameters(self, memory_system: AdaptiveHierarchicalMemory):
        """Automatically adjust memory parameters based on performance"""
        # Grow working memory capacity if it is consistently overloaded
        if self._working_memory_overloaded(memory_system):
            memory_system.working_memory.capacity = min(
                50, int(memory_system.working_memory.capacity * 1.2))
        # Raise promotion thresholds if too many items are being promoted
        # (assumes the importance calculator exposes an _adjust_thresholds helper)
        if self._too_much_promotion(memory_system):
            memory_system.importance_calculator._adjust_thresholds(increase=True)

    def _working_memory_overloaded(self, memory_system: AdaptiveHierarchicalMemory) -> bool:
        return (len(memory_system.working_memory.items) /
                memory_system.working_memory.capacity > 0.9)

    def _too_much_promotion(self, memory_system: AdaptiveHierarchicalMemory) -> bool:
        # Check whether short-term memory is growing too fast
        return (len(memory_system.short_term_memory.items) /
                memory_system.short_term_memory.capacity > 0.8)
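
A brief sketch of how the adaptive variant might be driven, with smart_consolidation called periodically in place of unconditional consolidation; the inputs are illustrative.

adaptive = AdaptiveHierarchicalMemory()
adaptive.add_interaction("What's my order status?", "Your order shipped yesterday.")

# Call on a timer instead of consolidate_memories(); it consolidates
# only under memory pressure or heavy access to older memories
adaptive.smart_consolidation()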

Domain-Specific Hierarchical Memory

class CustomerServiceMemory(HierarchicalMemory):
    """Specialized hierarchical memory for customer service agents"""

    def __init__(self):
        super().__init__()
        self.customer_profiles = {}
        self.issue_patterns = {}

    def add_customer_interaction(self, customer_id: str, issue_type: str,
                                 user_input: str, agent_response: str):
        """Add a customer service interaction with domain-specific context"""
        context = {
            'customer_id': customer_id,
            'issue_type': issue_type,
            'interaction_type': 'customer_service'
        }
        # Higher importance for first-time customers or escalated issues
        if customer_id not in self.customer_profiles:
            context['user_first_time'] = True
        if 'escalate' in user_input.lower() or 'supervisor' in user_input.lower():
            context['escalation'] = True
        self.add_interaction(user_input, agent_response, context)
        # Update customer profile
        self._update_customer_profile(customer_id, issue_type, user_input, agent_response)

    def get_customer_context(self, customer_id: str, current_issue: str) -> Dict:
        """Get relevant context for a specific customer"""
        # Search for customer-specific memories
        customer_query = f"customer:{customer_id} {current_issue}"
        context = self.retrieve_context(customer_query)
        # Add customer profile information
        if customer_id in self.customer_profiles:
            context['customer_profile'] = self.customer_profiles[customer_id]
        # Add similar issue patterns
        if current_issue in self.issue_patterns:
            context['similar_patterns'] = self.issue_patterns[current_issue]
        return context

    def _update_customer_profile(self, customer_id: str, issue_type: str,
                                 user_input: str, agent_response: str):
        """Update the customer profile with a new interaction"""
        if customer_id not in self.customer_profiles:
            self.customer_profiles[customer_id] = {
                'first_contact': datetime.now(),
                'interaction_count': 0,
                'issue_types': set(),
                'satisfaction_indicators': [],
                'preferred_communication_style': 'neutral'
            }
        profile = self.customer_profiles[customer_id]
        profile['interaction_count'] += 1
        profile['issue_types'].add(issue_type)
        profile['last_contact'] = datetime.now()
        # Very rough communication-style heuristics
        if 'please' in user_input.lower() or 'thank' in user_input.lower():
            profile['satisfaction_indicators'].append('polite')
        elif '!' in user_input or user_input.isupper():
            profile['satisfaction_indicators'].append('frustrated')
        # Update issue patterns
        self.issue_patterns.setdefault(issue_type, []).append({
            'user_input': user_input[:100],  # Truncated for pattern analysis
            'resolution': agent_response[:100],
            'timestamp': datetime.now()
        })
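
A short usage sketch for the customer service variant; the customer ID and messages are illustrative.

cs_memory = CustomerServiceMemory()
cs_memory.add_customer_interaction(
    customer_id="cust-42",
    issue_type="billing",
    user_input="I was charged twice, please fix this!",
    agent_response="I've refunded the duplicate charge."
)

context = cs_memory.get_customer_context("cust-42", "billing")
print(context['customer_profile']['interaction_count'])  # 1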

Multi-Modal Hierarchical Memory

from typing import Tuple


class MultiModalHierarchicalMemory(HierarchicalMemory):
    """Hierarchical memory supporting text, images, and audio"""

    def __init__(self):
        super().__init__()
        self.modality_processors = {
            'text': TextProcessor(),
            'image': ImageProcessor(),
            'audio': AudioProcessor()
        }

    def add_multimodal_interaction(self, interactions: List[Dict]):
        """Add an interaction with multiple modalities"""
        combined_content = []
        combined_importance = 0.0
        metadata = {'modalities': []}
        for interaction in interactions:
            modality = interaction['modality']
            content = interaction['content']
            processor = self.modality_processors[modality]
            processed_content, importance = processor.process(content)
            combined_content.append(f"[{modality.upper()}] {processed_content}")
            combined_importance = max(combined_importance, importance)
            metadata['modalities'].append(modality)
        # Create a single memory item with the combined content
        item = MemoryItem(
            content=" | ".join(combined_content),
            metadata=metadata,
            importance_score=combined_importance,
            access_count=1,
            created_at=datetime.now(),
            last_accessed=datetime.now(),
            memory_level="working"
        )
        self.working_memory.add_item(item)


class TextProcessor:
    def process(self, text: str) -> Tuple[str, float]:
        # Standard text processing
        return text, 0.5


class ImageProcessor:
    def process(self, image_path: str) -> Tuple[str, float]:
        # Extract a description from the image (placeholder)
        description = f"Image: {image_path}"
        return description, 0.7  # Images are often important


class AudioProcessor:
    def process(self, audio_path: str) -> Tuple[str, float]:
        # Transcribe audio to text (placeholder)
        transcription = f"Audio transcription from {audio_path}"
        return transcription, 0.8  # Audio is often high importance
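
A usage sketch assuming the placeholder processors above; the file path is hypothetical.

mm_memory = MultiModalHierarchicalMemory()
mm_memory.add_multimodal_interaction([
    {'modality': 'text', 'content': "Here's a photo of the broken part"},
    {'modality': 'image', 'content': '/uploads/part.jpg'},
])
# The stored item carries both modalities and the max importance (0.7)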

Best Practices

Memory Level Configuration

class MemoryConfiguration:
    """Configuration management for hierarchical memory"""

    def __init__(self, use_case: str = 'general'):
        self.configurations = {
            'general': {
                'working_capacity': 20,
                'working_retention': timedelta(minutes=30),
                'short_term_capacity': 100,
                'short_term_retention': timedelta(days=7),
                'long_term_capacity': 1000,
                'importance_threshold': 0.5
            },
            'customer_service': {
                'working_capacity': 15,  # Focus on the current customer
                'working_retention': timedelta(hours=2),
                'short_term_capacity': 200,  # More customer history
                'short_term_retention': timedelta(days=30),
                'long_term_capacity': 2000,
                'importance_threshold': 0.4  # Lower threshold for customer data
            },
            'research': {
                'working_capacity': 50,  # Large working set
                'working_retention': timedelta(hours=8),
                'short_term_capacity': 500,
                'short_term_retention': timedelta(days=14),
                'long_term_capacity': 5000,
                'importance_threshold': 0.3  # Keep more information
            }
        }
        self.config = self.configurations.get(use_case, self.configurations['general'])

    def create_memory_system(self) -> HierarchicalMemory:
        """Create a memory system with the selected configuration applied"""
        memory = HierarchicalMemory()
        memory.working_memory.capacity = self.config['working_capacity']
        memory.working_memory.retention_time = self.config['working_retention']
        memory.short_term_memory.capacity = self.config['short_term_capacity']
        memory.short_term_memory.retention_time = self.config['short_term_retention']
        memory.long_term_memory.capacity = self.config['long_term_capacity']
        return memory
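
Selecting a profile and building the configured system is then a two-liner:

config = MemoryConfiguration(use_case='customer_service')
memory = config.create_memory_system()
print(memory.short_term_memory.capacity)  # 200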

Memory Maintenance

class MemoryMaintenanceManager:
    def __init__(self, memory_system: HierarchicalMemory):
        self.memory_system = memory_system
        self.maintenance_schedule = {
            'consolidation': timedelta(hours=1),
            'cleanup': timedelta(hours=6),
            'optimization': timedelta(days=1)
        }
        self.last_maintenance = {}

    def perform_maintenance(self):
        """Perform any scheduled maintenance tasks that are due"""
        current_time = datetime.now()
        if self._should_run_task('consolidation', current_time):
            self.memory_system.consolidate_memories()
            self.last_maintenance['consolidation'] = current_time
        if self._should_run_task('cleanup', current_time):
            self._cleanup_corrupted_memories()
            self.last_maintenance['cleanup'] = current_time
        if self._should_run_task('optimization', current_time):
            self._optimize_memory_layout()
            self.last_maintenance['optimization'] = current_time

    def _should_run_task(self, task_name: str, current_time: datetime) -> bool:
        if task_name not in self.last_maintenance:
            return True
        time_since_last = current_time - self.last_maintenance[task_name]
        return time_since_last >= self.maintenance_schedule[task_name]

    def _cleanup_corrupted_memories(self):
        """Remove memories with invalid data"""
        for level in [self.memory_system.working_memory,
                      self.memory_system.short_term_memory,
                      self.memory_system.long_term_memory]:
            level.items = [item for item in level.items
                           if self._is_valid_memory_item(item)]

    def _is_valid_memory_item(self, item: MemoryItem) -> bool:
        """Validate memory item integrity"""
        return (
            bool(item.content) and
            isinstance(item.importance_score, (int, float)) and
            0 <= item.importance_score <= 1 and
            isinstance(item.access_count, int) and
            item.access_count >= 0
        )

    def _optimize_memory_layout(self):
        """Sort items within each level for better retrieval performance"""
        for level in [self.memory_system.short_term_memory,
                      self.memory_system.long_term_memory]:
            level.items.sort(key=lambda x: (x.importance_score, x.last_accessed),
                             reverse=True)
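
One possible way to run maintenance on a schedule is a background thread; the run_maintenance_loop helper below is a hypothetical sketch, not part of the manager itself, and assumes a threaded runner fits your deployment.

import threading

def run_maintenance_loop(manager: MemoryMaintenanceManager,
                         interval_seconds: int = 600,
                         stop_event: threading.Event = None):
    """Call perform_maintenance() on a fixed interval until stopped."""
    stop_event = stop_event or threading.Event()
    while not stop_event.is_set():
        manager.perform_maintenance()
        stop_event.wait(interval_seconds)

stop = threading.Event()
manager = MemoryMaintenanceManager(HierarchicalMemory())
worker = threading.Thread(target=run_maintenance_loop,
                          args=(manager, 600, stop), daemon=True)
worker.start()
# Later: stop.set() to shut the loop down cleanly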

Integration with Other Patterns

Hierarchical + Vector Retrieval

import faiss  # pip install faiss-cpu
import numpy as np
from sentence_transformers import SentenceTransformer  # pip install sentence-transformers


class HierarchicalVectorMemory(HierarchicalMemory):
    def __init__(self):
        super().__init__()
        self.vector_indices = {
            'working': VectorIndex(dimension=384),
            'short_term': VectorIndex(dimension=384),
            'long_term': VectorIndex(dimension=384)
        }
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')

    def add_interaction(self, user_input: str, agent_response: str,
                        context: Dict = None):
        # Add to hierarchical memory first
        super().add_interaction(user_input, agent_response, context)
        # Then index the combined text in the matching vector index
        content = f"{user_input} {agent_response}"
        embedding = self.embedder.encode([content])[0]
        # Assumes the most recently added item is still last in working memory
        recent_item = self.working_memory.items[-1]
        level_name = recent_item.memory_level
        if level_name in self.vector_indices:
            self.vector_indices[level_name].add_vector(embedding, recent_item)

    def retrieve_context(self, query: str, use_vector: bool = True,
                         max_items: int = 10):
        if not use_vector:
            return super().retrieve_context(query, max_items)
        # Use vector search within each level
        query_embedding = self.embedder.encode([query])[0]
        results = {}
        for level_name, vector_index in self.vector_indices.items():
            results[level_name] = vector_index.search(query_embedding, k=3)
        return results


class VectorIndex:
    def __init__(self, dimension: int):
        self.dimension = dimension
        # Inner product over normalized vectors equals cosine similarity
        self.index = faiss.IndexFlatIP(dimension)
        self.items = []

    def add_vector(self, embedding: np.ndarray, item: MemoryItem):
        normalized = embedding / np.linalg.norm(embedding)
        self.index.add(normalized.reshape(1, -1).astype('float32'))
        self.items.append(item)

    def search(self, query_embedding: np.ndarray, k: int = 5):
        if self.index.ntotal == 0:
            return []
        normalized_query = query_embedding / np.linalg.norm(query_embedding)
        scores, indices = self.index.search(
            normalized_query.reshape(1, -1).astype('float32'),
            min(k, self.index.ntotal)
        )
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < len(self.items):
                item = self.items[idx]
                item.last_accessed = datetime.now()
                item.access_count += 1
                results.append(item)
        return results
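
A usage sketch, assuming faiss and sentence-transformers are installed and the all-MiniLM-L6-v2 model weights are available; the queries are illustrative.

vec_memory = HierarchicalVectorMemory()
vec_memory.add_interaction("How do I reset my password?",
                           "Use the 'Forgot password' link on the login page.")

hits = vec_memory.retrieve_context("password reset", use_vector=True)
for level, items in hits.items():
    print(level, [item.content[:40] for item in items])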

Testing and Validation

Unit Tests

import pytest
from datetime import datetime, timedelta

# Assumes MemoryItem, WorkingMemory, HierarchicalMemory, and
# ImportanceCalculator from the architecture section are importable here.


def test_working_memory_capacity():
    working_mem = WorkingMemory(capacity=3)
    # Add items up to capacity
    for i in range(3):
        item = MemoryItem(
            content=f"Item {i}",
            metadata={},
            importance_score=0.5,
            access_count=1,
            created_at=datetime.now(),
            last_accessed=datetime.now(),
            memory_level="working"
        )
        assert working_mem.add_item(item) == True
    # Add one more item (should cause overflow)
    overflow_item = MemoryItem(
        content="Overflow item",
        metadata={},
        importance_score=0.5,
        access_count=1,
        created_at=datetime.now(),
        last_accessed=datetime.now(),
        memory_level="working"
    )
    working_mem.add_item(overflow_item)
    # Should still have 3 items
    assert len(working_mem.items) == 3


def test_memory_consolidation():
    memory = HierarchicalMemory()
    # Add items to working memory
    for i in range(5):
        memory.add_interaction(f"Input {i}", f"Response {i}")
    initial_working_count = len(memory.working_memory.items)
    # Simulate time passing
    for item in memory.working_memory.items:
        item.created_at = datetime.now() - timedelta(hours=1)
    # Perform consolidation
    memory.consolidate_memories()
    # Working memory should have fewer items
    assert len(memory.working_memory.items) < initial_working_count
    # Short-term memory should have more items
    assert len(memory.short_term_memory.items) > 0


def test_importance_calculation():
    calculator = ImportanceCalculator()
    # High-importance interaction
    high_importance = calculator.calculate_importance(
        "I'm really frustrated with this issue!",
        "I understand your frustration. Let me help resolve this immediately.",
        {'user_first_time': True}
    )
    # Low-importance interaction
    low_importance = calculator.calculate_importance("Hello", "Hi there!", {})
    assert high_importance > low_importance
    assert 0 <= high_importance <= 1
    assert 0 <= low_importance <= 1


def test_retrieval_across_levels():
    memory = HierarchicalMemory()
    # Add test interactions
    memory.add_interaction("Python programming", "Python is a programming language")
    memory.add_interaction("Machine learning", "ML is a subset of AI")
    memory.add_interaction("Weather today", "It's sunny outside")
    # Test retrieval
    results = memory.retrieve_context("programming", max_items=5)
    # Should find the Python-related interaction
    all_items = []
    for level_items in results.values():
        all_items.extend(level_items)
    assert len(all_items) > 0
    assert any("Python" in item.content for item in all_items)

Performance Tests

import time


def test_hierarchical_memory_performance():
    memory = HierarchicalMemory()

    # Test insertion performance
    start_time = time.time()
    for i in range(1000):
        memory.add_interaction(f"Input {i}", f"Response {i}")
    insertion_time = time.time() - start_time
    print(f"Insertion: {insertion_time:.2f}s for 1000 interactions")

    # Test retrieval performance
    start_time = time.time()
    for i in range(100):
        results = memory.retrieve_context(f"query {i % 10}")
    retrieval_time = time.time() - start_time
    print(f"Retrieval: {retrieval_time:.2f}s for 100 queries")

    # Test consolidation performance
    start_time = time.time()
    memory.consolidate_memories()
    consolidation_time = time.time() - start_time
    print(f"Consolidation: {consolidation_time:.2f}s")

    stats = memory.get_memory_stats()
    print(f"Memory stats: {stats}")

Migration and Scaling

Migration from Flat Memory

def migrate_to_hierarchical(flat_memory, importance_calculator=None):
    """Migrate from a flat memory structure to hierarchical memory"""
    if importance_calculator is None:
        importance_calculator = ImportanceCalculator()

    hierarchical = HierarchicalMemory()
    hierarchical.importance_calculator = importance_calculator

    # Get all interactions and replay them in chronological order
    interactions = flat_memory.get_all_interactions()
    interactions.sort(key=lambda x: x.get('timestamp', datetime.min))

    for interaction in interactions:
        hierarchical.add_interaction(
            interaction['user_input'],
            interaction['agent_response'],
            interaction.get('metadata', {})
        )

    # Perform initial consolidation
    hierarchical.consolidate_memories()
    return hierarchical
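
The function assumes the flat store exposes a get_all_interactions() method returning dicts with user_input, agent_response, and optional timestamp/metadata keys. The FlatMemoryStub below is a hypothetical stand-in for illustration only.

class FlatMemoryStub:
    """Minimal stand-in for the interface migrate_to_hierarchical expects."""
    def __init__(self, interactions):
        self._interactions = interactions

    def get_all_interactions(self):
        return list(self._interactions)

flat = FlatMemoryStub([
    {'user_input': 'Hi', 'agent_response': 'Hello!', 'timestamp': datetime.now()},
])
hierarchical = migrate_to_hierarchical(flat)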

Distributed Hierarchical Memory

class DistributedHierarchicalMemory:
    def __init__(self, shard_count: int = 4):
        self.shards = [HierarchicalMemory() for _ in range(shard_count)]
        self.shard_count = shard_count

    def add_interaction(self, user_input: str, agent_response: str,
                        context: Dict = None, shard_key: str = None):
        # Route to a shard by key (e.g., user ID) or by input hash
        if shard_key:
            shard_id = hash(shard_key) % self.shard_count
        else:
            shard_id = hash(user_input) % self.shard_count
        self.shards[shard_id].add_interaction(user_input, agent_response, context)

    def retrieve_context(self, query: str, shard_key: str = None):
        if shard_key:
            # Query the specific shard
            shard_id = hash(shard_key) % self.shard_count
            return self.shards[shard_id].retrieve_context(query)
        # Otherwise query all shards and merge results
        all_results = {}
        for shard in self.shards:
            shard_results = shard.retrieve_context(query)
            for level, items in shard_results.items():
                all_results.setdefault(level, []).extend(items)
        # Re-rank and limit results per level
        for level in all_results:
            all_results[level].sort(
                key=lambda x: (x.importance_score, x.last_accessed),
                reverse=True
            )
            all_results[level] = all_results[level][:5]
        return all_results

    def consolidate_all_shards(self):
        """Perform consolidation across all shards"""
        for shard in self.shards:
            shard.consolidate_memories()
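
A short usage sketch; routing by a stable shard_key such as a user ID keeps each user's history on one shard. The key and messages are illustrative.

dist = DistributedHierarchicalMemory(shard_count=4)
dist.add_interaction("Where is my package?", "It arrives tomorrow.",
                     shard_key="user-123")

# Targeted lookup hits one shard; omitting shard_key fans out to all shards
results = dist.retrieve_context("package", shard_key="user-123")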

Next Steps

  1. Design your memory hierarchy based on usage patterns
  2. Configure appropriate capacities and retention policies for each level
  3. Implement importance calculation specific to your domain
  4. Set up automated consolidation and maintenance processes
  5. Monitor memory performance and adjust parameters
  6. Consider integration with vector retrieval or graph memory
  7. Plan for scaling and distribution as your system grows

The Hierarchical Memory pattern provides a sophisticated approach to memory management that can adapt to various applications while maintaining both performance and comprehensiveness.