Skip to Content

Hybrid Approaches

Hybrid memory approaches combine multiple memory patterns to leverage the strengths of each while mitigating individual weaknesses. Rather than relying on a single memory architecture, hybrid systems intelligently orchestrate different memory patterns to provide optimal performance across diverse scenarios.

Overview

Hybrid approaches integrate multiple memory patterns such as:

  • Vector + Graph: Semantic search enhanced with relationship modeling
  • Hierarchical + Vector: Multi-level organization with semantic retrieval
  • Sliding Window + Full History: Recent context with comprehensive background
  • Graph + Sliding Window: Relationship awareness with efficient recent access
  • Multi-Pattern Orchestration: Dynamic selection of memory patterns based on context

The key is intelligent coordination between patterns, with each pattern handling what it does best while contributing to a unified memory experience.

Architecture

Basic Hybrid Memory Framework

from abc import ABC, abstractmethod from typing import Dict, List, Any, Optional, Union from datetime import datetime import asyncio class MemoryPattern(ABC): """Abstract base class for memory patterns""" @abstractmethod def add_interaction(self, user_input: str, agent_response: str, context: Dict = None) -> bool: """Add interaction to this memory pattern""" pass @abstractmethod def retrieve(self, query: str, **kwargs) -> List[Dict]: """Retrieve relevant information from this pattern""" pass @abstractmethod def get_capabilities(self) -> Dict[str, bool]: """Return capabilities of this memory pattern""" pass class HybridMemoryOrchestrator: """Orchestrates multiple memory patterns""" def __init__(self): self.patterns: Dict[str, MemoryPattern] = {} self.routing_rules = {} self.fusion_strategies = {} self.performance_monitor = PerformanceMonitor() def register_pattern(self, name: str, pattern: MemoryPattern, routing_rules: Dict = None, weight: float = 1.0): """Register a memory pattern with the orchestrator""" self.patterns[name] = pattern self.routing_rules[name] = routing_rules or {} self.fusion_strategies[name] = {'weight': weight, 'enabled': True} def add_interaction(self, user_input: str, agent_response: str, context: Dict = None): """Add interaction to appropriate memory patterns""" context = context or {} # Determine which patterns should handle this interaction target_patterns = self._route_interaction(user_input, agent_response, context) # Add to selected patterns results = {} for pattern_name in target_patterns: if pattern_name in self.patterns: try: success = self.patterns[pattern_name].add_interaction( user_input, agent_response, context ) results[pattern_name] = success except Exception as e: print(f"Error adding to {pattern_name}: {e}") results[pattern_name] = False # Monitor performance self.performance_monitor.record_add_operation(results) return results def retrieve_context(self, query: str, strategy: str = 'adaptive', max_items: int = 10, **kwargs) -> Dict[str, List[Dict]]: """Retrieve context using specified fusion strategy""" if strategy == 'adaptive': return self._adaptive_retrieve(query, max_items, **kwargs) elif strategy == 'parallel': return self._parallel_retrieve(query, max_items, **kwargs) elif strategy == 'cascade': return self._cascade_retrieve(query, max_items, **kwargs) elif strategy == 'weighted': return self._weighted_retrieve(query, max_items, **kwargs) else: raise ValueError(f"Unknown retrieval strategy: {strategy}") def _route_interaction(self, user_input: str, agent_response: str, context: Dict) -> List[str]: """Determine which patterns should handle this interaction""" target_patterns = [] for pattern_name, rules in self.routing_rules.items(): if self._matches_routing_rules(user_input, agent_response, context, rules): target_patterns.append(pattern_name) # Default: add to all enabled patterns if no specific routing if not target_patterns: target_patterns = [ name for name, strategy in self.fusion_strategies.items() if strategy.get('enabled', True) ] return target_patterns def _matches_routing_rules(self, user_input: str, agent_response: str, context: Dict, rules: Dict) -> bool: """Check if interaction matches routing rules for a pattern""" if not rules: return True # Check content-based rules if 'keywords' in rules: text = f"{user_input} {agent_response}".lower() keywords = rules['keywords'] if not any(keyword.lower() in text for keyword in keywords): return False # Check context-based rules if 'context_requirements' in rules: for key, value in rules['context_requirements'].items(): if context.get(key) != value: return False # Check interaction type rules if 'interaction_types' in rules: interaction_type = context.get('type', 'general') if interaction_type not in rules['interaction_types']: return False return True def _adaptive_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]: """Adaptively choose best patterns for this query""" query_characteristics = self._analyze_query(query, kwargs) best_patterns = self._select_patterns_for_query(query_characteristics) results = {} total_retrieved = 0 for pattern_name, allocation in best_patterns.items(): if total_retrieved >= max_items: break items_for_pattern = min(allocation, max_items - total_retrieved) try: pattern_results = self.patterns[pattern_name].retrieve( query, limit=items_for_pattern, **kwargs ) results[pattern_name] = pattern_results total_retrieved += len(pattern_results) except Exception as e: print(f"Error retrieving from {pattern_name}: {e}") results[pattern_name] = [] return results def _parallel_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]: """Retrieve from all patterns in parallel""" async def retrieve_from_pattern(name: str, pattern: MemoryPattern): try: return name, pattern.retrieve(query, limit=max_items//len(self.patterns), **kwargs) except Exception as e: print(f"Error retrieving from {name}: {e}") return name, [] async def parallel_retrieval(): tasks = [ retrieve_from_pattern(name, pattern) for name, pattern in self.patterns.items() if self.fusion_strategies[name].get('enabled', True) ] return await asyncio.gather(*tasks) # Run parallel retrieval loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: results_list = loop.run_until_complete(parallel_retrieval()) results = dict(results_list) finally: loop.close() return results def _cascade_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]: """Retrieve using cascade strategy - try patterns in priority order""" pattern_priorities = self._get_pattern_priorities(query) results = {} total_retrieved = 0 for pattern_name in pattern_priorities: if total_retrieved >= max_items: break remaining_items = max_items - total_retrieved try: pattern_results = self.patterns[pattern_name].retrieve( query, limit=remaining_items, **kwargs ) if pattern_results: results[pattern_name] = pattern_results total_retrieved += len(pattern_results) # If we got good results, we might stop here if self._are_results_sufficient(pattern_results, query): break except Exception as e: print(f"Error retrieving from {pattern_name}: {e}") return results def _weighted_retrieve(self, query: str, max_items: int, **kwargs) -> Dict[str, List[Dict]]: """Retrieve using weighted combination of all patterns""" all_results = [] pattern_results = {} # Get results from all patterns for pattern_name, pattern in self.patterns.items(): if not self.fusion_strategies[pattern_name].get('enabled', True): continue try: results = pattern.retrieve(query, limit=max_items, **kwargs) pattern_results[pattern_name] = results # Add weight information to each result weight = self.fusion_strategies[pattern_name]['weight'] for result in results: result['pattern_source'] = pattern_name result['pattern_weight'] = weight all_results.append(result) except Exception as e: print(f"Error retrieving from {pattern_name}: {e}") pattern_results[pattern_name] = [] # Sort by weighted score and return top items all_results.sort( key=lambda x: x.get('similarity_score', 0.5) * x.get('pattern_weight', 1.0), reverse=True ) # Group back by pattern for return final_results = {name: [] for name in pattern_results.keys()} for result in all_results[:max_items]: pattern_name = result['pattern_source'] final_results[pattern_name].append(result) return final_results def _analyze_query(self, query: str, kwargs: Dict) -> Dict: """Analyze query to determine best retrieval strategy""" characteristics = { 'length': len(query.split()), 'has_entities': self._contains_entities(query), 'is_semantic': self._is_semantic_query(query), 'is_recent': kwargs.get('prefer_recent', False), 'is_relational': self._is_relational_query(query) } return characteristics def _select_patterns_for_query(self, characteristics: Dict) -> Dict[str, int]: """Select best patterns and their allocation for query characteristics""" allocations = {} # Vector retrieval good for semantic queries if characteristics['is_semantic'] and 'vector' in self.patterns: allocations['vector'] = 6 # Graph memory good for relational queries if characteristics['is_relational'] and 'graph' in self.patterns: allocations['graph'] = 5 # Sliding window good for recent context if characteristics['is_recent'] and 'sliding_window' in self.patterns: allocations['sliding_window'] = 4 # Full history as fallback if 'full_history' in self.patterns: allocations['full_history'] = 3 # Hierarchical for balanced access if 'hierarchical' in self.patterns: allocations['hierarchical'] = 4 # Ensure we have at least one pattern if not allocations: available_patterns = list(self.patterns.keys()) if available_patterns: allocations[available_patterns[0]] = 10 return allocations def _contains_entities(self, query: str) -> bool: """Check if query contains named entities""" # Simple heuristic: capitalized words might be entities words = query.split() return any(word[0].isupper() and len(word) > 2 for word in words) def _is_semantic_query(self, query: str) -> bool: """Check if query is semantic in nature""" semantic_indicators = [ 'similar', 'like', 'related', 'about', 'regarding', 'concerning', 'explain', 'what', 'how', 'why' ] return any(indicator in query.lower() for indicator in semantic_indicators) def _is_relational_query(self, query: str) -> bool: """Check if query involves relationships""" relation_indicators = [ 'connected', 'relationship', 'between', 'and', 'with', 'works at', 'lives in', 'related to', 'associated' ] return any(indicator in query.lower() for indicator in relation_indicators) def _get_pattern_priorities(self, query: str) -> List[str]: """Get pattern priorities for cascade retrieval""" characteristics = self._analyze_query(query, {}) priorities = [] # Prioritize based on query characteristics if characteristics['is_recent']: priorities.extend(['sliding_window', 'hierarchical']) if characteristics['is_semantic']: priorities.append('vector') if characteristics['is_relational']: priorities.append('graph') # Add remaining patterns for pattern_name in self.patterns.keys(): if pattern_name not in priorities: priorities.append(pattern_name) return priorities def _are_results_sufficient(self, results: List[Dict], query: str) -> bool: """Determine if results are sufficient to stop cascade""" if len(results) < 3: return False # Check if top results have high relevance scores high_relevance_count = sum( 1 for result in results if result.get('similarity_score', 0) > 0.8 ) return high_relevance_count >= 2 class PerformanceMonitor: """Monitor performance of hybrid memory system""" def __init__(self): self.metrics = { 'add_operations': [], 'retrieve_operations': [], 'pattern_performance': {} } def record_add_operation(self, results: Dict[str, bool]): """Record add operation results""" self.metrics['add_operations'].append({ 'timestamp': datetime.now(), 'results': results, 'success_rate': sum(results.values()) / len(results) if results else 0 }) def record_retrieve_operation(self, pattern_name: str, query: str, result_count: int, duration: float): """Record retrieve operation metrics""" if pattern_name not in self.metrics['pattern_performance']: self.metrics['pattern_performance'][pattern_name] = [] self.metrics['pattern_performance'][pattern_name].append({ 'timestamp': datetime.now(), 'query': query, 'result_count': result_count, 'duration': duration }) def get_performance_summary(self) -> Dict: """Get performance summary across all patterns""" summary = {} for pattern_name, operations in self.metrics['pattern_performance'].items(): if not operations: continue durations = [op['duration'] for op in operations] result_counts = [op['result_count'] for op in operations] summary[pattern_name] = { 'avg_duration': sum(durations) / len(durations), 'avg_result_count': sum(result_counts) / len(result_counts), 'total_operations': len(operations) } return summary

