Hybrid Approaches
Hybrid memory approaches combine multiple memory patterns to leverage the strengths of each while mitigating individual weaknesses. Rather than relying on a single memory architecture, hybrid systems orchestrate several patterns and route work among them to maintain strong performance across diverse scenarios.
Overview
Hybrid approaches integrate multiple memory patterns such as:
- Vector + Graph: Semantic search enhanced with relationship modeling
- Hierarchical + Vector: Multi-level organization with semantic retrieval
- Sliding Window + Full History: Recent context with comprehensive background
- Graph + Sliding Window: Relationship awareness with efficient recent access
- Multi-Pattern Orchestration: Dynamic selection of memory patterns based on context
The key is intelligent coordination between patterns, with each pattern handling what it does best while contributing to a unified memory experience.
Architecture
Basic Hybrid Memory Framework
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import asyncio
class MemoryPattern(ABC):
"""Abstract base class for memory patterns"""
@abstractmethod
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None) -> bool:
"""Add interaction to this memory pattern"""
pass
@abstractmethod
def retrieve(self, query: str, **kwargs) -> List[Dict]:
"""Retrieve relevant information from this pattern"""
pass
@abstractmethod
def get_capabilities(self) -> Dict[str, bool]:
"""Return capabilities of this memory pattern"""
pass
class HybridMemoryOrchestrator:
"""Orchestrates multiple memory patterns"""
def __init__(self):
self.patterns: Dict[str, MemoryPattern] = {}
self.routing_rules = {}
self.fusion_strategies = {}
self.performance_monitor = PerformanceMonitor()
def register_pattern(self, name: str, pattern: MemoryPattern,
routing_rules: Dict = None, weight: float = 1.0):
"""Register a memory pattern with the orchestrator"""
self.patterns[name] = pattern
self.routing_rules[name] = routing_rules or {}
self.fusion_strategies[name] = {'weight': weight, 'enabled': True}
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None):
"""Add interaction to appropriate memory patterns"""
context = context or {}
# Determine which patterns should handle this interaction
target_patterns = self._route_interaction(user_input, agent_response, context)
# Add to selected patterns
results = {}
for pattern_name in target_patterns:
if pattern_name in self.patterns:
try:
success = self.patterns[pattern_name].add_interaction(
user_input, agent_response, context
)
results[pattern_name] = success
except Exception as e:
print(f"Error adding to {pattern_name}: {e}")
results[pattern_name] = False
# Monitor performance
self.performance_monitor.record_add_operation(results)
return results
def retrieve_context(self, query: str, strategy: str = 'adaptive',
max_items: int = 10, **kwargs) -> Dict[str, List[Dict]]:
"""Retrieve context using specified fusion strategy"""
if strategy == 'adaptive':
return self._adaptive_retrieve(query, max_items, **kwargs)
elif strategy == 'parallel':
return self._parallel_retrieve(query, max_items, **kwargs)
elif strategy == 'cascade':
return self._cascade_retrieve(query, max_items, **kwargs)
elif strategy == 'weighted':
return self._weighted_retrieve(query, max_items, **kwargs)
else:
raise ValueError(f"Unknown retrieval strategy: {strategy}")
def _route_interaction(self, user_input: str, agent_response: str,
context: Dict) -> List[str]:
"""Determine which patterns should handle this interaction"""
target_patterns = []
for pattern_name, rules in self.routing_rules.items():
if self._matches_routing_rules(user_input, agent_response, context, rules):
target_patterns.append(pattern_name)
# Default: add to all enabled patterns if no specific routing
if not target_patterns:
target_patterns = [
name for name, strategy in self.fusion_strategies.items()
if strategy.get('enabled', True)
]
return target_patterns
def _matches_routing_rules(self, user_input: str, agent_response: str,
context: Dict, rules: Dict) -> bool:
"""Check if interaction matches routing rules for a pattern"""
if not rules:
return True
# Check content-based rules
if 'keywords' in rules:
text = f"{user_input} {agent_response}".lower()
keywords = rules['keywords']
if not any(keyword.lower() in text for keyword in keywords):
return False
# Check context-based rules
if 'context_requirements' in rules:
for key, value in rules['context_requirements'].items():
if context.get(key) != value:
return False
# Check interaction type rules
if 'interaction_types' in rules:
interaction_type = context.get('type', 'general')
if interaction_type not in rules['interaction_types']:
return False
return True
def _adaptive_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]:
"""Adaptively choose best patterns for this query"""
query_characteristics = self._analyze_query(query, kwargs)
best_patterns = self._select_patterns_for_query(query_characteristics)
results = {}
total_retrieved = 0
for pattern_name, allocation in best_patterns.items():
if total_retrieved >= max_items:
break
items_for_pattern = min(allocation, max_items - total_retrieved)
try:
pattern_results = self.patterns[pattern_name].retrieve(
query, limit=items_for_pattern, **kwargs
)
results[pattern_name] = pattern_results
total_retrieved += len(pattern_results)
except Exception as e:
print(f"Error retrieving from {pattern_name}: {e}")
results[pattern_name] = []
return results
def _parallel_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]:
    """Retrieve from all enabled patterns concurrently"""
    enabled = {
        name: pattern for name, pattern in self.patterns.items()
        if self.fusion_strategies[name].get('enabled', True)
    }
    # Split the budget across enabled patterns, giving each at least one slot
    per_pattern = max(1, max_items // max(len(enabled), 1))
    async def retrieve_from_pattern(name: str, pattern: MemoryPattern):
        try:
            # retrieve() is synchronous, so run it in a worker thread
            # (Python 3.9+); calling it inline would serialize the
            # supposedly parallel retrieval
            return name, await asyncio.to_thread(
                pattern.retrieve, query, limit=per_pattern, **kwargs
            )
        except Exception as e:
            print(f"Error retrieving from {name}: {e}")
            return name, []
    async def parallel_retrieval():
        tasks = [
            retrieve_from_pattern(name, pattern)
            for name, pattern in enabled.items()
        ]
        return await asyncio.gather(*tasks)
    # asyncio.run creates, runs, and closes an event loop for us
    return dict(asyncio.run(parallel_retrieval()))
def _cascade_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]:
"""Retrieve using cascade strategy - try patterns in priority order"""
pattern_priorities = self._get_pattern_priorities(query)
results = {}
total_retrieved = 0
for pattern_name in pattern_priorities:
if total_retrieved >= max_items:
break
remaining_items = max_items - total_retrieved
try:
pattern_results = self.patterns[pattern_name].retrieve(
query, limit=remaining_items, **kwargs
)
if pattern_results:
results[pattern_name] = pattern_results
total_retrieved += len(pattern_results)
# If we got good results, we might stop here
if self._are_results_sufficient(pattern_results, query):
break
except Exception as e:
print(f"Error retrieving from {pattern_name}: {e}")
return results
def _weighted_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]:
"""Retrieve using weighted combination of all patterns"""
all_results = []
pattern_results = {}
# Get results from all patterns
for pattern_name, pattern in self.patterns.items():
if not self.fusion_strategies[pattern_name].get('enabled', True):
continue
try:
results = pattern.retrieve(query, limit=max_items, **kwargs)
pattern_results[pattern_name] = results
# Add weight information to each result
weight = self.fusion_strategies[pattern_name]['weight']
for result in results:
result['pattern_source'] = pattern_name
result['pattern_weight'] = weight
all_results.append(result)
except Exception as e:
print(f"Error retrieving from {pattern_name}: {e}")
pattern_results[pattern_name] = []
# Sort by weighted score and return top items
all_results.sort(
key=lambda x: x.get('similarity_score', 0.5) * x.get('pattern_weight', 1.0),
reverse=True
)
# Group back by pattern for return
final_results = {name: [] for name in pattern_results.keys()}
for result in all_results[:max_items]:
pattern_name = result['pattern_source']
final_results[pattern_name].append(result)
return final_results
def _analyze_query(self, query: str, kwargs: Dict) -> Dict:
"""Analyze query to determine best retrieval strategy"""
characteristics = {
'length': len(query.split()),
'has_entities': self._contains_entities(query),
'is_semantic': self._is_semantic_query(query),
'is_recent': kwargs.get('prefer_recent', False),
'is_relational': self._is_relational_query(query)
}
return characteristics
def _select_patterns_for_query(self, characteristics: Dict) -> Dict[str, int]:
"""Select best patterns and their allocation for query characteristics"""
allocations = {}
# Vector retrieval good for semantic queries
if characteristics['is_semantic'] and 'vector' in self.patterns:
allocations['vector'] = 6
# Graph memory good for relational queries
if characteristics['is_relational'] and 'graph' in self.patterns:
allocations['graph'] = 5
# Sliding window good for recent context
if characteristics['is_recent'] and 'sliding_window' in self.patterns:
allocations['sliding_window'] = 4
# Full history as fallback
if 'full_history' in self.patterns:
allocations['full_history'] = 3
# Hierarchical for balanced access
if 'hierarchical' in self.patterns:
allocations['hierarchical'] = 4
# Ensure we have at least one pattern
if not allocations:
available_patterns = list(self.patterns.keys())
if available_patterns:
allocations[available_patterns[0]] = 10
return allocations
def _contains_entities(self, query: str) -> bool:
"""Check if query contains named entities"""
# Simple heuristic: capitalized words might be entities
words = query.split()
return any(word[0].isupper() and len(word) > 2 for word in words)
def _is_semantic_query(self, query: str) -> bool:
"""Check if query is semantic in nature"""
semantic_indicators = [
'similar', 'like', 'related', 'about', 'regarding',
'concerning', 'explain', 'what', 'how', 'why'
]
return any(indicator in query.lower() for indicator in semantic_indicators)
def _is_relational_query(self, query: str) -> bool:
    """Check if query involves relationships"""
    # Deliberately excludes ubiquitous words like 'and' or 'with',
    # which would match almost any multi-clause query
    relation_indicators = [
        'connected', 'relationship', 'between',
        'works at', 'lives in', 'related to', 'associated'
    ]
    return any(indicator in query.lower() for indicator in relation_indicators)
def _get_pattern_priorities(self, query: str) -> List[str]:
"""Get pattern priorities for cascade retrieval"""
characteristics = self._analyze_query(query, {})
priorities = []
# Prioritize based on query characteristics
if characteristics['is_recent']:
priorities.extend(['sliding_window', 'hierarchical'])
if characteristics['is_semantic']:
priorities.append('vector')
if characteristics['is_relational']:
priorities.append('graph')
# Add remaining patterns
for pattern_name in self.patterns.keys():
if pattern_name not in priorities:
priorities.append(pattern_name)
return priorities
def _are_results_sufficient(self, results: List[Dict], query: str) -> bool:
"""Determine if results are sufficient to stop cascade"""
if len(results) < 3:
return False
# Check if top results have high relevance scores
high_relevance_count = sum(
1 for result in results
if result.get('similarity_score', 0) > 0.8
)
return high_relevance_count >= 2
class PerformanceMonitor:
"""Monitor performance of hybrid memory system"""
def __init__(self):
self.metrics = {
'add_operations': [],
'retrieve_operations': [],
'pattern_performance': {}
}
def record_add_operation(self, results: Dict[str, bool]):
"""Record add operation results"""
self.metrics['add_operations'].append({
'timestamp': datetime.now(),
'results': results,
'success_rate': sum(results.values()) / len(results) if results else 0
})
def record_retrieve_operation(self, pattern_name: str, query: str,
result_count: int, duration: float):
"""Record retrieve operation metrics"""
if pattern_name not in self.metrics['pattern_performance']:
self.metrics['pattern_performance'][pattern_name] = []
self.metrics['pattern_performance'][pattern_name].append({
'timestamp': datetime.now(),
'query': query,
'result_count': result_count,
'duration': duration
})
def get_performance_summary(self) -> Dict:
"""Get performance summary across all patterns"""
summary = {}
for pattern_name, operations in self.metrics['pattern_performance'].items():
if not operations:
continue
durations = [op['duration'] for op in operations]
result_counts = [op['result_count'] for op in operations]
summary[pattern_name] = {
'avg_duration': sum(durations) / len(durations),
'avg_result_count': sum(result_counts) / len(result_counts),
'total_operations': len(operations)
}
return summary
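A minimal wiring sketch follows. It assumes the VectorRetrievalMemory and SlidingWindowMemory classes from the earlier pattern sections implement the MemoryPattern interface; the routing keywords, weights, and sample interaction are illustrative, not recommended values.

# Hypothetical setup: pattern classes come from earlier sections
orchestrator = HybridMemoryOrchestrator()
orchestrator.register_pattern(
    'vector', VectorRetrievalMemory(),
    routing_rules={'keywords': ['similar', 'about', 'explain']},
    weight=1.0
)
orchestrator.register_pattern(
    'sliding_window', SlidingWindowMemory(max_size=50),
    routing_rules={'keywords': ['recent', 'current']},
    weight=0.9
)

# Interactions are routed to whichever patterns match the rules
orchestrator.add_interaction(
    "Tell me about vector databases",
    "Vector databases store embeddings for similarity search",
    {'type': 'question'}
)

# Retrieval fuses per-pattern results using the chosen strategy
context = orchestrator.retrieve_context(
    "recent questions about databases", strategy='weighted', max_items=5
)
for pattern_name, items in context.items():
    print(pattern_name, len(items))

Common Hybrid Patterns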
Vector + Graph Hybrid
class VectorGraphHybrid(MemoryPattern):
"""Combines vector similarity search with graph relationship modeling"""
def __init__(self):
self.vector_memory = VectorRetrievalMemory()
self.graph_memory = GraphMemory()
self.entity_extractor = EntityExtractor()
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None) -> bool:
try:
# Add to vector memory for semantic search
self.vector_memory.add_interaction(user_input, agent_response)
# Extract entities and relationships for graph
entities = self.entity_extractor.extract_entities(f"{user_input} {agent_response}")
relationships = self.entity_extractor.extract_relationships(f"{user_input} {agent_response}")
# Add to graph memory
for entity in entities:
self.graph_memory.add_entity(entity['text'], entity['type'])
for rel in relationships:
self.graph_memory.add_relationship(
rel['subject'], rel['object'], rel['predicate']
)
return True
except Exception as e:
print(f"Error in VectorGraphHybrid.add_interaction: {e}")
return False
def retrieve(self, query: str, limit: int = 10, expand_graph: bool = True, **kwargs) -> List[Dict]:
"""Retrieve using both vector similarity and graph expansion"""
results = []
# Get semantically similar results
# Reserve roughly half the budget for semantic matches (at least one)
vector_results = self.vector_memory.retrieve_relevant(query, top_k=max(1, limit // 2))
# Extract entities from query for graph expansion
query_entities = self.entity_extractor.extract_entities(query)
if expand_graph and query_entities:
# Find related entities through graph
expanded_entities = set()
for entity_info in query_entities:
entity_name = entity_info['text']
neighbors = self.graph_memory.get_neighbors(entity_name, max_distance=2)
expanded_entities.update(neighbors.keys())
# Find interactions involving expanded entities
graph_results = []
for entity in expanded_entities:
entity_interactions = self._find_interactions_with_entity(entity)
graph_results.extend(entity_interactions)
else:
graph_results = []
# Combine and rank results
all_results = self._merge_vector_graph_results(vector_results, graph_results)
return all_results[:limit]
def _merge_vector_graph_results(self, vector_results: List[Dict],
graph_results: List[Dict]) -> List[Dict]:
"""Merge and rank vector and graph results"""
seen_interactions = set()
merged_results = []
# Add vector results with semantic score
for result in vector_results:
interaction_key = (result['user_input'], result['agent_response'])
if interaction_key not in seen_interactions:
result['source'] = 'vector'
result['combined_score'] = result['similarity_score'] * 0.7
merged_results.append(result)
seen_interactions.add(interaction_key)
# Add graph results with relationship score
for result in graph_results:
interaction_key = (result['user_input'], result['agent_response'])
if interaction_key not in seen_interactions:
result['source'] = 'graph'
result['combined_score'] = result.get('relevance_score', 0.5) * 0.6
merged_results.append(result)
seen_interactions.add(interaction_key)
# Sort by combined score
merged_results.sort(key=lambda x: x['combined_score'], reverse=True)
return merged_results
def _find_interactions_with_entity(self, entity: str) -> List[Dict]:
"""Find interactions that mention a specific entity"""
# This would typically involve searching through stored interactions
# Simplified implementation for example
return []
def get_capabilities(self) -> Dict[str, bool]:
return {
'semantic_search': True,
'relationship_modeling': True,
'entity_expansion': True,
'graph_traversal': True
}
class EntityExtractor:
"""Simplified entity and relationship extractor"""
def extract_entities(self, text: str) -> List[Dict]:
# Simplified implementation
import re
entities = []
# Find capitalized words (potential entities)
capitalized_words = re.findall(r'\b[A-Z][a-z]+\b', text)
for word in capitalized_words:
entities.append({
'text': word,
'type': 'PERSON', # Simplified
'confidence': 0.8
})
return entities
def extract_relationships(self, text: str) -> List[Dict]:
# Simplified relationship extraction
relationships = []
# Simple pattern matching for relationships
if ' works at ' in text.lower():
parts = text.lower().split(' works at ')
if len(parts) == 2:
relationships.append({
'subject': parts[0].strip(),
'predicate': 'works_at',
'object': parts[1].strip()
})
return relationships
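A short usage sketch, assuming the VectorRetrievalMemory and GraphMemory classes from earlier sections; note that the capitalization-based entity extractor above is naive and will be noisy on real text.

hybrid = VectorGraphHybrid()
hybrid.add_interaction(
    "Alice works at Acme",
    "Noted: Alice is employed at Acme",
    {'type': 'fact'}
)

# expand_graph=True also surfaces interactions that mention graph
# neighbors of the query entities (e.g. others connected to Acme)
results = hybrid.retrieve("Who works at Acme?", limit=5, expand_graph=True)
for r in results:
    print(r['source'], round(r['combined_score'], 3))

Hierarchical + Vector Hybrid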
class HierarchicalVectorHybrid(MemoryPattern):
"""Combines hierarchical organization with vector retrieval at each level"""
def __init__(self):
self.hierarchical_memory = HierarchicalMemory()
self.vector_indices = {
'working': VectorRetrievalMemory(),
'short_term': VectorRetrievalMemory(),
'long_term': VectorRetrievalMemory()
}
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None) -> bool:
try:
# Add to hierarchical memory
self.hierarchical_memory.add_interaction(user_input, agent_response, context)
# Add to working memory vector index
self.vector_indices['working'].add_interaction(user_input, agent_response)
return True
except Exception as e:
print(f"Error in HierarchicalVectorHybrid.add_interaction: {e}")
return False
def retrieve(self, query: str, limit: int = 10, level_weights: Dict[str, float] = None, **kwargs) -> List[Dict]:
"""Retrieve using vector search within hierarchical levels"""
if level_weights is None:
level_weights = {'working': 0.5, 'short_term': 0.3, 'long_term': 0.2}
all_results = []
for level, weight in level_weights.items():
if level in self.vector_indices:
level_limit = max(1, int(limit * weight))
# Get vector results for this level
level_results = self.vector_indices[level].retrieve_relevant(
query, top_k=level_limit
)
# Add level information and weight results
for result in level_results:
result['memory_level'] = level
result['level_weight'] = weight
result['weighted_score'] = result.get('similarity_score', 0.5) * weight
all_results.append(result)
# Sort by weighted score
all_results.sort(key=lambda x: x['weighted_score'], reverse=True)
return all_results[:limit]
def consolidate_memories(self):
"""Perform memory consolidation and update vector indices"""
# Consolidate hierarchical memory
self.hierarchical_memory.consolidate_memories()
# Move vectors between indices based on memory consolidation
self._sync_vector_indices_with_hierarchy()
def _sync_vector_indices_with_hierarchy(self):
"""Synchronize vector indices with hierarchical memory state"""
# This would involve moving vectors between indices
# as memories are promoted/demoted in the hierarchy
pass
def get_capabilities(self) -> Dict[str, bool]:
return {
'hierarchical_organization': True,
'semantic_search': True,
'level_based_retrieval': True,
'memory_consolidation': True
}
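A brief sketch of level-weighted retrieval, assuming HierarchicalMemory from the earlier hierarchical pattern section; the weights below are illustrative and should be tuned per application.

hybrid = HierarchicalVectorHybrid()
hybrid.add_interaction("What did we decide yesterday?", "We chose option B")

# Bias retrieval toward long-term memory for a history-style query.
# Fresh items start in the working index and only reach the deeper
# indices after consolidation, so early results skew toward 'working'.
results = hybrid.retrieve(
    "past decisions about options",
    limit=5,
    level_weights={'working': 0.2, 'short_term': 0.3, 'long_term': 0.5}
)
for r in results:
    print(r['memory_level'], round(r['weighted_score'], 3))

Adaptive Multi-Pattern System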
class AdaptiveMultiPatternMemory:
"""Dynamically selects and combines memory patterns based on usage patterns"""
def __init__(self):
self.orchestrator = HybridMemoryOrchestrator()
self.usage_analyzer = UsageAnalyzer()
self.pattern_selector = PatternSelector()
# Initialize with multiple patterns
self._setup_default_patterns()
def _setup_default_patterns(self):
"""Set up default memory patterns with routing rules"""
# Vector retrieval for semantic queries
vector_memory = VectorRetrievalMemory()
self.orchestrator.register_pattern(
'vector',
vector_memory,
routing_rules={
'keywords': ['similar', 'like', 'about', 'explain', 'what'],
'interaction_types': ['question', 'explanation_request']
},
weight=1.0
)
# Graph memory for relationship queries
graph_memory = GraphMemory()
self.orchestrator.register_pattern(
'graph',
graph_memory,
routing_rules={
'keywords': ['relationship', 'connected', 'between', 'works at'],
'interaction_types': ['relationship_query', 'entity_query']
},
weight=0.8
)
# Sliding window for recent context
sliding_memory = SlidingWindowMemory(max_size=50)
self.orchestrator.register_pattern(
'sliding_window',
sliding_memory,
routing_rules={
'keywords': ['recent', 'just', 'current', 'now'],
'context_requirements': {'prefer_recent': True}
},
weight=0.9
)
# Hierarchical for general-purpose
hierarchical_memory = HierarchicalMemory()
self.orchestrator.register_pattern(
'hierarchical',
hierarchical_memory,
weight=0.7
)
def add_interaction(self, user_input: str, agent_response: str, context: Dict = None):
"""Add interaction with adaptive pattern selection"""
# Analyze interaction characteristics
interaction_analysis = self.usage_analyzer.analyze_interaction(
user_input, agent_response, context
)
# Update context with analysis results
enriched_context = {**(context or {}), **interaction_analysis}
# Add to orchestrator
results = self.orchestrator.add_interaction(user_input, agent_response, enriched_context)
# Learn from this interaction
self.usage_analyzer.record_interaction(
user_input, agent_response, enriched_context, results
)
return results
def retrieve_context(self, query: str, **kwargs):
"""Retrieve context using adaptive strategy selection"""
# Analyze query to determine best approach
query_analysis = self.usage_analyzer.analyze_query(query, kwargs)
# Select retrieval strategy based on analysis
strategy = self.pattern_selector.select_strategy(query_analysis)
# Execute retrieval
results = self.orchestrator.retrieve_context(query, strategy=strategy, **kwargs)
# Learn from retrieval performance
self.usage_analyzer.record_retrieval(query, strategy, results)
return results
def optimize_patterns(self):
"""Optimize pattern weights and routing based on usage patterns"""
usage_insights = self.usage_analyzer.get_insights()
# Adjust pattern weights based on performance
for pattern_name, performance in usage_insights['pattern_performance'].items():
if performance['effectiveness'] > 0.8:
# Increase weight for high-performing patterns
current_weight = self.orchestrator.fusion_strategies[pattern_name]['weight']
new_weight = min(1.0, current_weight * 1.1)
self.orchestrator.fusion_strategies[pattern_name]['weight'] = new_weight
elif performance['effectiveness'] < 0.4:
# Decrease weight for poor-performing patterns
current_weight = self.orchestrator.fusion_strategies[pattern_name]['weight']
new_weight = max(0.1, current_weight * 0.9)
self.orchestrator.fusion_strategies[pattern_name]['weight'] = new_weight
# Update routing rules based on successful patterns
self._update_routing_rules(usage_insights)
def _update_routing_rules(self, insights: Dict):
"""Update routing rules based on usage insights"""
# This would analyze which patterns work best for which types of queries
# and update routing rules accordingly
pass
class UsageAnalyzer:
"""Analyzes usage patterns to optimize memory system"""
def __init__(self):
self.interaction_history = []
self.retrieval_history = []
self.performance_metrics = {}
def analyze_interaction(self, user_input: str, agent_response: str,
context: Dict) -> Dict:
"""Analyze interaction characteristics"""
analysis = {
'length': len(user_input.split()) + len(agent_response.split()),
'has_entities': self._detect_entities(user_input + " " + agent_response),
'is_question': user_input.strip().endswith('?'),
'sentiment': self._analyze_sentiment(user_input),
'topic': self._extract_topic(user_input + " " + agent_response)
}
return analysis
def analyze_query(self, query: str, kwargs: Dict) -> Dict:
"""Analyze query characteristics for strategy selection"""
analysis = {
'query_length': len(query.split()),
'query_type': self._classify_query_type(query),
'semantic_complexity': self._assess_semantic_complexity(query),
'temporal_focus': self._detect_temporal_focus(query),
'entity_count': len(self._detect_entities(query))
}
return analysis
def record_interaction(self, user_input: str, agent_response: str,
context: Dict, results: Dict):
"""Record interaction for learning"""
self.interaction_history.append({
'timestamp': datetime.now(),
'user_input': user_input,
'agent_response': agent_response,
'context': context,
'pattern_results': results
})
def record_retrieval(self, query: str, strategy: str, results: Dict):
"""Record retrieval for performance analysis"""
self.retrieval_history.append({
'timestamp': datetime.now(),
'query': query,
'strategy': strategy,
'results': results,
'total_results': sum(len(pattern_results) for pattern_results in results.values())
})
def get_insights(self) -> Dict:
"""Get usage insights for optimization"""
return {
'pattern_performance': self._calculate_pattern_performance(),
'query_patterns': self._identify_query_patterns(),
'strategy_effectiveness': self._evaluate_strategy_effectiveness()
}
def _detect_entities(self, text: str) -> List[str]:
"""Simple entity detection"""
import re
return re.findall(r'\b[A-Z][a-z]+\b', text)
def _analyze_sentiment(self, text: str) -> str:
"""Simple sentiment analysis"""
positive_words = ['good', 'great', 'excellent', 'happy', 'pleased']
negative_words = ['bad', 'terrible', 'angry', 'frustrated', 'disappointed']
text_lower = text.lower()
pos_count = sum(1 for word in positive_words if word in text_lower)
neg_count = sum(1 for word in negative_words if word in text_lower)
if pos_count > neg_count:
return 'positive'
elif neg_count > pos_count:
return 'negative'
else:
return 'neutral'
def _extract_topic(self, text: str) -> str:
"""Simple topic extraction"""
# Remove common words and return most frequent remaining words
common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
words = [word.lower() for word in text.split() if word.lower() not in common_words and len(word) > 2]
if words:
# Return most common word as topic (simplified)
from collections import Counter
return Counter(words).most_common(1)[0][0]
return 'general'
def _classify_query_type(self, query: str) -> str:
"""Classify query type"""
query_lower = query.lower()
if any(word in query_lower for word in ['what', 'how', 'why', 'when', 'where']):
return 'information_seeking'
elif any(word in query_lower for word in ['find', 'search', 'look']):
return 'search'
elif any(word in query_lower for word in ['relationship', 'connection', 'between']):
return 'relationship'
elif any(word in query_lower for word in ['recent', 'latest', 'current']):
return 'temporal'
else:
return 'general'
def _assess_semantic_complexity(self, query: str) -> float:
"""Assess semantic complexity of query"""
# Simple heuristic based on length and vocabulary
words = query.split()
unique_words = set(words)
complexity = len(unique_words) / max(len(words), 1)
return min(1.0, complexity * 2) # Normalize to 0-1
def _detect_temporal_focus(self, query: str) -> str:
"""Detect temporal focus of query"""
query_lower = query.lower()
if any(word in query_lower for word in ['recent', 'latest', 'current', 'now']):
return 'recent'
elif any(word in query_lower for word in ['history', 'past', 'before', 'previous']):
return 'historical'
else:
return 'general'
def _calculate_pattern_performance(self) -> Dict:
"""Calculate performance metrics for each pattern"""
# Simplified implementation
return {}
def _identify_query_patterns(self) -> Dict:
"""Identify common query patterns"""
# Simplified implementation
return {}
def _evaluate_strategy_effectiveness(self) -> Dict:
"""Evaluate effectiveness of different retrieval strategies"""
# Simplified implementation
return {}
class PatternSelector:
"""Selects optimal retrieval strategy based on query analysis"""
def __init__(self):
self.strategy_rules = {
'adaptive': self._should_use_adaptive,
'parallel': self._should_use_parallel,
'cascade': self._should_use_cascade,
'weighted': self._should_use_weighted
}
def select_strategy(self, query_analysis: Dict) -> str:
"""Select best strategy based on query analysis"""
# Score each strategy
strategy_scores = {}
for strategy, rule_func in self.strategy_rules.items():
strategy_scores[strategy] = rule_func(query_analysis)
# Return strategy with highest score
best_strategy = max(strategy_scores, key=strategy_scores.get)
return best_strategy
def _should_use_adaptive(self, analysis: Dict) -> float:
"""Score for adaptive strategy"""
base_score = 0.7
# Adaptive works well for complex queries
if analysis.get('semantic_complexity', 0) > 0.6:
base_score += 0.2
# Good for information seeking queries
if analysis.get('query_type') == 'information_seeking':
base_score += 0.1
return base_score
def _should_use_parallel(self, analysis: Dict) -> float:
"""Score for parallel strategy"""
base_score = 0.5
# Parallel good when we need comprehensive results
if analysis.get('query_length', 0) > 5:
base_score += 0.2
# Good for complex queries that might benefit from multiple approaches
if analysis.get('semantic_complexity', 0) > 0.7:
base_score += 0.2
return base_score
def _should_use_cascade(self, analysis: Dict) -> float:
"""Score for cascade strategy"""
base_score = 0.6
# Cascade good for temporal queries (try recent first)
if analysis.get('temporal_focus') == 'recent':
base_score += 0.3
# Good for simple queries that might be satisfied quickly
if analysis.get('semantic_complexity', 0) < 0.4:
base_score += 0.1
return base_score
def _should_use_weighted(self, analysis: Dict) -> float:
"""Score for weighted strategy"""
base_score = 0.4
# Weighted good for general queries
if analysis.get('query_type') == 'general':
base_score += 0.3
# Good when we want balanced results
if 0.3 < analysis.get('semantic_complexity', 0) < 0.7:
base_score += 0.2
return base_score
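Putting the adaptive system together might look like the sketch below; the interaction content is invented for illustration, and optimize_patterns() would normally run from a scheduled job rather than inline.

memory = AdaptiveMultiPatternMemory()

memory.add_interaction(
    "How is Alice connected to Acme?",
    "Alice works at Acme",
    {'type': 'relationship_query'}
)

# The retrieval strategy (adaptive/parallel/cascade/weighted) is
# selected per query by the PatternSelector
results = memory.retrieve_context("relationship between Alice and Acme")

# Periodically re-weight patterns based on observed effectiveness
memory.optimize_patterns()

Performance Characteristics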
Pros
- Best of All Worlds: Leverages strengths of multiple patterns
- Adaptive Performance: Can optimize based on usage patterns
- Redundancy and Reliability: Fallback options when one pattern fails
- Flexible Configuration: Can be tuned for specific applications
- Comprehensive Coverage: Handles diverse query types effectively
Cons
- Implementation Complexity: Much more complex than single patterns
- Resource Overhead: Higher memory and computational requirements
- Coordination Challenges: Managing interactions between patterns
- Tuning Difficulty: Many parameters to optimize
- Debugging Complexity: Harder to troubleshoot issues
Performance Metrics
# Performance characteristics vary by component patterns
HYBRID_CHARACTERISTICS = {
"storage_overhead": "1.5-3x single pattern",
"retrieval_latency": "Variable (10ms-500ms)",
"memory_usage": "Proportional to pattern count",
"accuracy": "Higher than individual patterns",
"complexity": "High implementation and maintenance"
}

When to Use
Ideal Scenarios
- Production systems requiring high reliability and performance
- Diverse workloads with varying query types and patterns
- Long-term deployments where optimization benefits justify complexity
- Resource-rich environments that can handle increased overhead
- Critical applications where memory quality is paramount
Not Recommended For
- Simple applications with uniform memory requirements
- Resource-constrained environments with tight limits
- Rapid prototypes where simplicity is more important than optimization
- Single-use-case systems where one pattern suffices
Best Practices
Configuration Management
class HybridMemoryConfig:
"""Configuration management for hybrid memory systems"""
def __init__(self, config_file: str = None):
self.default_config = {
'patterns': {
'vector': {
'enabled': True,
'weight': 1.0,
'model': 'all-MiniLM-L6-v2',
'routing_rules': {
'keywords': ['similar', 'like', 'about'],
'min_similarity_threshold': 0.3
}
},
'graph': {
'enabled': True,
'weight': 0.8,
'max_hops': 3,
'routing_rules': {
'keywords': ['relationship', 'connected', 'between']
}
},
'sliding_window': {
'enabled': True,
'weight': 0.9,
'window_size': 50,
'routing_rules': {
'keywords': ['recent', 'current']
}
}
},
'fusion_strategies': {
'default_strategy': 'adaptive',
'parallel_threshold': 0.7,
'cascade_timeout': 5.0
},
'optimization': {
'auto_tune': True,
'tune_interval_hours': 24,
'performance_window_size': 100
}
}
if config_file:
self.config = self._load_config(config_file)
else:
self.config = self.default_config
def _load_config(self, config_file: str) -> Dict:
"""Load configuration from file"""
import json
with open(config_file, 'r') as f:
return json.load(f)
def create_memory_system(self) -> AdaptiveMultiPatternMemory:
"""Create memory system from configuration"""
memory_system = AdaptiveMultiPatternMemory()
# Configure patterns based on config
for pattern_name, pattern_config in self.config['patterns'].items():
if pattern_config.get('enabled', True):
self._configure_pattern(memory_system, pattern_name, pattern_config)
return memory_system
def _configure_pattern(self, memory_system, pattern_name: str, config: Dict):
"""Configure individual pattern"""
# This would create and configure the specific pattern type
pass
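Creating a system from configuration might then look like this sketch; memory_config.json is a hypothetical file following the default_config schema shown above.

# Load overrides from a hypothetical JSON file...
config = HybridMemoryConfig('memory_config.json')
memory_system = config.create_memory_system()

# ...or fall back to the built-in defaults with no file at all
default_system = HybridMemoryConfig().create_memory_system()

Monitoring and Observability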
class HybridMemoryMonitor:
"""Monitoring and observability for hybrid memory systems"""
def __init__(self, memory_system: AdaptiveMultiPatternMemory):
self.memory_system = memory_system
self.metrics_collector = MetricsCollector()
self.alerting = AlertingSystem()
def start_monitoring(self):
"""Start continuous monitoring"""
import threading
import time
def monitor_loop():
while True:
self._collect_metrics()
self._check_alerts()
time.sleep(60) # Monitor every minute
monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
monitor_thread.start()
def _collect_metrics(self):
"""Collect current metrics"""
metrics = {
'timestamp': datetime.now(),
'pattern_performance': self._get_pattern_performance(),
'memory_usage': self._get_memory_usage(),
'retrieval_latency': self._get_retrieval_latency(),
'success_rates': self._get_success_rates()
}
self.metrics_collector.record_metrics(metrics)
def _check_alerts(self):
"""Check for alert conditions"""
latest_metrics = self.metrics_collector.get_latest_metrics()
# Check for high latency
if latest_metrics.get('retrieval_latency', {}).get('avg', 0) > 500: # 500ms
self.alerting.send_alert('High retrieval latency detected')
# Check for low success rates
success_rates = latest_metrics.get('success_rates', {})
for pattern, rate in success_rates.items():
if rate < 0.8: # 80% success rate threshold
self.alerting.send_alert(f'Low success rate for {pattern}: {rate:.2f}')
# Check for memory pressure
memory_usage = latest_metrics.get('memory_usage', {})
if memory_usage.get('total_mb', 0) > 1000: # 1GB threshold
self.alerting.send_alert('High memory usage detected')
def _get_pattern_performance(self) -> Dict:
"""Get performance metrics for each pattern"""
performance_summary = self.memory_system.orchestrator.performance_monitor.get_performance_summary()
return performance_summary
def _get_memory_usage(self) -> Dict:
"""Get memory usage statistics"""
import psutil
import os
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
return {
'rss_mb': memory_info.rss / (1024 * 1024),
'vms_mb': memory_info.vms / (1024 * 1024),
'percent': process.memory_percent()
}
def _get_retrieval_latency(self) -> Dict:
"""Get retrieval latency statistics"""
# Would calculate from recent retrieval operations
return {'avg': 100, 'p95': 200, 'p99': 300} # Placeholder
def _get_success_rates(self) -> Dict:
"""Get success rates for each pattern"""
# Would calculate from recent operations
return {'vector': 0.95, 'graph': 0.88, 'sliding_window': 0.92} # Placeholder
class MetricsCollector:
"""Collect and store metrics"""
def __init__(self):
self.metrics_history = []
def record_metrics(self, metrics: Dict):
"""Record metrics"""
self.metrics_history.append(metrics)
# Keep only last 1000 entries
if len(self.metrics_history) > 1000:
self.metrics_history = self.metrics_history[-1000:]
def get_latest_metrics(self) -> Dict:
"""Get latest metrics"""
return self.metrics_history[-1] if self.metrics_history else {}
class AlertingSystem:
"""Simple alerting system"""
def send_alert(self, message: str):
"""Send alert (placeholder implementation)"""
print(f"ALERT: {message} at {datetime.now()}")Testing and Validation
Integration Tests
import pytest
import time
def test_hybrid_memory_integration():
"""Test full hybrid memory system integration"""
memory_system = AdaptiveMultiPatternMemory()
# Add diverse types of interactions
test_interactions = [
("What is machine learning?", "ML is a subset of AI", {'type': 'question'}),
("John works at Apple", "That's interesting", {'type': 'fact'}),
("Show me recent conversations", "Here are recent chats", {'type': 'retrieval', 'prefer_recent': True}),
("How is John related to Apple?", "John works at Apple", {'type': 'relationship_query'})
]
for user_input, agent_response, context in test_interactions:
results = memory_system.add_interaction(user_input, agent_response, context)
assert any(results.values()), f"Failed to add interaction: {user_input}"
# Test different types of retrieval
test_queries = [
("machine learning", {'expected_patterns': ['vector']}),
("John Apple relationship", {'expected_patterns': ['graph']}),
("recent", {'expected_patterns': ['sliding_window']})
]
for query, expectations in test_queries:
results = memory_system.retrieve_context(query)
assert results, f"No results for query: {query}"
# Check that expected patterns were used
pattern_names = list(results.keys())
for expected_pattern in expectations.get('expected_patterns', []):
assert any(expected_pattern in name for name in pattern_names), \
f"Expected pattern {expected_pattern} not used for query: {query}"
def test_pattern_coordination():
"""Test coordination between different patterns"""
orchestrator = HybridMemoryOrchestrator()
# Register test patterns
vector_mock = MockVectorPattern()
graph_mock = MockGraphPattern()
orchestrator.register_pattern('vector', vector_mock)
orchestrator.register_pattern('graph', graph_mock)
# Test routing
results = orchestrator.add_interaction(
"What is similar to machine learning?",
"Deep learning is similar to ML",
{'type': 'semantic_query'}
)
assert 'vector' in results
assert results['vector'] is True
def test_performance_monitoring():
"""Test performance monitoring functionality"""
monitor = PerformanceMonitor()
# Record some operations
monitor.record_add_operation({'pattern1': True, 'pattern2': False})
monitor.record_retrieve_operation('pattern1', 'test query', 5, 0.1)
summary = monitor.get_performance_summary()
assert 'pattern1' in summary
assert summary['pattern1']['total_operations'] == 1
class MockVectorPattern(MemoryPattern):
def add_interaction(self, user_input, agent_response, context=None):
return True
def retrieve(self, query, **kwargs):
return [{'content': f'Vector result for {query}', 'similarity_score': 0.8}]
def get_capabilities(self):
return {'semantic_search': True}
class MockGraphPattern(MemoryPattern):
def add_interaction(self, user_input, agent_response, context=None):
return True
def retrieve(self, query, **kwargs):
return [{'content': f'Graph result for {query}', 'relevance_score': 0.7}]
def get_capabilities(self):
return {'relationship_modeling': True}

Migration and Deployment
Gradual Migration Strategy
class GradualMigrationManager:
"""Manage gradual migration to hybrid memory system"""
def __init__(self, existing_memory, target_hybrid_config):
self.existing_memory = existing_memory
self.target_config = target_hybrid_config
self.migration_state = 'not_started'
self.hybrid_system = None
def start_migration(self, migration_strategy: str = 'shadow_mode'):
"""Start migration process"""
if migration_strategy == 'shadow_mode':
self._start_shadow_mode()
elif migration_strategy == 'gradual_cutover':
self._start_gradual_cutover()
elif migration_strategy == 'big_bang':
self._start_big_bang()
else:
raise ValueError(f"Unknown migration strategy: {migration_strategy}")
def _start_shadow_mode(self):
"""Run hybrid system in parallel to collect performance data"""
self.hybrid_system = self._create_hybrid_system()
self.migration_state = 'shadow_mode'
# Copy existing data to hybrid system
self._copy_existing_data()
def _start_gradual_cutover(self):
"""Gradually shift traffic to hybrid system"""
self.hybrid_system = self._create_hybrid_system()
self.migration_state = 'gradual_cutover'
self.cutover_percentage = 10 # Start with 10% of traffic
def _start_big_bang(self):
"""Complete migration in one step"""
self.hybrid_system = self._create_hybrid_system()
self._copy_existing_data()
self.migration_state = 'completed'
def process_interaction(self, user_input: str, agent_response: str, context: Dict = None):
"""Process interaction during migration"""
if self.migration_state == 'shadow_mode':
# Add to both systems
existing_result = self.existing_memory.add_interaction(user_input, agent_response, context)
hybrid_result = self.hybrid_system.add_interaction(user_input, agent_response, context)
return existing_result # Use existing system's result
elif self.migration_state == 'gradual_cutover':
import random
if random.random() < (self.cutover_percentage / 100):
return self.hybrid_system.add_interaction(user_input, agent_response, context)
else:
return self.existing_memory.add_interaction(user_input, agent_response, context)
elif self.migration_state == 'completed':
return self.hybrid_system.add_interaction(user_input, agent_response, context)
else:
return self.existing_memory.add_interaction(user_input, agent_response, context)
def increase_cutover_percentage(self, increment: int = 10):
"""Gradually increase traffic to hybrid system"""
if self.migration_state == 'gradual_cutover':
self.cutover_percentage = min(100, self.cutover_percentage + increment)
if self.cutover_percentage >= 100:
self.migration_state = 'completed'
def _create_hybrid_system(self):
    """Create hybrid system from the target configuration"""
    # Honor the configuration supplied at construction time, if any
    config = self.target_config or HybridMemoryConfig()
    return config.create_memory_system()
def _copy_existing_data(self):
"""Copy data from existing system to hybrid system"""
# Get all interactions from existing system
if hasattr(self.existing_memory, 'get_all_interactions'):
interactions = self.existing_memory.get_all_interactions()
for interaction in interactions:
self.hybrid_system.add_interaction(
interaction.get('user_input', ''),
interaction.get('agent_response', ''),
interaction.get('context', {})
)
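A migration run might be driven as in the sketch below; SlidingWindowMemory stands in for whatever single-pattern system is currently in production, and the sample traffic is invented.

manager = GradualMigrationManager(
    existing_memory=SlidingWindowMemory(max_size=100),
    target_hybrid_config=HybridMemoryConfig()
)
manager.start_migration('gradual_cutover')

# Route live traffic through the manager during the migration window
manager.process_interaction("Hello", "Hi there", {'type': 'greeting'})

# Ratchet up hybrid traffic as confidence grows (e.g. weekly)
manager.increase_cutover_percentage(20)

Related Patterns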
- Vector Retrieval: Core component in many hybrid systems
- Graph Memory: Often combined with other patterns for relationship modeling
- Hierarchical Memory: Natural foundation for hybrid organization
- Sliding Window: Provides recent context in hybrid systems
- Full History: Archive component in comprehensive hybrid systems
Next Steps
- Assess your specific use case requirements and constraints
- Choose appropriate base patterns for your hybrid system
- Design coordination and fusion strategies
- Implement gradual migration approach if upgrading existing system
- Set up comprehensive monitoring and observability
- Tune pattern weights and routing rules based on usage patterns
- Plan for ongoing optimization and pattern evolution
Hybrid approaches represent the most sophisticated memory architectures, providing maximum flexibility and capability at the cost of increased complexity. They are best suited for production systems with diverse requirements and the resources to support comprehensive memory management.