Hierarchical Memory Pattern
The Hierarchical Memory pattern organizes agent memory into multiple levels or tiers, with different retention policies, access patterns, and detail levels at each tier. This approach mimics human memory systems and enables efficient storage and retrieval across different time scales and importance levels.
Overview
The Hierarchical Memory pattern structures memory as a multi-level system where information flows between levels based on importance, frequency of access, and temporal factors. Key components include:
- Working Memory: Immediate, highly accessible recent context
- Short-term Memory: Recent interactions with moderate detail
- Long-term Memory: Compressed summaries and important information
- Archival Memory: Permanent storage with minimal detail (not implemented in the reference code below; a sketch appears after LongTermMemory)
- Promotion/Demotion Rules: Logic for moving information between levels
This pattern balances comprehensive retention with performance efficiency by storing different levels of detail appropriate for each memory tier.
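As an orientation before the implementation details, here is a usage sketch driven against the reference API defined in the Architecture section below (HierarchicalMemory, add_interaction, consolidate_memories, and retrieve_context are all defined there):

memory = HierarchicalMemory()

# New interactions always enter at the working-memory tier
memory.add_interaction("My name is Ada and I work at Acme.", "Nice to meet you, Ada!")
memory.add_interaction("What's the weather like?", "It's sunny today.")

# Run periodically: applies the promotion/demotion rules
# (working -> short-term -> long-term)
memory.consolidate_memories()

# Retrieval fans out across all tiers and merges the results
context = memory.retrieve_context("Ada Acme", max_items=5)
for level, items in context.items():
    print(level, [item.content[:40] for item in items])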
Architecture
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
@dataclass
class MemoryItem:
content: str
metadata: Dict[str, Any]
importance_score: float
access_count: int
created_at: datetime
last_accessed: datetime
memory_level: str
class MemoryLevel(ABC):
"""Abstract base class for memory levels"""
def __init__(self, name: str, capacity: int, retention_time: Optional[timedelta] = None):
self.name = name
self.capacity = capacity
self.retention_time = retention_time
self.items: List[MemoryItem] = []
@abstractmethod
def add_item(self, item: MemoryItem) -> bool:
"""Add item to this memory level"""
pass
@abstractmethod
def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
"""Retrieve relevant items from this level"""
pass
@abstractmethod
def cleanup(self) -> List[MemoryItem]:
"""Clean up expired/old items, return items to be promoted/demoted"""
pass
class WorkingMemory(MemoryLevel):
"""Immediate, high-access memory for current context"""
def __init__(self, capacity: int = 20):
super().__init__("working", capacity, timedelta(minutes=30))
self.current_context = []
def add_item(self, item: MemoryItem) -> bool:
item.memory_level = self.name
self.items.append(item)
        # Maintain capacity by evicting the least important, least recently
        # accessed item
        if len(self.items) > self.capacity:
            self.items.sort(key=lambda x: (x.importance_score, x.last_accessed))
            self.items.pop(0)
            return False  # Indicate overflow
return True
def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
# Working memory returns recent items prioritized by importance
recent_items = [
item for item in self.items
if datetime.now() - item.created_at < timedelta(minutes=5)
]
# Sort by importance and recency
recent_items.sort(
key=lambda x: (x.importance_score, x.last_accessed),
reverse=True
)
for item in recent_items[:limit]:
item.access_count += 1
item.last_accessed = datetime.now()
return recent_items[:limit]
def cleanup(self) -> List[MemoryItem]:
"""Move old items to short-term memory"""
        current_time = datetime.now()
        expired_items = []
        kept_items = []
        for item in self.items:
            if current_time - item.created_at <= self.retention_time:
                kept_items.append(item)
            else:
                expired_items.append(item)
        self.items = kept_items
        return expired_items
class ShortTermMemory(MemoryLevel):
"""Recent interactions with moderate detail retention"""
def __init__(self, capacity: int = 100):
super().__init__("short_term", capacity, timedelta(days=7))
def add_item(self, item: MemoryItem) -> bool:
item.memory_level = self.name
# Check if similar item already exists
for existing in self.items:
if self._are_similar(item, existing):
# Merge with existing item
self._merge_items(existing, item)
return True
self.items.append(item)
# Handle overflow
if len(self.items) > self.capacity:
self._handle_overflow()
return True
def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
# Simple keyword matching for short-term retrieval
matching_items = []
query_words = set(query.lower().split())
for item in self.items:
content_words = set(item.content.lower().split())
overlap = len(query_words & content_words)
if overlap > 0:
# Calculate relevance score
relevance = overlap / len(query_words)
importance_factor = item.importance_score
recency_factor = 1.0 / (1 + (datetime.now() - item.last_accessed).days)
combined_score = relevance * importance_factor * recency_factor
matching_items.append((combined_score, item))
# Sort by combined score and return top matches
        matching_items.sort(key=lambda pair: pair[0], reverse=True)  # sort on score only; MemoryItem is not orderable
results = []
for score, item in matching_items[:limit]:
item.access_count += 1
item.last_accessed = datetime.now()
results.append(item)
return results
def cleanup(self) -> List[MemoryItem]:
"""Move important items to long-term, discard unimportant ones"""
current_time = datetime.now()
items_to_promote = []
items_to_keep = []
for item in self.items:
age = current_time - item.created_at
if age > self.retention_time:
# Decide whether to promote to long-term or discard
if item.importance_score > 0.7 or item.access_count > 3:
items_to_promote.append(item)
# Otherwise, discard (don't add to items_to_keep)
else:
items_to_keep.append(item)
self.items = items_to_keep
return items_to_promote
def _are_similar(self, item1: MemoryItem, item2: MemoryItem) -> bool:
"""Check if two items are similar enough to merge"""
# Simple similarity check based on content overlap
words1 = set(item1.content.lower().split())
words2 = set(item2.content.lower().split())
if len(words1) == 0 or len(words2) == 0:
return False
overlap = len(words1 & words2)
similarity = overlap / max(len(words1), len(words2))
return similarity > 0.6
def _merge_items(self, existing: MemoryItem, new: MemoryItem):
"""Merge new item into existing item"""
existing.access_count += new.access_count
existing.importance_score = max(existing.importance_score, new.importance_score)
existing.last_accessed = max(existing.last_accessed, new.last_accessed)
# Merge metadata
existing.metadata.update(new.metadata)
def _handle_overflow(self):
"""Remove least important items when capacity exceeded"""
# Sort by importance and access patterns
self.items.sort(key=lambda x: (x.importance_score, x.access_count))
# Remove bottom 10% of items
remove_count = max(1, len(self.items) // 10)
self.items = self.items[remove_count:]
class LongTermMemory(MemoryLevel):
"""Compressed summaries and important information for extended retention"""
def __init__(self, capacity: int = 1000):
super().__init__("long_term", capacity, timedelta(days=365))
self.summaries = {} # topic -> summary mapping
def add_item(self, item: MemoryItem) -> bool:
# Compress item content for long-term storage
compressed_item = self._compress_item(item)
compressed_item.memory_level = self.name
# Group similar items into summaries
topic = self._extract_topic(compressed_item)
if topic in self.summaries:
self._update_summary(topic, compressed_item)
else:
self.summaries[topic] = {
'items': [compressed_item],
'summary': compressed_item.content,
'importance': compressed_item.importance_score,
'last_updated': datetime.now()
}
self.items.append(compressed_item)
# Handle capacity
if len(self.items) > self.capacity:
self._consolidate_memories()
return True
def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
"""Retrieve from summaries and individual important items"""
query_words = set(query.lower().split())
results = []
# Search in summaries
for topic, summary_data in self.summaries.items():
topic_words = set(topic.lower().split())
summary_words = set(summary_data['summary'].lower().split())
overlap_topic = len(query_words & topic_words)
overlap_summary = len(query_words & summary_words)
if overlap_topic > 0 or overlap_summary > 0:
relevance = (overlap_topic + overlap_summary) / len(query_words)
score = relevance * summary_data['importance']
# Create synthetic item representing the summary
summary_item = MemoryItem(
content=f"Summary for {topic}: {summary_data['summary']}",
metadata={'type': 'summary', 'topic': topic},
importance_score=summary_data['importance'],
access_count=len(summary_data['items']),
created_at=summary_data['last_updated'],
last_accessed=datetime.now(),
memory_level=self.name
)
results.append((score, summary_item))
# Search in individual items
for item in self.items:
if item.content not in [s['summary'] for s in self.summaries.values()]:
content_words = set(item.content.lower().split())
overlap = len(query_words & content_words)
if overlap > 0:
relevance = overlap / len(query_words)
score = relevance * item.importance_score
results.append((score, item))
# Sort and return top results
        results.sort(key=lambda pair: pair[0], reverse=True)  # sort on score only; MemoryItem is not orderable
return [item for score, item in results[:limit]]
def cleanup(self) -> List[MemoryItem]:
"""Archive very old items, return items that should be archived"""
current_time = datetime.now()
items_to_archive = []
# Find very old or low-importance items for archival
for item in self.items[:]: # Use slice to avoid modification during iteration
age = current_time - item.created_at
if (age > timedelta(days=180) and item.importance_score < 0.3):
items_to_archive.append(item)
self.items.remove(item)
return items_to_archive
def _compress_item(self, item: MemoryItem) -> MemoryItem:
"""Compress item content for long-term storage"""
# Simple compression: extract key phrases
content_words = item.content.split()
if len(content_words) > 20:
# Keep first 10 and last 10 words, add ellipsis
compressed_content = " ".join(content_words[:10]) + " ... " + " ".join(content_words[-10:])
else:
compressed_content = item.content
return MemoryItem(
content=compressed_content,
metadata=item.metadata,
importance_score=item.importance_score,
access_count=item.access_count,
created_at=item.created_at,
last_accessed=item.last_accessed,
memory_level=item.memory_level
)
def _extract_topic(self, item: MemoryItem) -> str:
"""Extract topic from item for grouping"""
# Simple topic extraction based on keywords
content_words = item.content.lower().split()
# Remove common words
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
keywords = [word for word in content_words if word not in stop_words and len(word) > 2]
# Return most frequent non-stop words as topic
if keywords:
return " ".join(keywords[:3]) # Use first 3 keywords as topic
else:
return "general"
def _update_summary(self, topic: str, new_item: MemoryItem):
"""Update summary with new item"""
summary_data = self.summaries[topic]
summary_data['items'].append(new_item)
summary_data['importance'] = max(summary_data['importance'], new_item.importance_score)
summary_data['last_updated'] = datetime.now()
# Update summary text (simple concatenation for now)
if len(summary_data['items']) > 5:
# Create new summary from all items
all_content = " ".join([item.content for item in summary_data['items'][-5:]])
summary_data['summary'] = all_content[:200] + "..." # Limit summary length
def _consolidate_memories(self):
"""Consolidate similar memories when capacity exceeded"""
# Group items by topic and merge similar ones
topic_groups = {}
for item in self.items:
topic = self._extract_topic(item)
if topic not in topic_groups:
topic_groups[topic] = []
topic_groups[topic].append(item)
# Consolidate each topic group
consolidated_items = []
for topic, items in topic_groups.items():
if len(items) > 3:
# Create consolidated summary for this topic
consolidated = self._create_topic_summary(topic, items)
consolidated_items.append(consolidated)
else:
consolidated_items.extend(items)
self.items = consolidated_items[:self.capacity]
def _create_topic_summary(self, topic: str, items: List[MemoryItem]) -> MemoryItem:
"""Create a summary item from multiple related items"""
combined_content = f"Summary of {len(items)} memories about {topic}: "
important_items = sorted(items, key=lambda x: x.importance_score, reverse=True)[:3]
combined_content += " | ".join([item.content[:50] for item in important_items])
avg_importance = sum(item.importance_score for item in items) / len(items)
total_access = sum(item.access_count for item in items)
most_recent = max(item.last_accessed for item in items)
return MemoryItem(
content=combined_content,
metadata={'type': 'consolidated_summary', 'topic': topic, 'source_count': len(items)},
importance_score=avg_importance,
access_count=total_access,
created_at=datetime.now(),
last_accessed=most_recent,
memory_level=self.name
)
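# NOTE: the component list above also names an "Archival Memory" tier, which
# this reference implementation leaves out (LongTermMemory.cleanup() returns
# the items to archive, but nothing stores them). A minimal sketch of that
# fourth level might look like the following -- the class name and details are
# illustrative, not part of the original design:
class ArchivalMemory(MemoryLevel):
    """Permanent, minimal-detail storage for items demoted from long-term memory"""
    def __init__(self):
        # Effectively unbounded and permanent: no retention window
        super().__init__("archival", capacity=10**9, retention_time=None)
    def add_item(self, item: MemoryItem) -> bool:
        item.memory_level = self.name
        item.content = item.content[:100]  # keep only a stub of the content
        self.items.append(item)
        return True
    def retrieve(self, query: str, limit: int = 5) -> List[MemoryItem]:
        # Plain keyword scan; archival access is expected to be slow and rare
        query_words = set(query.lower().split())
        matches = [i for i in self.items if query_words & set(i.content.lower().split())]
        return matches[:limit]
    def cleanup(self) -> List[MemoryItem]:
        return []  # nothing ever leaves the archive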
class HierarchicalMemory:
"""Main hierarchical memory system coordinating all levels"""
def __init__(self):
self.working_memory = WorkingMemory()
self.short_term_memory = ShortTermMemory()
self.long_term_memory = LongTermMemory()
self.importance_calculator = ImportanceCalculator()
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None):
"""Add new interaction to memory hierarchy"""
# Create memory item
content = f"User: {user_input}\nAgent: {agent_response}"
importance = self.importance_calculator.calculate_importance(
user_input, agent_response, context or {}
)
item = MemoryItem(
content=content,
metadata={'user_input': user_input, 'agent_response': agent_response, **(context or {})},
importance_score=importance,
access_count=1,
created_at=datetime.now(),
last_accessed=datetime.now(),
memory_level="working"
)
# Add to working memory
self.working_memory.add_item(item)
def retrieve_context(self, query: str, max_items: int = 10) -> Dict[str, List[MemoryItem]]:
"""Retrieve relevant context from all memory levels"""
context = {
'working': self.working_memory.retrieve(query, limit=3),
'short_term': self.short_term_memory.retrieve(query, limit=4),
'long_term': self.long_term_memory.retrieve(query, limit=3)
}
# Combine and rank all results
all_items = []
for level, items in context.items():
for item in items:
all_items.append(item)
# Sort by importance and recency
all_items.sort(key=lambda x: (x.importance_score, x.last_accessed), reverse=True)
# Return top items but preserve level information
result = {level: [] for level in context.keys()}
for item in all_items[:max_items]:
result[item.memory_level].append(item)
return result
def consolidate_memories(self):
"""Perform memory consolidation across levels"""
# Move items between levels based on age and importance
# Working -> Short-term
expired_working = self.working_memory.cleanup()
for item in expired_working:
self.short_term_memory.add_item(item)
# Short-term -> Long-term
items_to_promote = self.short_term_memory.cleanup()
for item in items_to_promote:
self.long_term_memory.add_item(item)
        # Long-term -> Archive: cleanup() returns the items to archive; they are
        # dropped here unless an archival tier (see the sketch above) is wired in
        self.long_term_memory.cleanup()
def get_memory_stats(self) -> Dict:
"""Get statistics about memory usage"""
return {
'working_memory': {
'count': len(self.working_memory.items),
'capacity': self.working_memory.capacity
},
'short_term_memory': {
'count': len(self.short_term_memory.items),
'capacity': self.short_term_memory.capacity
},
'long_term_memory': {
'count': len(self.long_term_memory.items),
'summaries': len(self.long_term_memory.summaries),
'capacity': self.long_term_memory.capacity
}
}
class ImportanceCalculator:
"""Calculate importance scores for memory items"""
def __init__(self):
self.emotion_keywords = {
'positive': ['happy', 'excited', 'great', 'awesome', 'love', 'amazing'],
'negative': ['angry', 'frustrated', 'hate', 'terrible', 'awful', 'bad'],
'surprise': ['surprising', 'unexpected', 'wow', 'incredible', 'unbelievable']
}
self.entity_keywords = ['name', 'person', 'company', 'place', 'date', 'number']
def calculate_importance(self, user_input: str, agent_response: str, context: Dict) -> float:
"""Calculate importance score based on various factors"""
base_score = 0.5
# Factor 1: Emotional content
emotion_score = self._calculate_emotion_score(user_input + " " + agent_response)
# Factor 2: Entity presence
entity_score = self._calculate_entity_score(user_input + " " + agent_response)
# Factor 3: Context markers
context_score = self._calculate_context_score(context)
# Factor 4: Length and complexity
complexity_score = self._calculate_complexity_score(user_input, agent_response)
# Combine scores with weights
final_score = (
base_score * 0.2 +
emotion_score * 0.3 +
entity_score * 0.2 +
context_score * 0.2 +
complexity_score * 0.1
)
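        # Worked example: emotion 0.6, entities 0.4, context 0.7, complexity 0.5
        # combine as 0.5*0.2 + 0.6*0.3 + 0.4*0.2 + 0.7*0.2 + 0.5*0.1 = 0.55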
return min(1.0, final_score)
def _calculate_emotion_score(self, text: str) -> float:
"""Score based on emotional content"""
text_lower = text.lower()
emotion_count = 0
for emotion_type, keywords in self.emotion_keywords.items():
for keyword in keywords:
if keyword in text_lower:
emotion_count += 1
return min(1.0, emotion_count * 0.2)
def _calculate_entity_score(self, text: str) -> float:
"""Score based on named entities and important information"""
# Simple heuristic: look for capitalized words, numbers, etc.
words = text.split()
entity_indicators = 0
for word in words:
if word[0].isupper() and len(word) > 2: # Potential proper noun
entity_indicators += 1
elif any(char.isdigit() for char in word): # Contains numbers
entity_indicators += 1
return min(1.0, entity_indicators * 0.1)
def _calculate_context_score(self, context: Dict) -> float:
"""Score based on context metadata"""
score = 0.5
# Check for special context markers
if context.get('user_first_time'):
score += 0.3
if context.get('error_occurred'):
score += 0.2
if context.get('task_completed'):
score += 0.2
if context.get('user_sentiment') == 'negative':
score += 0.2
return min(1.0, score)
def _calculate_complexity_score(self, user_input: str, agent_response: str) -> float:
"""Score based on interaction complexity"""
total_length = len(user_input) + len(agent_response)
if total_length < 50:
return 0.2
elif total_length < 200:
return 0.5
elif total_length < 500:
return 0.7
else:
            return 1.0
Performance Characteristics
Pros
- Efficient Resource Usage: Appropriate detail levels for each tier
- Scalable Architecture: Handles growth through hierarchical organization
- Human-like Memory: Mirrors natural memory consolidation processes
- Flexible Retention: Different policies for different information types
- Fast Access: Recent information readily available
Cons
- Implementation Complexity: More complex than flat memory structures
- Information Loss: Compression and consolidation may lose details
- Tuning Required: Multiple parameters need careful adjustment
- Processing Overhead: Memory consolidation requires background processing
- Coordination Complexity: Managing transitions between levels
Performance Metrics
# Design-target performance characteristics by level. Note that the reference
# implementation above uses linear scans throughout; the O(1)/O(log n) access
# times assume an indexed backend (e.g., the vector index shown later).
WORKING_MEMORY = {
"access_time": "O(1)",
"capacity": "10-50 items",
"retention": "minutes to hours",
"detail_level": "complete"
}
SHORT_TERM_MEMORY = {
"access_time": "O(n)",
"capacity": "50-500 items",
"retention": "hours to days",
"detail_level": "high"
}
LONG_TERM_MEMORY = {
"access_time": "O(log n)",
"capacity": "1000+ items",
"retention": "weeks to years",
"detail_level": "summarized"
}
When to Use
Ideal Scenarios
- Long-running agents with extensive interaction histories
- Resource-constrained systems needing efficient memory management
- Applications requiring both speed and comprehensiveness
- Agents with varying information importance levels
- Systems needing natural memory behavior patterns
Not Recommended For
- Simple, short-lived agents with minimal state
- Applications requiring complete detail retention
- Real-time systems intolerant of consolidation delays
- Uniform importance scenarios where all data is equally valuable
Implementation Examples
Adaptive Hierarchical Memory
import time

class AdaptiveHierarchicalMemory(HierarchicalMemory):
def __init__(self):
super().__init__()
self.performance_monitor = PerformanceMonitor()
self.auto_tuner = AutoTuner()
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None):
# Monitor performance
start_time = time.time()
super().add_interaction(user_input, agent_response, context)
operation_time = time.time() - start_time
self.performance_monitor.record_operation('add_interaction', operation_time)
# Auto-tune parameters based on performance
if self.performance_monitor.should_tune():
self.auto_tuner.adjust_parameters(self)
def smart_consolidation(self):
"""Perform consolidation only when beneficial"""
memory_pressure = self._calculate_memory_pressure()
retrieval_patterns = self.performance_monitor.get_retrieval_patterns()
if memory_pressure > 0.8: # High memory usage
self.consolidate_memories()
elif self._should_proactive_consolidate(retrieval_patterns):
self.consolidate_memories()
def _calculate_memory_pressure(self) -> float:
"""Calculate overall memory pressure across levels"""
working_pressure = len(self.working_memory.items) / self.working_memory.capacity
short_term_pressure = len(self.short_term_memory.items) / self.short_term_memory.capacity
long_term_pressure = len(self.long_term_memory.items) / self.long_term_memory.capacity
return (working_pressure + short_term_pressure + long_term_pressure) / 3
def _should_proactive_consolidate(self, retrieval_patterns: Dict) -> bool:
"""Determine if proactive consolidation would be beneficial"""
# If we're frequently accessing older memories, consolidate to improve access
old_access_ratio = retrieval_patterns.get('old_access_ratio', 0)
return old_access_ratio > 0.3
class PerformanceMonitor:
def __init__(self):
self.operations = {}
self.retrieval_stats = {}
def record_operation(self, operation: str, duration: float):
if operation not in self.operations:
self.operations[operation] = []
self.operations[operation].append(duration)
# Keep only recent measurements
if len(self.operations[operation]) > 100:
self.operations[operation] = self.operations[operation][-50:]
def should_tune(self) -> bool:
"""Determine if auto-tuning should be performed"""
if 'add_interaction' in self.operations:
recent_times = self.operations['add_interaction'][-10:]
if recent_times and sum(recent_times) / len(recent_times) > 0.1: # Average > 100ms
return True
return False
def get_retrieval_patterns(self) -> Dict:
"""Analyze retrieval patterns to guide consolidation"""
return self.retrieval_stats # Simplified for example
class AutoTuner:
def adjust_parameters(self, memory_system: AdaptiveHierarchicalMemory):
"""Automatically adjust memory parameters based on performance"""
# Adjust working memory capacity based on access patterns
if self._working_memory_overloaded(memory_system):
            memory_system.working_memory.capacity = min(50, int(memory_system.working_memory.capacity * 1.2))
        # Raise importance thresholds for promotion; this assumes an
        # _adjust_thresholds hook, which the base ImportanceCalculator
        # shown earlier does not define
        if self._too_much_promotion(memory_system):
            memory_system.importance_calculator._adjust_thresholds(increase=True)
def _working_memory_overloaded(self, memory_system: AdaptiveHierarchicalMemory) -> bool:
return (len(memory_system.working_memory.items) /
memory_system.working_memory.capacity > 0.9)
def _too_much_promotion(self, memory_system: AdaptiveHierarchicalMemory) -> bool:
# Check if short-term memory is growing too fast
return (len(memory_system.short_term_memory.items) /
                memory_system.short_term_memory.capacity > 0.8)
Domain-Specific Hierarchical Memory
class CustomerServiceMemory(HierarchicalMemory):
"""Specialized hierarchical memory for customer service agents"""
def __init__(self):
super().__init__()
self.customer_profiles = {}
self.issue_patterns = {}
def add_customer_interaction(self, customer_id: str, issue_type: str,
user_input: str, agent_response: str):
"""Add customer service interaction with domain-specific context"""
context = {
'customer_id': customer_id,
'issue_type': issue_type,
'interaction_type': 'customer_service'
}
# Higher importance for first-time customers or escalated issues
if customer_id not in self.customer_profiles:
context['user_first_time'] = True
if 'escalate' in user_input.lower() or 'supervisor' in user_input.lower():
context['escalation'] = True
self.add_interaction(user_input, agent_response, context)
# Update customer profile
self._update_customer_profile(customer_id, issue_type, user_input, agent_response)
def get_customer_context(self, customer_id: str, current_issue: str) -> Dict:
"""Get relevant context for a specific customer"""
# Search for customer-specific memories
customer_query = f"customer:{customer_id} {current_issue}"
context = self.retrieve_context(customer_query)
# Add customer profile information
if customer_id in self.customer_profiles:
profile = self.customer_profiles[customer_id]
context['customer_profile'] = profile
# Add similar issue patterns
if current_issue in self.issue_patterns:
context['similar_patterns'] = self.issue_patterns[current_issue]
return context
def _update_customer_profile(self, customer_id: str, issue_type: str,
user_input: str, agent_response: str):
"""Update customer profile with new interaction"""
if customer_id not in self.customer_profiles:
self.customer_profiles[customer_id] = {
'first_contact': datetime.now(),
'interaction_count': 0,
'issue_types': set(),
'satisfaction_indicators': [],
'preferred_communication_style': 'neutral'
}
profile = self.customer_profiles[customer_id]
profile['interaction_count'] += 1
profile['issue_types'].add(issue_type)
profile['last_contact'] = datetime.now()
# Analyze communication style
if 'please' in user_input.lower() or 'thank' in user_input.lower():
profile['satisfaction_indicators'].append('polite')
elif '!' in user_input or user_input.isupper():
profile['satisfaction_indicators'].append('frustrated')
# Update issue patterns
if issue_type not in self.issue_patterns:
self.issue_patterns[issue_type] = []
self.issue_patterns[issue_type].append({
'user_input': user_input[:100], # Truncated for pattern analysis
'resolution': agent_response[:100],
'timestamp': datetime.now()
        })
Multi-Modal Hierarchical Memory
from typing import Tuple

class MultiModalHierarchicalMemory(HierarchicalMemory):
"""Hierarchical memory supporting text, images, and audio"""
def __init__(self):
super().__init__()
self.modality_processors = {
'text': TextProcessor(),
'image': ImageProcessor(),
'audio': AudioProcessor()
}
def add_multimodal_interaction(self, interactions: List[Dict]):
"""Add interaction with multiple modalities"""
combined_content = []
combined_importance = 0
metadata = {'modalities': []}
for interaction in interactions:
modality = interaction['modality']
content = interaction['content']
processor = self.modality_processors[modality]
processed_content, importance = processor.process(content)
combined_content.append(f"[{modality.upper()}] {processed_content}")
combined_importance = max(combined_importance, importance)
metadata['modalities'].append(modality)
# Create memory item with combined content
item = MemoryItem(
content=" | ".join(combined_content),
metadata=metadata,
importance_score=combined_importance,
access_count=1,
created_at=datetime.now(),
last_accessed=datetime.now(),
memory_level="working"
)
self.working_memory.add_item(item)
class TextProcessor:
def process(self, text: str) -> Tuple[str, float]:
# Standard text processing
return text, 0.5
class ImageProcessor:
def process(self, image_path: str) -> Tuple[str, float]:
# Extract description from image (placeholder)
description = f"Image: {image_path}"
importance = 0.7 # Images often important
return description, importance
class AudioProcessor:
def process(self, audio_path: str) -> Tuple[str, float]:
# Transcribe audio to text (placeholder)
transcription = f"Audio transcription from {audio_path}"
importance = 0.8 # Audio often high importance
        return transcription, importance
Best Practices
Memory Level Configuration
class MemoryConfiguration:
"""Configuration management for hierarchical memory"""
def __init__(self, use_case: str = 'general'):
self.configurations = {
'general': {
'working_capacity': 20,
'working_retention': timedelta(minutes=30),
'short_term_capacity': 100,
'short_term_retention': timedelta(days=7),
'long_term_capacity': 1000,
'importance_threshold': 0.5
},
'customer_service': {
'working_capacity': 15, # Focus on current customer
'working_retention': timedelta(hours=2),
'short_term_capacity': 200, # More customer history
'short_term_retention': timedelta(days=30),
'long_term_capacity': 2000,
'importance_threshold': 0.4 # Lower threshold for customer data
},
'research': {
'working_capacity': 50, # Large working set
'working_retention': timedelta(hours=8),
'short_term_capacity': 500,
'short_term_retention': timedelta(days=14),
'long_term_capacity': 5000,
'importance_threshold': 0.3 # Keep more information
}
}
self.config = self.configurations.get(use_case, self.configurations['general'])
def create_memory_system(self) -> HierarchicalMemory:
"""Create configured memory system"""
memory = HierarchicalMemory()
# Apply configuration
memory.working_memory.capacity = self.config['working_capacity']
memory.working_memory.retention_time = self.config['working_retention']
memory.short_term_memory.capacity = self.config['short_term_capacity']
memory.short_term_memory.retention_time = self.config['short_term_retention']
memory.long_term_memory.capacity = self.config['long_term_capacity']
        return memory
Memory Maintenance
class MemoryMaintenanceManager:
def __init__(self, memory_system: HierarchicalMemory):
self.memory_system = memory_system
self.maintenance_schedule = {
'consolidation': timedelta(hours=1),
'cleanup': timedelta(hours=6),
'optimization': timedelta(days=1)
}
self.last_maintenance = {}
def perform_maintenance(self):
"""Perform scheduled maintenance tasks"""
current_time = datetime.now()
# Check if consolidation is needed
if self._should_run_task('consolidation', current_time):
self.memory_system.consolidate_memories()
self.last_maintenance['consolidation'] = current_time
# Check if cleanup is needed
if self._should_run_task('cleanup', current_time):
self._cleanup_corrupted_memories()
self.last_maintenance['cleanup'] = current_time
# Check if optimization is needed
if self._should_run_task('optimization', current_time):
self._optimize_memory_layout()
self.last_maintenance['optimization'] = current_time
def _should_run_task(self, task_name: str, current_time: datetime) -> bool:
if task_name not in self.last_maintenance:
return True
time_since_last = current_time - self.last_maintenance[task_name]
return time_since_last >= self.maintenance_schedule[task_name]
def _cleanup_corrupted_memories(self):
"""Remove memories with invalid data"""
for level in [self.memory_system.working_memory,
self.memory_system.short_term_memory,
self.memory_system.long_term_memory]:
valid_items = []
for item in level.items:
if self._is_valid_memory_item(item):
valid_items.append(item)
level.items = valid_items
def _is_valid_memory_item(self, item: MemoryItem) -> bool:
"""Validate memory item integrity"""
return (
item.content and
isinstance(item.importance_score, (int, float)) and
0 <= item.importance_score <= 1 and
isinstance(item.access_count, int) and
item.access_count >= 0
)
def _optimize_memory_layout(self):
"""Optimize memory organization for better performance"""
# Sort items within each level for better retrieval performance
for level in [self.memory_system.short_term_memory,
self.memory_system.long_term_memory]:
            level.items.sort(key=lambda x: (x.importance_score, x.last_accessed), reverse=True)
Integration with Other Patterns
Hierarchical + Vector Retrieval
import faiss  # third-party: faiss-cpu / faiss-gpu
import numpy as np
from sentence_transformers import SentenceTransformer

class HierarchicalVectorMemory(HierarchicalMemory):
def __init__(self):
super().__init__()
self.vector_indices = {
'working': VectorIndex(dimension=384),
'short_term': VectorIndex(dimension=384),
'long_term': VectorIndex(dimension=384)
}
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')  # 384-dim embeddings, matching the indices above
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None):
# Add to hierarchical memory
super().add_interaction(user_input, agent_response, context)
# Add to appropriate vector index
content = f"{user_input} {agent_response}"
embedding = self.embedder.encode([content])[0]
        # Find the item just added; add_item may re-sort the list on overflow,
        # so the newest item is not necessarily at the end
        recent_item = max(self.working_memory.items, key=lambda i: i.created_at)
        level_name = recent_item.memory_level
if level_name in self.vector_indices:
self.vector_indices[level_name].add_vector(embedding, recent_item)
def retrieve_context(self, query: str, use_vector: bool = True, max_items: int = 10):
if not use_vector:
return super().retrieve_context(query, max_items)
# Use vector search within each level
query_embedding = self.embedder.encode([query])[0]
results = {}
for level_name, vector_index in self.vector_indices.items():
similar_items = vector_index.search(query_embedding, k=3)
results[level_name] = similar_items
return results
class VectorIndex:
def __init__(self, dimension: int):
self.dimension = dimension
self.index = faiss.IndexFlatIP(dimension)
self.items = []
def add_vector(self, embedding: np.ndarray, item: MemoryItem):
normalized = embedding / np.linalg.norm(embedding)
self.index.add(normalized.reshape(1, -1).astype('float32'))
self.items.append(item)
def search(self, query_embedding: np.ndarray, k: int = 5):
if self.index.ntotal == 0:
return []
normalized_query = query_embedding / np.linalg.norm(query_embedding)
scores, indices = self.index.search(
normalized_query.reshape(1, -1).astype('float32'),
min(k, self.index.ntotal)
)
results = []
for score, idx in zip(scores[0], indices[0]):
if idx < len(self.items):
item = self.items[idx]
item.last_accessed = datetime.now()
item.access_count += 1
results.append(item)
        return results
Testing and Validation
Unit Tests
import pytest
from datetime import datetime, timedelta
def test_working_memory_capacity():
working_mem = WorkingMemory(capacity=3)
# Add items up to capacity
for i in range(3):
item = MemoryItem(
content=f"Item {i}",
metadata={},
importance_score=0.5,
access_count=1,
created_at=datetime.now(),
last_accessed=datetime.now(),
memory_level="working"
)
assert working_mem.add_item(item) == True
# Add one more item (should cause overflow)
overflow_item = MemoryItem(
content="Overflow item",
metadata={},
importance_score=0.5,
access_count=1,
created_at=datetime.now(),
last_accessed=datetime.now(),
memory_level="working"
)
working_mem.add_item(overflow_item)
# Should still have 3 items
assert len(working_mem.items) == 3
def test_memory_consolidation():
memory = HierarchicalMemory()
# Add items to working memory
for i in range(5):
memory.add_interaction(f"Input {i}", f"Response {i}")
initial_working_count = len(memory.working_memory.items)
# Simulate time passing
for item in memory.working_memory.items:
item.created_at = datetime.now() - timedelta(hours=1)
# Perform consolidation
memory.consolidate_memories()
# Working memory should have fewer items
assert len(memory.working_memory.items) < initial_working_count
# Short-term memory should have more items
assert len(memory.short_term_memory.items) > 0
def test_importance_calculation():
calculator = ImportanceCalculator()
# High importance interaction
high_importance = calculator.calculate_importance(
"I'm really frustrated with this issue!",
"I understand your frustration. Let me help resolve this immediately.",
{'user_first_time': True}
)
# Low importance interaction
low_importance = calculator.calculate_importance(
"Hello",
"Hi there!",
{}
)
assert high_importance > low_importance
assert 0 <= high_importance <= 1
assert 0 <= low_importance <= 1
def test_retrieval_across_levels():
memory = HierarchicalMemory()
# Add test interactions
memory.add_interaction("Python programming", "Python is a programming language")
memory.add_interaction("Machine learning", "ML is a subset of AI")
memory.add_interaction("Weather today", "It's sunny outside")
# Test retrieval
results = memory.retrieve_context("programming", max_items=5)
# Should find Python-related interaction
all_items = []
for level_items in results.values():
all_items.extend(level_items)
assert len(all_items) > 0
assert any("Python" in item.content for item in all_items)Performance Tests
def test_hierarchical_memory_performance():
import time
memory = HierarchicalMemory()
# Test insertion performance
start_time = time.time()
for i in range(1000):
memory.add_interaction(f"Input {i}", f"Response {i}")
insertion_time = time.time() - start_time
print(f"Insertion: {insertion_time:.2f}s for 1000 interactions")
# Test retrieval performance
start_time = time.time()
for i in range(100):
results = memory.retrieve_context(f"query {i % 10}")
retrieval_time = time.time() - start_time
print(f"Retrieval: {retrieval_time:.2f}s for 100 queries")
# Test consolidation performance
start_time = time.time()
memory.consolidate_memories()
consolidation_time = time.time() - start_time
print(f"Consolidation: {consolidation_time:.2f}s")
stats = memory.get_memory_stats()
print(f"Memory stats: {stats}")Migration and Scaling
Migration from Flat Memory
def migrate_to_hierarchical(flat_memory, importance_calculator=None):
"""Migrate from flat memory structure to hierarchical"""
if importance_calculator is None:
importance_calculator = ImportanceCalculator()
hierarchical = HierarchicalMemory()
hierarchical.importance_calculator = importance_calculator
# Get all interactions
interactions = flat_memory.get_all_interactions()
# Sort by timestamp
interactions.sort(key=lambda x: x.get('timestamp', datetime.min))
# Add to hierarchical memory
for interaction in interactions:
hierarchical.add_interaction(
interaction['user_input'],
interaction['agent_response'],
interaction.get('metadata', {})
)
# Perform initial consolidation
hierarchical.consolidate_memories()
    return hierarchical
Distributed Hierarchical Memory
import hashlib

class DistributedHierarchicalMemory:
    def __init__(self, shard_count: int = 4):
        self.shards = [HierarchicalMemory() for _ in range(shard_count)]
        self.shard_count = shard_count
    def _shard_for(self, key: str) -> int:
        # Use a stable digest: the built-in hash() is salted per process, so it
        # would route the same key to different shards across restarts
        return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % self.shard_count
    def add_interaction(self, user_input: str, agent_response: str,
                        context: Dict = None, shard_key: str = None):
        # Route by the explicit shard key when provided, else by the input text
        shard_id = self._shard_for(shard_key or user_input)
self.shards[shard_id].add_interaction(user_input, agent_response, context)
    def retrieve_context(self, query: str, shard_key: str = None):
        if shard_key:
            # Query only the shard that owns this key
            return self.shards[self._shard_for(shard_key)].retrieve_context(query)
else:
# Query all shards and merge results
all_results = {}
for shard in self.shards:
shard_results = shard.retrieve_context(query)
for level, items in shard_results.items():
if level not in all_results:
all_results[level] = []
all_results[level].extend(items)
# Re-rank and limit results
for level in all_results:
all_results[level].sort(
key=lambda x: (x.importance_score, x.last_accessed),
reverse=True
)
all_results[level] = all_results[level][:5] # Limit per level
return all_results
def consolidate_all_shards(self):
"""Perform consolidation across all shards"""
for shard in self.shards:
            shard.consolidate_memories()
Related Patterns
- Sliding Window: Can serve as the working-memory level (a minimal sketch follows this list)
- Vector Retrieval: Enhance each level with semantic search
- Graph Memory: Use for relationship modeling in long-term memory
- Full History: Archive layer in hierarchical system
- Hybrid Approaches: Combine hierarchical with other patterns
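For the first of these, here is a minimal sketch of a sliding-window working-memory tier (the subclass name is illustrative) that can be dropped into the hierarchy defined above:

class SlidingWindowWorkingMemory(WorkingMemory):
    """Working-memory tier with pure recency-based eviction (no importance ranking)"""
    def add_item(self, item: MemoryItem) -> bool:
        item.memory_level = self.name
        self.items.append(item)
        if len(self.items) > self.capacity:
            # Evict strictly by arrival order, like a fixed-size window
            self.items.pop(0)
            return False  # overflow, matching the base-class contract
        return True

# Drop-in replacement for the default working-memory level
memory = HierarchicalMemory()
memory.working_memory = SlidingWindowWorkingMemory(capacity=10)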
Next Steps
- Design your memory hierarchy based on usage patterns
- Configure appropriate capacities and retention policies for each level
- Implement importance calculation specific to your domain
- Set up automated consolidation and maintenance processes
- Monitor memory performance and adjust parameters
- Consider integration with vector retrieval or graph memory
- Plan for scaling and distribution as your system grows
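Putting these steps together, a minimal end-to-end setup built from the components on this page might look like the following (the use-case name is one of the presets defined in MemoryConfiguration above):

# Configure a memory system for the target workload and attach maintenance
memory = MemoryConfiguration(use_case='customer_service').create_memory_system()
maintenance = MemoryMaintenanceManager(memory)

memory.add_interaction("I can't log in to my account", "Let's reset your password.")
maintenance.perform_maintenance()  # consolidation/cleanup run on their own schedules
print(memory.get_memory_stats())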
The Hierarchical Memory pattern provides a sophisticated approach to memory management that can adapt to various applications while maintaining both performance and comprehensiveness.