Common Hybrid Patterns

Vector + Graph Hybrid

class VectorGraphHybrid(MemoryPattern): """Combines vector similarity search with graph relationship modeling""" def __init__(self): self.vector_memory = VectorRetrievalMemory() self.graph_memory = GraphMemory() self.entity_extractor = EntityExtractor() def add_interaction(self, user_input: str, agent_response: str, context: Dict = None) -> bool: try: # Add to vector memory for semantic search self.vector_memory.add_interaction(user_input, agent_response) # Extract entities and relationships for graph entities = self.entity_extractor.extract_entities(f"{user_input} {agent_response}") relationships = self.entity_extractor.extract_relationships(f"{user_input} {agent_response}") # Add to graph memory for entity in entities: self.graph_memory.add_entity(entity['text'], entity['type']) for rel in relationships: self.graph_memory.add_relationship( rel['subject'], rel['object'], rel['predicate'] ) return True except Exception as e: print(f"Error in VectorGraphHybrid.add_interaction: {e}") return False def retrieve(self, query: str, limit: int = 10, expand_graph: bool = True, **kwargs) -> List[Dict]: """Retrieve using both vector similarity and graph expansion""" results = [] # Get semantically similar results vector_results = self.vector_memory.retrieve_relevant(query, top_k=limit//2) # Extract entities from query for graph expansion query_entities = self.entity_extractor.extract_entities(query) if expand_graph and query_entities: # Find related entities through graph expanded_entities = set() for entity_info in query_entities: entity_name = entity_info['text'] neighbors = self.graph_memory.get_neighbors(entity_name, max_distance=2) expanded_entities.update(neighbors.keys()) # Find interactions involving expanded entities graph_results = [] for entity in expanded_entities: entity_interactions = self._find_interactions_with_entity(entity) graph_results.extend(entity_interactions) else: graph_results = [] # Combine and rank results all_results = self._merge_vector_graph_results(vector_results, graph_results) return all_results[:limit] def _merge_vector_graph_results(self, vector_results: List[Dict], graph_results: List[Dict]) -> List[Dict]: """Merge and rank vector and graph results""" seen_interactions = set() merged_results = [] # Add vector results with semantic score for result in vector_results: interaction_key = (result['user_input'], result['agent_response']) if interaction_key not in seen_interactions: result['source'] = 'vector' result['combined_score'] = result['similarity_score'] * 0.7 merged_results.append(result) seen_interactions.add(interaction_key) # Add graph results with relationship score for result in graph_results: interaction_key = (result['user_input'], result['agent_response']) if interaction_key not in seen_interactions: result['source'] = 'graph' result['combined_score'] = result.get('relevance_score', 0.5) * 0.6 merged_results.append(result) seen_interactions.add(interaction_key) # Sort by combined score merged_results.sort(key=lambda x: x['combined_score'], reverse=True) return merged_results def _find_interactions_with_entity(self, entity: str) -> List[Dict]: """Find interactions that mention a specific entity""" # This would typically involve searching through stored interactions # Simplified implementation for example return [] def get_capabilities(self) -> Dict[str, bool]: return { 'semantic_search': True, 'relationship_modeling': True, 'entity_expansion': True, 'graph_traversal': True } class EntityExtractor: """Simplified entity and relationship extractor""" def extract_entities(self, text: str) -> List[Dict]: # Simplified implementation import re entities = [] # Find capitalized words (potential entities) capitalized_words = re.findall(r'\b[A-Z][a-z]+\b', text) for word in capitalized_words: entities.append({ 'text': word, 'type': 'PERSON', # Simplified 'confidence': 0.8 }) return entities def extract_relationships(self, text: str) -> List[Dict]: # Simplified relationship extraction relationships = [] # Simple pattern matching for relationships if ' works at ' in text.lower(): parts = text.lower().split(' works at ') if len(parts) == 2: relationships.append({ 'subject': parts[0].strip(), 'predicate': 'works_at', 'object': parts[1].strip() }) return relationships

