
Troubleshooting FAQ

Common Memory Issues

My agent is forgetting important information

This is one of the most common memory system problems. Let’s diagnose and fix it:

Diagnostic steps:

  1. Check if information is being stored at all
  2. Verify retrieval is working correctly
  3. Examine memory importance scoring
  4. Look for memory conflicts or overwrites

Step 1: Verify Storage

```python
# Add debugging to your memory storage
class DebugMemoryManager:
    def store_memory(self, user_id, content, **kwargs):
        print(f"Storing memory for user {user_id}: {content[:100]}...")

        # Store the memory
        memory_id = self.memory_store.create_memory(user_id, content, **kwargs)

        # Verify it was stored
        retrieved = self.memory_store.get_memory(memory_id)
        if retrieved:
            print(f"✓ Memory stored successfully: {memory_id}")
        else:
            print(f"✗ Failed to store memory: {memory_id}")

        return memory_id
```

Step 2: Check Retrieval Logic

```python
def debug_memory_retrieval(self, user_id, query):
    print(f"Searching memories for: {query}")

    # Check total memories for user
    all_memories = self.get_all_memories(user_id)
    print(f"Total memories for user: {len(all_memories)}")

    # Test similarity search
    similar = self.find_similar_memories(query, limit=10)
    print(f"Similar memories found: {len(similar)}")
    for i, (memory, score) in enumerate(similar):
        print(f"  {i+1}. Score: {score:.3f}, Content: {memory.content[:50]}...")

    return similar
```

Common fixes:

  • Low importance scores: Memories being scored too low and filtered out
  • Poor embedding quality: Text not embedding well for similarity search
  • Retrieval threshold too high: Similarity threshold excluding relevant memories
  • Memory expiration: Automatic cleanup removing important data

Quick fix pattern:

```python
# Boost importance for user corrections
if context.get("user_emphasized"):
    importance_score *= 1.5

# Lower retrieval threshold for sparse memory
if len(candidate_memories) < 5:
    similarity_threshold *= 0.7
```
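The quick fix above addresses the first and third causes. For the other two (embedding quality and memory expiration), here is a minimal diagnostic sketch. It assumes a hypothetical `embed()` helper that returns a vector and a `ttl_days` cleanup window; neither is part of the code above.

```python
import numpy as np
from datetime import datetime

def check_embedding_quality(embed, text):
    """Sanity check: a paraphrase of the same fact should embed close to the original."""
    paraphrase = f"In other words: {text}"
    a, b = np.asarray(embed(text)), np.asarray(embed(paraphrase))
    cosine = float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-9))
    if cosine < 0.8:  # heuristic threshold; tune for your embedding model
        print(f"⚠️ Weak paraphrase similarity ({cosine:.3f}) - embeddings may be low quality")
    return cosine

def check_expiration_risk(memory, ttl_days=90):
    """Warn when an important memory is close to the automatic cleanup window."""
    age_days = (datetime.now() - memory.created_at).days
    if memory.importance > 0.7 and age_days > ttl_days * 0.8:
        print(f"⚠️ Important memory {memory.id} is {age_days} days old - near the {ttl_days}-day cleanup cutoff")
```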

Memory retrieval is too slow

Performance problems often emerge as memory stores grow. Here’s how to diagnose and optimize:

Performance Debugging:

```python
import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.3f} seconds")
        return result
    return wrapper

class PerformanceDebugMemoryManager:
    @timing_decorator
    def retrieve_memories(self, user_id, query):
        # Your retrieval logic here
        pass

    def profile_memory_performance(self, user_id):
        """Profile different aspects of memory performance"""
        # Test database query speed
        start = time.time()
        memories = self.db.get_user_memories(user_id, limit=1000)
        db_time = time.time() - start

        # Test embedding generation speed
        start = time.time()
        embedding = self.generate_embedding("test query")
        embedding_time = time.time() - start

        # Test similarity search speed
        start = time.time()
        similar = self.vector_db.similarity_search(embedding, top_k=50)
        vector_search_time = time.time() - start

        print("Performance Profile:")
        print(f"  Database query: {db_time:.3f}s")
        print(f"  Embedding generation: {embedding_time:.3f}s")
        print(f"  Vector search: {vector_search_time:.3f}s")

        return {
            "db_time": db_time,
            "embedding_time": embedding_time,
            "vector_search_time": vector_search_time
        }
```

Optimization strategies:

1. Database Optimization:

```sql
-- Add proper indexes
CREATE INDEX CONCURRENTLY idx_memories_user_timestamp
ON memories(user_id, created_at DESC);

CREATE INDEX CONCURRENTLY idx_memories_importance
ON memories(importance DESC) WHERE importance > 0.5;

-- Optimize vector similarity search
CREATE INDEX ON memories USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
```

2. Caching Strategy:

```python
import json

import redis
from functools import lru_cache

class CachedMemoryManager:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.cache_ttl = 300  # 5 minutes

    @lru_cache(maxsize=1000)
    def get_user_embedding_cache(self, text):
        """Cache embeddings to avoid recomputation"""
        return self.generate_embedding(text)

    def get_cached_memories(self, user_id, query_hash):
        """Check cache before expensive retrieval"""
        cache_key = f"memories:{user_id}:{query_hash}"
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        return None

    def cache_memory_results(self, user_id, query_hash, results):
        """Cache retrieval results"""
        cache_key = f"memories:{user_id}:{query_hash}"
        self.redis_client.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(results, default=str)
        )
```
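The `query_hash` argument above is assumed rather than defined; one minimal way to derive it (a sketch using only the standard library):

```python
import hashlib

def make_query_hash(query: str) -> str:
    """Build a stable cache key: normalize whitespace and case, then hash."""
    normalized = " ".join(query.lower().split())
    return hashlib.sha1(normalized.encode("utf-8")).hexdigest()

# cache_key then becomes f"memories:{user_id}:{make_query_hash(query)}"
```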

3. Retrieval Optimization:

```python
def optimized_memory_retrieval(self, user_id, query, max_memories=10):
    """Multi-stage retrieval with early termination"""
    # Stage 1: Quick filter on structured data
    candidates = self.db.execute("""
        SELECT id, content, importance, embedding
        FROM memories
        WHERE user_id = %s
          AND importance > 0.3
          AND created_at > NOW() - INTERVAL '90 days'
        ORDER BY importance DESC
        LIMIT 100
    """, [user_id])

    if len(candidates) < 5:
        # Not enough candidates, expand search
        candidates = self.expand_candidate_search(user_id)

    # Stage 2: Batch embedding computation
    query_embedding = self.get_cached_embedding(query)

    # Stage 3: Efficient similarity computation
    similarities = self.batch_similarity_search(query_embedding, candidates)

    # Stage 4: Early termination if good matches found
    top_matches = similarities[:max_memories]
    if top_matches and top_matches[0].similarity > 0.8:
        return top_matches  # High confidence, return early

    # Stage 5: Expand search if needed
    return self.expanded_search(user_id, query, query_embedding)
```
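`batch_similarity_search` is left abstract in the snippet above. A minimal vectorized sketch with numpy, assuming each candidate carries a pre-computed `embedding` attribute:

```python
import numpy as np

def batch_cosine_similarity(query_embedding, candidates):
    """Score every candidate against the query in one vectorized pass, best first."""
    matrix = np.stack([np.asarray(c.embedding) for c in candidates])  # shape (N, d)
    query = np.asarray(query_embedding)
    scores = matrix @ query / (
        np.linalg.norm(matrix, axis=1) * np.linalg.norm(query) + 1e-9
    )
    order = np.argsort(-scores)  # descending by similarity
    return [(candidates[i], float(scores[i])) for i in order]
```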

Memory is storing conflicting information

Handling contradictory information is crucial for reliable memory systems:

Conflict Detection:

```python
class MemoryConflictResolver:
    def detect_conflicts(self, user_id, new_memory):
        """Detect potential conflicts before storing"""
        # Find semantically similar memories
        similar_memories = self.find_similar_memories(
            new_memory.content,
            user_id=user_id,
            similarity_threshold=0.7
        )

        conflicts = []
        for existing in similar_memories:
            conflict_type = self.analyze_conflict(existing, new_memory)
            if conflict_type != "no_conflict":
                conflicts.append({
                    "existing_memory": existing,
                    "new_memory": new_memory,
                    "conflict_type": conflict_type,
                    "severity": self.assess_conflict_severity(existing, new_memory)
                })

        return conflicts

    def analyze_conflict(self, memory1, memory2):
        """Determine type of conflict between memories"""
        # Use LLM to analyze semantic conflict
        prompt = f"""
        Analyze these two pieces of information for conflicts:

        Memory 1: {memory1.content}
        Memory 2: {memory2.content}

        Are these conflicting, complementary, or unrelated?
        Return: "direct_conflict", "partial_conflict", "complementary", or "no_conflict"
        """
        conflict_type = self.llm.query(prompt).strip()
        return conflict_type

    def resolve_conflict(self, conflict):
        """Resolve detected conflicts"""
        existing = conflict["existing_memory"]
        new_memory = conflict["new_memory"]
        conflict_type = conflict["conflict_type"]

        if conflict_type == "direct_conflict":
            # Choose based on confidence, recency, and source
            if self.should_replace(existing, new_memory):
                self.replace_memory(existing.id, new_memory)
                return "replaced"
            else:
                self.mark_memory_disputed(existing.id, new_memory)
                return "marked_disputed"
        elif conflict_type == "partial_conflict":
            # Try to merge if possible
            merged = self.attempt_merge(existing, new_memory)
            if merged:
                self.replace_memory(existing.id, merged)
                return "merged"
            else:
                # Store both with conflict notation
                self.store_with_conflict_note(new_memory, existing.id)
                return "stored_with_note"

    def should_replace(self, existing, new_memory):
        """Decide whether new memory should replace existing"""
        # Factors to consider
        recency_weight = 0.3
        confidence_weight = 0.4
        source_weight = 0.3

        existing_score = (
            existing.confidence * confidence_weight +
            self.source_reliability(existing.source) * source_weight +
            (1 - self.age_factor(existing)) * recency_weight
        )

        new_score = (
            new_memory.confidence * confidence_weight +
            self.source_reliability(new_memory.source) * source_weight +
            1.0 * recency_weight  # New memories get full recency score
        )

        return new_score > existing_score + 0.1  # Slight bias toward keeping existing
```
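`age_factor` and `source_reliability` are assumed helpers in `should_replace`. One plausible sketch, using half-life age decay and a hand-tuned source table; both are hypothetical choices, not part of the snippet above:

```python
from datetime import datetime

def age_factor(memory, half_life_days=30):
    """Returns 0.0 for a brand-new memory, 0.5 after one half-life, approaching 1.0 with age."""
    age_days = (datetime.now() - memory.created_at).days
    return 1.0 - 0.5 ** (age_days / half_life_days)

# Reliability per source label; values are illustrative, not calibrated
SOURCE_RELIABILITY = {"user_statement": 0.9, "inferred": 0.5, "unknown": 0.3}

def source_reliability(source):
    """Map a source label to a reliability score in [0, 1]."""
    return SOURCE_RELIABILITY.get(source, 0.3)
```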

Prevention strategies:

```python
def validate_memory_before_storage(self, user_id, content, source="unknown"):
    """Validate memory for consistency before storing"""
    # Check for obvious contradictions
    user_profile = self.get_user_profile(user_id)

    # Cross-reference with existing facts
    potential_conflicts = self.find_potential_conflicts(user_id, content)

    if potential_conflicts:
        # Flag for human review or additional validation
        return self.flag_for_review(content, potential_conflicts)

    # Store with source attribution
    return self.store_memory_with_source(user_id, content, source)
```

My memory system is using too much storage/cost

Memory storage costs can grow quickly. Here’s how to optimize:

Storage Analysis:

```python
from datetime import datetime, timedelta

class StorageAnalyzer:
    def analyze_memory_usage(self, user_id):
        """Comprehensive storage usage analysis"""
        memories = self.get_all_memories(user_id)

        analysis = {
            "total_memories": len(memories),
            "total_size_mb": sum(self.estimate_memory_size(m) for m in memories) / (1024*1024),
            "by_type": {},
            "by_age": {},
            "by_access": {},
            "optimization_opportunities": []
        }

        # Analyze by type
        for memory_type in set(m.memory_type for m in memories):
            type_memories = [m for m in memories if m.memory_type == memory_type]
            analysis["by_type"][memory_type] = {
                "count": len(type_memories),
                "size_mb": sum(self.estimate_memory_size(m) for m in type_memories) / (1024*1024),
                "avg_size_kb": sum(self.estimate_memory_size(m) for m in type_memories) / len(type_memories) / 1024
            }

        # Analyze by age
        now = datetime.now()
        age_buckets = {"week": 7, "month": 30, "quarter": 90, "year": 365}
        for bucket_name, days in age_buckets.items():
            cutoff = now - timedelta(days=days)
            old_memories = [m for m in memories if m.created_at < cutoff]
            analysis["by_age"][f"older_than_{bucket_name}"] = {
                "count": len(old_memories),
                "size_mb": sum(self.estimate_memory_size(m) for m in old_memories) / (1024*1024)
            }

        # Find optimization opportunities
        analysis["optimization_opportunities"] = self.find_optimization_opportunities(memories)

        return analysis

    def find_optimization_opportunities(self, memories):
        """Identify specific optimization opportunities"""
        opportunities = []

        # Large, rarely accessed memories
        large_unused = [
            m for m in memories
            if self.estimate_memory_size(m) > 10*1024  # > 10KB
            and m.access_count < 2
            and (datetime.now() - m.created_at).days > 30
        ]
        if large_unused:
            opportunities.append({
                "type": "compress_unused",
                "count": len(large_unused),
                "potential_savings_mb": sum(self.estimate_memory_size(m) for m in large_unused) * 0.7 / (1024*1024)
            })

        # Duplicate or near-duplicate memories
        duplicates = self.find_near_duplicates(memories)
        if duplicates:
            opportunities.append({
                "type": "deduplicate",
                "count": len(duplicates),
                "potential_savings_mb": sum(self.estimate_memory_size(m) for m in duplicates) / (1024*1024)
            })

        # Low-importance memories taking up space
        low_importance = [m for m in memories if m.importance < 0.2]
        if low_importance:
            opportunities.append({
                "type": "remove_low_importance",
                "count": len(low_importance),
                "potential_savings_mb": sum(self.estimate_memory_size(m) for m in low_importance) / (1024*1024)
            })

        return opportunities
```
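`estimate_memory_size` is assumed throughout the analyzer. A rough sketch, assuming float32 embeddings and a flat per-row metadata overhead (both assumptions, not measurements):

```python
def estimate_memory_size(memory):
    """Approximate bytes a memory occupies: text + embedding + fixed metadata overhead."""
    content_bytes = len(memory.content.encode("utf-8"))
    embedding_bytes = len(memory.embedding) * 4 if memory.embedding is not None else 0  # float32
    return content_bytes + embedding_bytes + 200  # ~200 bytes of metadata, a rough guess
```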

Automated Optimization:

```python
from datetime import datetime, timedelta

class AutomatedMemoryOptimizer:
    def __init__(self, storage_budget_mb=50, optimization_schedule="daily"):
        self.storage_budget = storage_budget_mb
        self.schedule = optimization_schedule

    def run_optimization(self, user_id):
        """Run automated memory optimization"""
        current_usage = self.get_storage_usage(user_id)

        if current_usage < self.storage_budget * 0.8:
            return "No optimization needed"

        # Priority-based optimization
        optimizations = [
            self.remove_expired_memories,
            self.compress_old_interactions,
            self.deduplicate_similar_memories,
            self.archive_low_importance_memories,
            self.summarize_interaction_clusters
        ]

        for optimization_func in optimizations:
            optimization_func(user_id)
            current_usage = self.get_storage_usage(user_id)
            if current_usage <= self.storage_budget:
                break

        return f"Optimized to {current_usage:.1f}MB"

    def compress_old_interactions(self, user_id):
        """Compress old interaction memories"""
        cutoff = datetime.now() - timedelta(days=30)
        old_interactions = self.get_memories(
            user_id,
            memory_type="interaction",
            created_before=cutoff,
            access_count__lt=3
        )

        for memory in old_interactions:
            # Summarize using LLM
            summary = self.llm.summarize(
                memory.content,
                max_tokens=100,
                preserve_key_facts=True
            )

            # Replace with summary
            compressed_memory = memory.copy(
                content=summary,
                is_compressed=True,
                original_size=len(memory.content)
            )
            self.replace_memory(memory.id, compressed_memory)
```

Debugging Tools

How do I debug memory retrieval issues?

Build debugging tools into your memory system from the start:

Memory Retrieval Debugger:

```python
import time

class MemoryDebugger:
    def __init__(self, memory_manager):
        self.memory_manager = memory_manager
        self.debug_mode = True

    def debug_retrieval_pipeline(self, user_id, query):
        """Step through retrieval pipeline with detailed logging"""
        print(f"🔍 Debugging memory retrieval for query: '{query}'")
        print("-" * 60)

        # Stage 1: Initial candidate selection
        print("1. Candidate Selection:")
        candidates = self.memory_manager.get_candidate_memories(user_id)
        print(f"   Found {len(candidates)} candidate memories")

        if self.debug_mode:
            for i, candidate in enumerate(candidates[:5]):
                print(f"   {i+1}. [{candidate.memory_type}] {candidate.content[:50]}...")

        # Stage 2: Embedding generation
        print("\n2. Query Embedding:")
        query_start = time.time()
        query_embedding = self.memory_manager.generate_embedding(query)
        query_time = time.time() - query_start
        print(f"   Generated embedding in {query_time:.3f}s")
        print(f"   Embedding dimensions: {len(query_embedding)}")

        # Stage 3: Similarity computation
        print("\n3. Similarity Computation:")
        similarities = []
        for candidate in candidates:
            similarity = self.compute_similarity(query_embedding, candidate.embedding)
            similarities.append((candidate, similarity))

        similarities.sort(key=lambda x: x[1], reverse=True)
        print(f"   Computed similarities for {len(similarities)} memories")

        if self.debug_mode:
            print("   Top 5 matches:")
            for i, (memory, score) in enumerate(similarities[:5]):
                print(f"   {i+1}. Score: {score:.3f} - {memory.content[:40]}...")

        # Stage 4: Filtering and ranking
        print("\n4. Filtering & Ranking:")
        filtered = self.memory_manager.apply_filters(similarities, user_id)
        print(f"   {len(filtered)} memories after filtering")

        final_results = self.memory_manager.final_ranking(filtered, query)
        print(f"   {len(final_results)} memories in final results")

        # Stage 5: Analysis
        print("\n5. Analysis:")
        if not final_results:
            print("   ⚠️ No memories retrieved!")
            self.diagnose_empty_results(user_id, query, candidates, similarities)
        else:
            best_score = final_results[0][1] if final_results else 0
            print(f"   Best match score: {best_score:.3f}")
            if best_score < 0.5:
                print("   ⚠️ Low similarity scores - consider expanding search")

        return final_results

    def diagnose_empty_results(self, user_id, query, candidates, similarities):
        """Diagnose why no memories were retrieved"""
        print("   Diagnosis:")

        if not candidates:
            print("   - No candidate memories found")
            print("   - Check user has stored memories")
        elif not similarities or max(s[1] for s in similarities) < 0.3:
            print("   - Poor similarity scores")
            print("   - Consider different embedding model or query preprocessing")
        else:
            print("   - Memories found but filtered out")
            print("   - Check filtering criteria (importance, recency, etc.)")

    def memory_search_playground(self, user_id):
        """Interactive memory search for debugging"""
        print("Memory Search Playground")
        print("Type 'quit' to exit")

        while True:
            query = input("\nEnter search query: ")
            if query.lower() == 'quit':
                break

            results = self.debug_retrieval_pipeline(user_id, query)

            if results:
                print(f"\n📝 Retrieved {len(results)} memories:")
                for i, (memory, score) in enumerate(results):
                    print(f"{i+1}. [{score:.3f}] {memory.content}")
            else:
                print("\n❌ No memories retrieved")
```
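A quick way to exercise the debugger during development, assuming an already-initialized `memory_manager` instance:

```python
debugger = MemoryDebugger(memory_manager)

# One-off trace of a single query
debugger.debug_retrieval_pipeline("user_123", "what food does the user like?")

# Or explore interactively from a terminal
debugger.memory_search_playground("user_123")
```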

How do I test memory system reliability?

Automated testing ensures your memory system works correctly:

Memory System Test Suite:

```python
import unittest
from unittest import TestCase

class MemorySystemTestSuite(TestCase):
    def setUp(self):
        """Set up test environment"""
        self.memory_manager = MemoryManager(test_mode=True)
        self.test_user_id = "test_user_123"

    def test_basic_storage_and_retrieval(self):
        """Test basic memory storage and retrieval"""
        # Store a memory
        content = "User prefers dark mode interface"
        memory_id = self.memory_manager.store_memory(
            self.test_user_id,
            content,
            memory_type="preference",
            importance=0.8
        )

        # Retrieve it
        retrieved = self.memory_manager.get_memory(memory_id)
        self.assertIsNotNone(retrieved)
        self.assertEqual(retrieved.content, content)

    def test_semantic_search(self):
        """Test semantic similarity search"""
        # Store related memories
        memories = [
            "User likes Italian food",
            "User enjoys pasta dishes",
            "User prefers vegetarian options",
            "User works in software engineering",
            "User lives in San Francisco"
        ]
        for content in memories:
            self.memory_manager.store_memory(self.test_user_id, content)

        # Search for food-related memories
        results = self.memory_manager.search_memories(
            self.test_user_id,
            "What food does the user like?",
            limit=3
        )

        # Check that food-related memories are returned
        food_memories = [r.content for r in results]
        self.assertIn("User likes Italian food", food_memories)
        self.assertIn("User enjoys pasta dishes", food_memories)

    def test_memory_conflict_resolution(self):
        """Test handling of conflicting memories"""
        # Store initial preference
        self.memory_manager.store_memory(
            self.test_user_id,
            "User prefers dark mode",
            memory_type="preference",
            confidence=0.8
        )

        # Store conflicting preference
        conflict_result = self.memory_manager.store_memory(
            self.test_user_id,
            "User prefers light mode",
            memory_type="preference",
            confidence=0.9
        )

        # Verify conflict was handled
        self.assertIn("conflict_resolved", conflict_result.metadata)

        # Check final state
        mode_memories = self.memory_manager.search_memories(
            self.test_user_id,
            "user interface preference"
        )

        # Should have higher confidence preference or marked conflict
        self.assertTrue(len(mode_memories) >= 1)

    def test_memory_importance_learning(self):
        """Test that memory importance updates based on usage"""
        # Store memory with initial importance
        memory_id = self.memory_manager.store_memory(
            self.test_user_id,
            "User's favorite restaurant is Joe's Pizza",
            importance=0.5
        )

        initial_memory = self.memory_manager.get_memory(memory_id)
        initial_importance = initial_memory.importance

        # Simulate multiple accesses
        for _ in range(10):
            self.memory_manager.access_memory(memory_id)

        # Run learning update
        self.memory_manager.update_memory_importance(self.test_user_id)

        # Check that importance increased
        updated_memory = self.memory_manager.get_memory(memory_id)
        self.assertGreater(updated_memory.importance, initial_importance)

    def test_storage_budget_compliance(self):
        """Test that storage stays within budget"""
        # Set small storage budget
        self.memory_manager.set_storage_budget(self.test_user_id, max_mb=1)

        # Store many memories to exceed budget
        for i in range(100):
            self.memory_manager.store_memory(
                self.test_user_id,
                f"This is test memory number {i} with some content to make it larger " * 10,
                importance=0.3
            )

        # Check storage is within budget
        current_usage = self.memory_manager.get_storage_usage(self.test_user_id)
        self.assertLessEqual(current_usage, 1.1)  # Allow 10% tolerance

    def test_memory_privacy_deletion(self):
        """Test privacy-compliant memory deletion"""
        # Store some memories
        memory_ids = []
        for i in range(5):
            memory_id = self.memory_manager.store_memory(
                self.test_user_id,
                f"Personal information {i}"
            )
            memory_ids.append(memory_id)

        # Request privacy deletion
        self.memory_manager.delete_all_user_data(self.test_user_id)

        # Verify all memories are deleted
        for memory_id in memory_ids:
            retrieved = self.memory_manager.get_memory(memory_id)
            self.assertIsNone(retrieved)

        # Verify user can start fresh
        new_memory_id = self.memory_manager.store_memory(
            self.test_user_id,
            "Fresh start memory"
        )
        self.assertIsNotNone(new_memory_id)

def run_memory_reliability_tests():
    """Run comprehensive memory system tests"""
    suite = unittest.TestLoader().loadTestsFromTestCase(MemorySystemTestSuite)
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

    if result.wasSuccessful():
        print("✅ All memory system tests passed!")
    else:
        print(f"❌ {len(result.failures)} test(s) failed")
        for failure in result.failures:
            print(f"Failed: {failure[0]}")

    return result.wasSuccessful()
```

How do I monitor memory system performance in production?

Production monitoring helps catch issues before they affect users:

Production Monitoring Setup:

```python
import logging
import time
from dataclasses import dataclass
from datetime import datetime

@dataclass
class MemoryMetrics:
    timestamp: datetime
    user_id: str
    operation: str
    duration_ms: float
    memory_count: int
    storage_mb: float
    cache_hit_rate: float
    error_count: int

class MemorySystemMonitor:
    def __init__(self):
        self.metrics_buffer = []
        self.alert_thresholds = {
            "avg_response_time_ms": 500,
            "error_rate_percent": 5,
            "storage_usage_mb": 100,
            "cache_hit_rate_percent": 70
        }

    def log_operation(self, user_id, operation, duration_ms, **kwargs):
        """Log memory operation for monitoring"""
        metrics = MemoryMetrics(
            timestamp=datetime.now(),
            user_id=user_id,
            operation=operation,
            duration_ms=duration_ms,
            memory_count=kwargs.get("memory_count", 0),
            storage_mb=kwargs.get("storage_mb", 0),
            cache_hit_rate=kwargs.get("cache_hit_rate", 0),
            error_count=kwargs.get("error_count", 0)
        )
        self.metrics_buffer.append(metrics)

        # Check for immediate alerts
        self.check_thresholds(metrics)

        # Flush buffer periodically
        if len(self.metrics_buffer) >= 100:
            self.flush_metrics()

    def check_thresholds(self, metrics):
        """Check if metrics exceed alert thresholds"""
        alerts = []

        if metrics.duration_ms > self.alert_thresholds["avg_response_time_ms"]:
            alerts.append(f"Slow response: {metrics.duration_ms:.1f}ms for {metrics.operation}")

        if metrics.storage_mb > self.alert_thresholds["storage_usage_mb"]:
            alerts.append(f"High storage usage: {metrics.storage_mb:.1f}MB for user {metrics.user_id}")

        if metrics.cache_hit_rate < self.alert_thresholds["cache_hit_rate_percent"]:
            alerts.append(f"Low cache hit rate: {metrics.cache_hit_rate:.1f}% for user {metrics.user_id}")

        for alert in alerts:
            logging.warning(f"Memory system alert: {alert}")

    def generate_daily_report(self):
        """Generate daily performance report"""
        if not self.metrics_buffer:
            return "No metrics available"

        # Calculate aggregated metrics
        total_operations = len(self.metrics_buffer)
        avg_response_time = sum(m.duration_ms for m in self.metrics_buffer) / total_operations
        error_count = sum(m.error_count for m in self.metrics_buffer)
        error_rate = (error_count / total_operations) * 100
        unique_users = len(set(m.user_id for m in self.metrics_buffer))
        total_storage = sum(m.storage_mb for m in self.metrics_buffer if m.storage_mb > 0)

        report = f"""
Memory System Daily Report - {datetime.now().strftime('%Y-%m-%d')}
{'=' * 60}

Performance Metrics:
- Total operations: {total_operations}
- Average response time: {avg_response_time:.1f}ms
- Error rate: {error_rate:.1f}%
- Active users: {unique_users}
- Total storage used: {total_storage:.1f}MB

Top Operations:
{self.get_top_operations()}

Performance Trends:
{self.get_performance_trends()}

Recommendations:
{self.generate_recommendations()}
"""
        return report

    def get_top_operations(self):
        """Get most common operations"""
        operation_counts = {}
        for metric in self.metrics_buffer:
            operation_counts[metric.operation] = operation_counts.get(metric.operation, 0) + 1

        sorted_ops = sorted(operation_counts.items(), key=lambda x: x[1], reverse=True)
        return "\n".join([f"- {op}: {count} times" for op, count in sorted_ops[:5]])

    def generate_recommendations(self):
        """Generate optimization recommendations"""
        recommendations = []

        avg_response_time = sum(m.duration_ms for m in self.metrics_buffer) / len(self.metrics_buffer)
        if avg_response_time > 200:
            recommendations.append("Consider adding caching or optimizing database queries")

        high_storage_users = [m for m in self.metrics_buffer if m.storage_mb > 50]
        if len(high_storage_users) > len(self.metrics_buffer) * 0.1:
            recommendations.append("Implement memory compression for high-usage users")

        if not recommendations:
            recommendations.append("System performing within normal parameters")

        return "\n".join([f"- {rec}" for rec in recommendations])

# Usage example
monitor = MemorySystemMonitor()

class MonitoredMemoryManager:
    def __init__(self):
        self.monitor = MemorySystemMonitor()

    def store_memory(self, user_id, content, **kwargs):
        start_time = time.time()
        try:
            # Your memory storage logic
            result = self._actual_store_memory(user_id, content, **kwargs)

            # Log successful operation
            duration_ms = (time.time() - start_time) * 1000
            self.monitor.log_operation(
                user_id=user_id,
                operation="store_memory",
                duration_ms=duration_ms,
                memory_count=1,
                error_count=0
            )
            return result
        except Exception as e:
            # Log error
            duration_ms = (time.time() - start_time) * 1000
            self.monitor.log_operation(
                user_id=user_id,
                operation="store_memory",
                duration_ms=duration_ms,
                memory_count=0,
                error_count=1
            )
            raise e
```

For additional help, check our Implementation Examples and Best Practices Guide.