Hierarchical + Vector Hybrid

class HierarchicalVectorHybrid(MemoryPattern): """Combines hierarchical organization with vector retrieval at each level""" def __init__(self): self.hierarchical_memory = HierarchicalMemory() self.vector_indices = { 'working': VectorRetrievalMemory(), 'short_term': VectorRetrievalMemory(), 'long_term': VectorRetrievalMemory() } def add_interaction(self, user_input: str, agent_response: str, context: Dict = None) -> bool: try: # Add to hierarchical memory self.hierarchical_memory.add_interaction(user_input, agent_response, context) # Add to working memory vector index self.vector_indices['working'].add_interaction(user_input, agent_response) return True except Exception as e: print(f"Error in HierarchicalVectorHybrid.add_interaction: {e}") return False def retrieve(self, query: str, limit: int = 10, level_weights: Dict[str, float] = None, **kwargs) -> List[Dict]: """Retrieve using vector search within hierarchical levels""" if level_weights is None: level_weights = {'working': 0.5, 'short_term': 0.3, 'long_term': 0.2} all_results = [] for level, weight in level_weights.items(): if level in self.vector_indices: level_limit = max(1, int(limit * weight)) # Get vector results for this level level_results = self.vector_indices[level].retrieve_relevant( query, top_k=level_limit ) # Add level information and weight results for result in level_results: result['memory_level'] = level result['level_weight'] = weight result['weighted_score'] = result.get('similarity_score', 0.5) * weight all_results.append(result) # Sort by weighted score all_results.sort(key=lambda x: x['weighted_score'], reverse=True) return all_results[:limit] def consolidate_memories(self): """Perform memory consolidation and update vector indices""" # Consolidate hierarchical memory self.hierarchical_memory.consolidate_memories() # Move vectors between indices based on memory consolidation self._sync_vector_indices_with_hierarchy() def _sync_vector_indices_with_hierarchy(self): """Synchronize vector indices with hierarchical memory state""" # This would involve moving vectors between indices # as memories are promoted/demoted in the hierarchy pass def get_capabilities(self) -> Dict[str, bool]: return { 'hierarchical_organization': True, 'semantic_search': True, 'level_based_retrieval': True, 'memory_consolidation': True }

Adaptive Multi-Pattern System

class AdaptiveMultiPatternMemory: """Dynamically selects and combines memory patterns based on usage patterns""" def __init__(self): self.orchestrator = HybridMemoryOrchestrator() self.usage_analyzer = UsageAnalyzer() self.pattern_selector = PatternSelector() # Initialize with multiple patterns self._setup_default_patterns() def _setup_default_patterns(self): """Set up default memory patterns with routing rules""" # Vector retrieval for semantic queries vector_memory = VectorRetrievalMemory() self.orchestrator.register_pattern( 'vector', vector_memory, routing_rules={ 'keywords': ['similar', 'like', 'about', 'explain', 'what'], 'interaction_types': ['question', 'explanation_request'] }, weight=1.0 ) # Graph memory for relationship queries graph_memory = GraphMemory() self.orchestrator.register_pattern( 'graph', graph_memory, routing_rules={ 'keywords': ['relationship', 'connected', 'between', 'works at'], 'interaction_types': ['relationship_query', 'entity_query'] }, weight=0.8 ) # Sliding window for recent context sliding_memory = SlidingWindowMemory(max_size=50) self.orchestrator.register_pattern( 'sliding_window', sliding_memory, routing_rules={ 'keywords': ['recent', 'just', 'current', 'now'], 'context_requirements': {'prefer_recent': True} }, weight=0.9 ) # Hierarchical for general-purpose hierarchical_memory = HierarchicalMemory() self.orchestrator.register_pattern( 'hierarchical', hierarchical_memory, weight=0.7 ) def add_interaction(self, user_input: str, agent_response: str, context: Dict = None): """Add interaction with adaptive pattern selection""" # Analyze interaction characteristics interaction_analysis = self.usage_analyzer.analyze_interaction( user_input, agent_response, context ) # Update context with analysis results enriched_context = {**(context or {}), **interaction_analysis} # Add to orchestrator results = self.orchestrator.add_interaction(user_input, agent_response, enriched_context) # Learn from this interaction self.usage_analyzer.record_interaction( user_input, agent_response, enriched_context, results ) return results def retrieve_context(self, query: str, **kwargs): """Retrieve context using adaptive strategy selection""" # Analyze query to determine best approach query_analysis = self.usage_analyzer.analyze_query(query, kwargs) # Select retrieval strategy based on analysis strategy = self.pattern_selector.select_strategy(query_analysis) # Execute retrieval results = self.orchestrator.retrieve_context(query, strategy=strategy, **kwargs) # Learn from retrieval performance self.usage_analyzer.record_retrieval(query, strategy, results) return results def optimize_patterns(self): """Optimize pattern weights and routing based on usage patterns""" usage_insights = self.usage_analyzer.get_insights() # Adjust pattern weights based on performance for pattern_name, performance in usage_insights['pattern_performance'].items(): if performance['effectiveness'] > 0.8: # Increase weight for high-performing patterns current_weight = self.orchestrator.fusion_strategies[pattern_name]['weight'] new_weight = min(1.0, current_weight * 1.1) self.orchestrator.fusion_strategies[pattern_name]['weight'] = new_weight elif performance['effectiveness'] < 0.4: # Decrease weight for poor-performing patterns current_weight = self.orchestrator.fusion_strategies[pattern_name]['weight'] new_weight = max(0.1, current_weight * 0.9) self.orchestrator.fusion_strategies[pattern_name]['weight'] = new_weight # Update routing rules based on successful patterns self._update_routing_rules(usage_insights) def _update_routing_rules(self, insights: Dict): """Update routing rules based on usage insights""" # This would analyze which patterns work best for which types of queries # and update routing rules accordingly pass class UsageAnalyzer: """Analyzes usage patterns to optimize memory system""" def __init__(self): self.interaction_history = [] self.retrieval_history = [] self.performance_metrics = {} def analyze_interaction(self, user_input: str, agent_response: str, context: Dict) -> Dict: """Analyze interaction characteristics""" analysis = { 'length': len(user_input.split()) + len(agent_response.split()), 'has_entities': self._detect_entities(user_input + " " + agent_response), 'is_question': user_input.strip().endswith('?'), 'sentiment': self._analyze_sentiment(user_input), 'topic': self._extract_topic(user_input + " " + agent_response) } return analysis def analyze_query(self, query: str, kwargs: Dict) -> Dict: """Analyze query characteristics for strategy selection""" analysis = { 'query_length': len(query.split()), 'query_type': self._classify_query_type(query), 'semantic_complexity': self._assess_semantic_complexity(query), 'temporal_focus': self._detect_temporal_focus(query), 'entity_count': len(self._detect_entities(query)) } return analysis def record_interaction(self, user_input: str, agent_response: str, context: Dict, results: Dict): """Record interaction for learning""" self.interaction_history.append({ 'timestamp': datetime.now(), 'user_input': user_input, 'agent_response': agent_response, 'context': context, 'pattern_results': results }) def record_retrieval(self, query: str, strategy: str, results: Dict): """Record retrieval for performance analysis""" self.retrieval_history.append({ 'timestamp': datetime.now(), 'query': query, 'strategy': strategy, 'results': results, 'total_results': sum(len(pattern_results) for pattern_results in results.values()) }) def get_insights(self) -> Dict: """Get usage insights for optimization""" return { 'pattern_performance': self._calculate_pattern_performance(), 'query_patterns': self._identify_query_patterns(), 'strategy_effectiveness': self._evaluate_strategy_effectiveness() } def _detect_entities(self, text: str) -> List[str]: """Simple entity detection""" import re return re.findall(r'\b[A-Z][a-z]+\b', text) def _analyze_sentiment(self, text: str) -> str: """Simple sentiment analysis""" positive_words = ['good', 'great', 'excellent', 'happy', 'pleased'] negative_words = ['bad', 'terrible', 'angry', 'frustrated', 'disappointed'] text_lower = text.lower() pos_count = sum(1 for word in positive_words if word in text_lower) neg_count = sum(1 for word in negative_words if word in text_lower) if pos_count > neg_count: return 'positive' elif neg_count > pos_count: return 'negative' else: return 'neutral' def _extract_topic(self, text: str) -> str: """Simple topic extraction""" # Remove common words and return most frequent remaining words common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'} words = [word.lower() for word in text.split() if word.lower() not in common_words and len(word) > 2] if words: # Return most common word as topic (simplified) from collections import Counter return Counter(words).most_common(1)[0][0] return 'general' def _classify_query_type(self, query: str) -> str: """Classify query type""" query_lower = query.lower() if any(word in query_lower for word in ['what', 'how', 'why', 'when', 'where']): return 'information_seeking' elif any(word in query_lower for word in ['find', 'search', 'look']): return 'search' elif any(word in query_lower for word in ['relationship', 'connection', 'between']): return 'relationship' elif any(word in query_lower for word in ['recent', 'latest', 'current']): return 'temporal' else: return 'general' def _assess_semantic_complexity(self, query: str) -> float: """Assess semantic complexity of query""" # Simple heuristic based on length and vocabulary words = query.split() unique_words = set(words) complexity = len(unique_words) / max(len(words), 1) return min(1.0, complexity * 2) # Normalize to 0-1 def _detect_temporal_focus(self, query: str) -> str: """Detect temporal focus of query""" query_lower = query.lower() if any(word in query_lower for word in ['recent', 'latest', 'current', 'now']): return 'recent' elif any(word in query_lower for word in ['history', 'past', 'before', 'previous']): return 'historical' else: return 'general' def _calculate_pattern_performance(self) -> Dict: """Calculate performance metrics for each pattern""" # Simplified implementation return {} def _identify_query_patterns(self) -> Dict: """Identify common query patterns""" # Simplified implementation return {} def _evaluate_strategy_effectiveness(self) -> Dict: """Evaluate effectiveness of different retrieval strategies""" # Simplified implementation return {} class PatternSelector: """Selects optimal retrieval strategy based on query analysis""" def __init__(self): self.strategy_rules = { 'adaptive': self._should_use_adaptive, 'parallel': self._should_use_parallel, 'cascade': self._should_use_cascade, 'weighted': self._should_use_weighted } def select_strategy(self, query_analysis: Dict) -> str: """Select best strategy based on query analysis""" # Score each strategy strategy_scores = {} for strategy, rule_func in self.strategy_rules.items(): strategy_scores[strategy] = rule_func(query_analysis) # Return strategy with highest score best_strategy = max(strategy_scores, key=strategy_scores.get) return best_strategy def _should_use_adaptive(self, analysis: Dict) -> float: """Score for adaptive strategy""" base_score = 0.7 # Adaptive works well for complex queries if analysis.get('semantic_complexity', 0) > 0.6: base_score += 0.2 # Good for information seeking queries if analysis.get('query_type') == 'information_seeking': base_score += 0.1 return base_score def _should_use_parallel(self, analysis: Dict) -> float: """Score for parallel strategy""" base_score = 0.5 # Parallel good when we need comprehensive results if analysis.get('query_length', 0) > 5: base_score += 0.2 # Good for complex queries that might benefit from multiple approaches if analysis.get('semantic_complexity', 0) > 0.7: base_score += 0.2 return base_score def _should_use_cascade(self, analysis: Dict) -> float: """Score for cascade strategy""" base_score = 0.6 # Cascade good for temporal queries (try recent first) if analysis.get('temporal_focus') == 'recent': base_score += 0.3 # Good for simple queries that might be satisfied quickly if analysis.get('semantic_complexity', 0) < 0.4: base_score += 0.1 return base_score def _should_use_weighted(self, analysis: Dict) -> float: """Score for weighted strategy""" base_score = 0.4 # Weighted good for general queries if analysis.get('query_type') == 'general': base_score += 0.3 # Good when we want balanced results if 0.3 < analysis.get('semantic_complexity', 0) < 0.7: base_score += 0.2 return base_score

Performance Characteristics

Pros

  • Best of All Worlds: Leverages strengths of multiple patterns
  • Adaptive Performance: Can optimize based on usage patterns
  • Redundancy and Reliability: Fallback options when one pattern fails
  • Flexible Configuration: Can be tuned for specific applications
  • Comprehensive Coverage: Handles diverse query types effectively

Cons

  • Implementation Complexity: Much more complex than single patterns
  • Resource Overhead: Higher memory and computational requirements
  • Coordination Challenges: Managing interactions between patterns
  • Tuning Difficulty: Many parameters to optimize
  • Debugging Complexity: Harder to troubleshoot issues

Performance Metrics

# Performance characteristics vary by component patterns HYBRID_CHARACTERISTICS = { "storage_overhead": "1.5-3x single pattern", "retrieval_latency": "Variable (10ms-500ms)", "memory_usage": "Proportional to pattern count", "accuracy": "Higher than individual patterns", "complexity": "High implementation and maintenance" }

When to Use

Ideal Scenarios

  • Production systems requiring high reliability and performance
  • Diverse workloads with varying query types and patterns
  • Long-term deployments where optimization benefits justify complexity
  • Resource-rich environments that can handle increased overhead
  • Critical applications where memory quality is paramount
  • Simple applications with uniform memory requirements
  • Resource-constrained environments with tight limits
  • Rapid prototypes where simplicity is more important than optimization
  • Single-use-case systems where one pattern suffices

Best Practices

Configuration Management

class HybridMemoryConfig: """Configuration management for hybrid memory systems""" def __init__(self, config_file: str = None): self.default_config = { 'patterns': { 'vector': { 'enabled': True, 'weight': 1.0, 'model': 'all-MiniLM-L6-v2', 'routing_rules': { 'keywords': ['similar', 'like', 'about'], 'min_similarity_threshold': 0.3 } }, 'graph': { 'enabled': True, 'weight': 0.8, 'max_hops': 3, 'routing_rules': { 'keywords': ['relationship', 'connected', 'between'] } }, 'sliding_window': { 'enabled': True, 'weight': 0.9, 'window_size': 50, 'routing_rules': { 'keywords': ['recent', 'current'] } } }, 'fusion_strategies': { 'default_strategy': 'adaptive', 'parallel_threshold': 0.7, 'cascade_timeout': 5.0 }, 'optimization': { 'auto_tune': True, 'tune_interval_hours': 24, 'performance_window_size': 100 } } if config_file: self.config = self._load_config(config_file) else: self.config = self.default_config def _load_config(self, config_file: str) -> Dict: """Load configuration from file""" import json with open(config_file, 'r') as f: return json.load(f) def create_memory_system(self) -> AdaptiveMultiPatternMemory: """Create memory system from configuration""" memory_system = AdaptiveMultiPatternMemory() # Configure patterns based on config for pattern_name, pattern_config in self.config['patterns'].items(): if pattern_config.get('enabled', True): self._configure_pattern(memory_system, pattern_name, pattern_config) return memory_system def _configure_pattern(self, memory_system, pattern_name: str, config: Dict): """Configure individual pattern""" # This would create and configure the specific pattern type pass

Monitoring and Observability

class HybridMemoryMonitor: """Monitoring and observability for hybrid memory systems""" def __init__(self, memory_system: AdaptiveMultiPatternMemory): self.memory_system = memory_system self.metrics_collector = MetricsCollector() self.alerting = AlertingSystem() def start_monitoring(self): """Start continuous monitoring""" import threading import time def monitor_loop(): while True: self._collect_metrics() self._check_alerts() time.sleep(60) # Monitor every minute monitor_thread = threading.Thread(target=monitor_loop, daemon=True) monitor_thread.start() def _collect_metrics(self): """Collect current metrics""" metrics = { 'timestamp': datetime.now(), 'pattern_performance': self._get_pattern_performance(), 'memory_usage': self._get_memory_usage(), 'retrieval_latency': self._get_retrieval_latency(), 'success_rates': self._get_success_rates() } self.metrics_collector.record_metrics(metrics) def _check_alerts(self): """Check for alert conditions""" latest_metrics = self.metrics_collector.get_latest_metrics() # Check for high latency if latest_metrics.get('retrieval_latency', {}).get('avg', 0) > 500: # 500ms self.alerting.send_alert('High retrieval latency detected') # Check for low success rates success_rates = latest_metrics.get('success_rates', {}) for pattern, rate in success_rates.items(): if rate < 0.8: # 80% success rate threshold self.alerting.send_alert(f'Low success rate for {pattern}: {rate:.2f}') # Check for memory pressure memory_usage = latest_metrics.get('memory_usage', {}) if memory_usage.get('total_mb', 0) > 1000: # 1GB threshold self.alerting.send_alert('High memory usage detected') def _get_pattern_performance(self) -> Dict: """Get performance metrics for each pattern""" performance_summary = self.memory_system.orchestrator.performance_monitor.get_performance_summary() return performance_summary def _get_memory_usage(self) -> Dict: """Get memory usage statistics""" import psutil import os process = psutil.Process(os.getpid()) memory_info = process.memory_info() return { 'rss_mb': memory_info.rss / (1024 * 1024), 'vms_mb': memory_info.vms / (1024 * 1024), 'percent': process.memory_percent() } def _get_retrieval_latency(self) -> Dict: """Get retrieval latency statistics""" # Would calculate from recent retrieval operations return {'avg': 100, 'p95': 200, 'p99': 300} # Placeholder def _get_success_rates(self) -> Dict: """Get success rates for each pattern""" # Would calculate from recent operations return {'vector': 0.95, 'graph': 0.88, 'sliding_window': 0.92} # Placeholder class MetricsCollector: """Collect and store metrics""" def __init__(self): self.metrics_history = [] def record_metrics(self, metrics: Dict): """Record metrics""" self.metrics_history.append(metrics) # Keep only last 1000 entries if len(self.metrics_history) > 1000: self.metrics_history = self.metrics_history[-1000:] def get_latest_metrics(self) -> Dict: """Get latest metrics""" return self.metrics_history[-1] if self.metrics_history else {} class AlertingSystem: """Simple alerting system""" def send_alert(self, message: str): """Send alert (placeholder implementation)""" print(f"ALERT: {message} at {datetime.now()}")

Testing and Validation

Integration Tests

import pytest import time def test_hybrid_memory_integration(): """Test full hybrid memory system integration""" memory_system = AdaptiveMultiPatternMemory() # Add diverse types of interactions test_interactions = [ ("What is machine learning?", "ML is a subset of AI", {'type': 'question'}), ("John works at Apple", "That's interesting", {'type': 'fact'}), ("Show me recent conversations", "Here are recent chats", {'type': 'retrieval', 'prefer_recent': True}), ("How is John related to Apple?", "John works at Apple", {'type': 'relationship_query'}) ] for user_input, agent_response, context in test_interactions: results = memory_system.add_interaction(user_input, agent_response, context) assert any(results.values()), f"Failed to add interaction: {user_input}" # Test different types of retrieval test_queries = [ ("machine learning", {'expected_patterns': ['vector']}), ("John Apple relationship", {'expected_patterns': ['graph']}), ("recent", {'expected_patterns': ['sliding_window']}) ] for query, expectations in test_queries: results = memory_system.retrieve_context(query) assert results, f"No results for query: {query}" # Check that expected patterns were used pattern_names = list(results.keys()) for expected_pattern in expectations.get('expected_patterns', []): assert any(expected_pattern in name for name in pattern_names), \ f"Expected pattern {expected_pattern} not used for query: {query}" def test_pattern_coordination(): """Test coordination between different patterns""" orchestrator = HybridMemoryOrchestrator() # Register test patterns vector_mock = MockVectorPattern() graph_mock = MockGraphPattern() orchestrator.register_pattern('vector', vector_mock) orchestrator.register_pattern('graph', graph_mock) # Test routing results = orchestrator.add_interaction( "What is similar to machine learning?", "Deep learning is similar to ML", {'type': 'semantic_query'} ) assert 'vector' in results assert results['vector'] == True def test_performance_monitoring(): """Test performance monitoring functionality""" monitor = PerformanceMonitor() # Record some operations monitor.record_add_operation({'pattern1': True, 'pattern2': False}) monitor.record_retrieve_operation('pattern1', 'test query', 5, 0.1) summary = monitor.get_performance_summary() assert 'pattern1' in summary assert summary['pattern1']['total_operations'] == 1 class MockVectorPattern(MemoryPattern): def add_interaction(self, user_input, agent_response, context=None): return True def retrieve(self, query, **kwargs): return [{'content': f'Vector result for {query}', 'similarity_score': 0.8}] def get_capabilities(self): return {'semantic_search': True} class MockGraphPattern(MemoryPattern): def add_interaction(self, user_input, agent_response, context=None): return True def retrieve(self, query, **kwargs): return [{'content': f'Graph result for {query}', 'relevance_score': 0.7}] def get_capabilities(self): return {'relationship_modeling': True}

Migration and Deployment

Gradual Migration Strategy

class GradualMigrationManager: """Manage gradual migration to hybrid memory system""" def __init__(self, existing_memory, target_hybrid_config): self.existing_memory = existing_memory self.target_config = target_hybrid_config self.migration_state = 'not_started' self.hybrid_system = None def start_migration(self, migration_strategy: str = 'shadow_mode'): """Start migration process""" if migration_strategy == 'shadow_mode': self._start_shadow_mode() elif migration_strategy == 'gradual_cutover': self._start_gradual_cutover() elif migration_strategy == 'big_bang': self._start_big_bang() else: raise ValueError(f"Unknown migration strategy: {migration_strategy}") def _start_shadow_mode(self): """Run hybrid system in parallel to collect performance data""" self.hybrid_system = self._create_hybrid_system() self.migration_state = 'shadow_mode' # Copy existing data to hybrid system self._copy_existing_data() def _start_gradual_cutover(self): """Gradually shift traffic to hybrid system""" self.hybrid_system = self._create_hybrid_system() self.migration_state = 'gradual_cutover' self.cutover_percentage = 10 # Start with 10% of traffic def _start_big_bang(self): """Complete migration in one step""" self.hybrid_system = self._create_hybrid_system() self._copy_existing_data() self.migration_state = 'completed' def process_interaction(self, user_input: str, agent_response: str, context: Dict = None): """Process interaction during migration""" if self.migration_state == 'shadow_mode': # Add to both systems existing_result = self.existing_memory.add_interaction(user_input, agent_response, context) hybrid_result = self.hybrid_system.add_interaction(user_input, agent_response, context) return existing_result # Use existing system's result elif self.migration_state == 'gradual_cutover': import random if random.random() < (self.cutover_percentage / 100): return self.hybrid_system.add_interaction(user_input, agent_response, context) else: return self.existing_memory.add_interaction(user_input, agent_response, context) elif self.migration_state == 'completed': return self.hybrid_system.add_interaction(user_input, agent_response, context) else: return self.existing_memory.add_interaction(user_input, agent_response, context) def increase_cutover_percentage(self, increment: int = 10): """Gradually increase traffic to hybrid system""" if self.migration_state == 'gradual_cutover': self.cutover_percentage = min(100, self.cutover_percentage + increment) if self.cutover_percentage >= 100: self.migration_state = 'completed' def _create_hybrid_system(self): """Create hybrid system from configuration""" config = HybridMemoryConfig() return config.create_memory_system() def _copy_existing_data(self): """Copy data from existing system to hybrid system""" # Get all interactions from existing system if hasattr(self.existing_memory, 'get_all_interactions'): interactions = self.existing_memory.get_all_interactions() for interaction in interactions: self.hybrid_system.add_interaction( interaction.get('user_input', ''), interaction.get('agent_response', ''), interaction.get('context', {}) )

Next Steps

  1. Assess your specific use case requirements and constraints
  2. Choose appropriate base patterns for your hybrid system
  3. Design coordination and fusion strategies
  4. Implement gradual migration approach if upgrading existing system
  5. Set up comprehensive monitoring and observability
  6. Tune pattern weights and routing rules based on usage patterns
  7. Plan for ongoing optimization and pattern evolution

Hybrid approaches represent the most sophisticated memory architectures, providing maximum flexibility and capability at the cost of increased complexity. They are best suited for production systems with diverse requirements and the resources to support comprehensive memory management.