Troubleshooting FAQ
Common Memory Issues
My agent is forgetting important information
This is one of the most common memory system problems. Let’s diagnose and fix it:
Diagnostic steps:
- Check if information is being stored at all
- Verify retrieval is working correctly
- Examine memory importance scoring
- Look for memory conflicts or overwrites
Step 1: Verify Storage
```python
# Add debugging to your memory storage
class DebugMemoryManager:
    def store_memory(self, user_id, content, **kwargs):
        print(f"Storing memory for user {user_id}: {content[:100]}...")

        # Store the memory
        memory_id = self.memory_store.create_memory(user_id, content, **kwargs)

        # Verify it was stored by reading it back
        retrieved = self.memory_store.get_memory(memory_id)
        if retrieved:
            print(f"✓ Memory stored successfully: {memory_id}")
        else:
            print(f"✗ Failed to store memory: {memory_id}")
        return memory_id
```
Step 2: Check Retrieval Logic
```python
def debug_memory_retrieval(self, user_id, query):
    print(f"Searching memories for: {query}")

    # Check total memories for user
    all_memories = self.get_all_memories(user_id)
    print(f"Total memories for user: {len(all_memories)}")

    # Test similarity search
    similar = self.find_similar_memories(query, limit=10)
    print(f"Similar memories found: {len(similar)}")
    for i, (memory, score) in enumerate(similar):
        print(f"  {i+1}. Score: {score:.3f}, Content: {memory.content[:50]}...")
    return similar
```
Common fixes:
- Low importance scores: Memories being scored too low and filtered out
- Poor embedding quality: Text not embedding well for similarity search
- Retrieval threshold too high: Similarity threshold excluding relevant memories
- Memory expiration: Automatic cleanup removing important data
Quick fix pattern:
```python
# Boost importance for user corrections
if context.get("user_emphasized"):
    importance_score *= 1.5

# Lower retrieval threshold for sparse memory
if len(candidate_memories) < 5:
    similarity_threshold *= 0.7
```
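The quick fixes above cover scoring and thresholds; for the memory-expiration item, a guard in the cleanup job keeps important memories from being purged. A minimal sketch, assuming a TTL-style cleanup and an `importance` field (the method names are illustrative, not part of the original):

```python
from datetime import datetime, timedelta

def cleanup_expired_memories(memory_store, user_id, ttl_days=90, importance_floor=0.7):
    """Delete stale memories, but never ones above the importance floor."""
    cutoff = datetime.now() - timedelta(days=ttl_days)
    for memory in memory_store.get_all_memories(user_id):
        # High-importance memories are exempt from expiration, regardless of age
        if memory.importance >= importance_floor:
            continue
        if memory.created_at < cutoff:
            memory_store.delete_memory(memory.id)
```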
Memory retrieval is too slow
Performance problems often emerge as memory stores grow. Here’s how to diagnose and optimize:
Performance Debugging:
```python
import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.3f} seconds")
        return result
    return wrapper

class PerformanceDebugMemoryManager:
    @timing_decorator
    def retrieve_memories(self, user_id, query):
        # Your retrieval logic here
        pass

    def profile_memory_performance(self, user_id):
        """Profile different aspects of memory performance"""
        # Test database query speed
        start = time.time()
        memories = self.db.get_user_memories(user_id, limit=1000)
        db_time = time.time() - start

        # Test embedding generation speed
        start = time.time()
        embedding = self.generate_embedding("test query")
        embedding_time = time.time() - start

        # Test similarity search speed
        start = time.time()
        similar = self.vector_db.similarity_search(embedding, top_k=50)
        vector_search_time = time.time() - start

        print("Performance Profile:")
        print(f"  Database query: {db_time:.3f}s")
        print(f"  Embedding generation: {embedding_time:.3f}s")
        print(f"  Vector search: {vector_search_time:.3f}s")

        return {
            "db_time": db_time,
            "embedding_time": embedding_time,
            "vector_search_time": vector_search_time
        }
```
Optimization strategies:
1. Database Optimization:
```sql
-- Add proper indexes
CREATE INDEX CONCURRENTLY idx_memories_user_timestamp
    ON memories(user_id, created_at DESC);

CREATE INDEX CONCURRENTLY idx_memories_importance
    ON memories(importance DESC)
    WHERE importance > 0.5;

-- Optimize vector similarity search (pgvector)
CREATE INDEX ON memories USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
```
2. Caching Strategy:
```python
import json

import redis
from functools import lru_cache

class CachedMemoryManager:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.cache_ttl = 300  # 5 minutes

    # Note: lru_cache on an instance method caches per (self, text)
    # and keeps the instance alive for the cache's lifetime
    @lru_cache(maxsize=1000)
    def get_user_embedding_cache(self, text):
        """Cache embeddings to avoid recomputation"""
        return self.generate_embedding(text)

    def get_cached_memories(self, user_id, query_hash):
        """Check cache before expensive retrieval"""
        cache_key = f"memories:{user_id}:{query_hash}"
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        return None

    def cache_memory_results(self, user_id, query_hash, results):
        """Cache retrieval results"""
        cache_key = f"memories:{user_id}:{query_hash}"
        self.redis_client.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(results, default=str)
        )
```
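The cache methods above take a `query_hash` that is never derived; one hedged way to wire them around retrieval is to hash the normalized query text (`retrieve_memories` here stands in for your expensive retrieval path):

```python
import hashlib

def retrieve_with_cache(manager, user_id, query):
    # Derive a stable cache key from the normalized query text
    query_hash = hashlib.sha256(query.strip().lower().encode()).hexdigest()[:16]

    cached = manager.get_cached_memories(user_id, query_hash)
    if cached is not None:
        return cached  # Cache hit: skip embedding and vector search entirely

    results = manager.retrieve_memories(user_id, query)  # assumed expensive path
    manager.cache_memory_results(user_id, query_hash, results)
    return results
```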
3. Retrieval Optimization:
```python
def optimized_memory_retrieval(self, user_id, query, max_memories=10):
    """Multi-stage retrieval with early termination"""
    # Stage 1: Quick filter on structured data
    candidates = self.db.execute("""
        SELECT id, content, importance, embedding
        FROM memories
        WHERE user_id = %s
          AND importance > 0.3
          AND created_at > NOW() - INTERVAL '90 days'
        ORDER BY importance DESC
        LIMIT 100
    """, [user_id])

    if len(candidates) < 5:
        # Not enough candidates, expand search
        candidates = self.expand_candidate_search(user_id)

    # Stage 2: Batch embedding computation
    query_embedding = self.get_cached_embedding(query)

    # Stage 3: Efficient similarity computation
    similarities = self.batch_similarity_search(query_embedding, candidates)

    # Stage 4: Early termination if good matches found
    top_matches = similarities[:max_memories]
    if top_matches and top_matches[0].similarity > 0.8:
        return top_matches  # High confidence, return early

    # Stage 5: Expand search if needed
    return self.expanded_search(user_id, query, query_embedding)
```
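`batch_similarity_search` (Stage 3) is assumed above; here is a minimal vectorized sketch with NumPy, assuming each candidate is an object carrying an `embedding` array and that cosine similarity is the metric:

```python
import numpy as np

def batch_similarity_search(query_embedding, candidates):
    """Score all candidates against the query in one matrix operation."""
    matrix = np.array([c.embedding for c in candidates])  # shape (n, dim)
    query = np.asarray(query_embedding)

    # Cosine similarity: dot product of L2-normalized vectors
    matrix_norm = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
    query_norm = query / np.linalg.norm(query)
    scores = matrix_norm @ query_norm  # shape (n,)

    # Attach scores and sort best-first
    for candidate, score in zip(candidates, scores):
        candidate.similarity = float(score)
    return sorted(candidates, key=lambda c: c.similarity, reverse=True)
```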
Memory is storing conflicting information
Handling contradictory information is crucial for reliable memory systems:
Conflict Detection:
```python
class MemoryConflictResolver:
    def detect_conflicts(self, user_id, new_memory):
        """Detect potential conflicts before storing"""
        # Find semantically similar memories
        similar_memories = self.find_similar_memories(
            new_memory.content,
            user_id=user_id,
            similarity_threshold=0.7
        )

        conflicts = []
        for existing in similar_memories:
            conflict_type = self.analyze_conflict(existing, new_memory)
            if conflict_type != "no_conflict":
                conflicts.append({
                    "existing_memory": existing,
                    "new_memory": new_memory,
                    "conflict_type": conflict_type,
                    "severity": self.assess_conflict_severity(existing, new_memory)
                })
        return conflicts

    def analyze_conflict(self, memory1, memory2):
        """Determine type of conflict between memories"""
        # Use LLM to analyze semantic conflict
        prompt = f"""
        Analyze these two pieces of information for conflicts:

        Memory 1: {memory1.content}
        Memory 2: {memory2.content}

        Are these conflicting, complementary, or unrelated?
        Return: "direct_conflict", "partial_conflict", "complementary", or "no_conflict"
        """
        conflict_type = self.llm.query(prompt).strip()
        return conflict_type

    def resolve_conflict(self, conflict):
        """Resolve detected conflicts"""
        existing = conflict["existing_memory"]
        new_memory = conflict["new_memory"]
        conflict_type = conflict["conflict_type"]

        if conflict_type == "direct_conflict":
            # Choose based on confidence, recency, and source
            if self.should_replace(existing, new_memory):
                self.replace_memory(existing.id, new_memory)
                return "replaced"
            else:
                self.mark_memory_disputed(existing.id, new_memory)
                return "marked_disputed"
        elif conflict_type == "partial_conflict":
            # Try to merge if possible
            merged = self.attempt_merge(existing, new_memory)
            if merged:
                self.replace_memory(existing.id, merged)
                return "merged"
            else:
                # Store both with conflict notation
                self.store_with_conflict_note(new_memory, existing.id)
                return "stored_with_note"

    def should_replace(self, existing, new_memory):
        """Decide whether new memory should replace existing"""
        # Factors to consider
        recency_weight = 0.3
        confidence_weight = 0.4
        source_weight = 0.3

        existing_score = (
            existing.confidence * confidence_weight +
            self.source_reliability(existing.source) * source_weight +
            (1 - self.age_factor(existing)) * recency_weight
        )
        new_score = (
            new_memory.confidence * confidence_weight +
            self.source_reliability(new_memory.source) * source_weight +
            1.0 * recency_weight  # New memories get full recency score
        )
        return new_score > existing_score + 0.1  # Slight bias toward keeping existing
```
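`source_reliability` and `age_factor` are used in the scoring above but not defined; here is one plausible sketch, shown as free functions, with an assumed reliability table and decay horizon:

```python
from datetime import datetime

SOURCE_RELIABILITY = {
    # Assumed reliability scores per source type; tune for your system
    "user_statement": 1.0,
    "inferred": 0.6,
    "third_party": 0.4,
    "unknown": 0.5,
}

def source_reliability(source):
    return SOURCE_RELIABILITY.get(source, 0.5)

def age_factor(memory, horizon_days=365):
    """0.0 for a brand-new memory, approaching 1.0 as it nears the horizon."""
    age_days = (datetime.now() - memory.created_at).days
    return min(age_days / horizon_days, 1.0)
```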
Prevention strategies:
```python
def validate_memory_before_storage(self, user_id, content, source="unknown"):
    """Validate memory for consistency before storing"""
    # Check for obvious contradictions against the user profile
    user_profile = self.get_user_profile(user_id)

    # Cross-reference with existing facts
    potential_conflicts = self.find_potential_conflicts(user_id, content)
    if potential_conflicts:
        # Flag for human review or additional validation
        return self.flag_for_review(content, potential_conflicts)

    # Store with source attribution
    return self.store_memory_with_source(user_id, content, source)
```
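`flag_for_review` is left abstract above; one hedged sketch quarantines the memory in a review queue instead of storing it (the queue and status fields are assumptions, not part of the original):

```python
from datetime import datetime

def flag_for_review(self, content, potential_conflicts):
    """Park a suspect memory until a human (or stronger model) rules on it."""
    review_item = {
        "content": content,
        "conflicts": [c.id for c in potential_conflicts],
        "status": "pending_review",
        "flagged_at": datetime.now().isoformat(),
    }
    self.review_queue.append(review_item)  # assumed persistent queue
    return review_item
```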
My memory system is using too much storage/cost
Memory storage costs can grow quickly. Here’s how to optimize:
Storage Analysis:
```python
from datetime import datetime, timedelta

class StorageAnalyzer:
    def analyze_memory_usage(self, user_id):
        """Comprehensive storage usage analysis"""
        memories = self.get_all_memories(user_id)

        analysis = {
            "total_memories": len(memories),
            "total_size_mb": sum(self.estimate_memory_size(m) for m in memories) / (1024 * 1024),
            "by_type": {},
            "by_age": {},
            "by_access": {},
            "optimization_opportunities": []
        }

        # Analyze by type
        for memory_type in set(m.memory_type for m in memories):
            type_memories = [m for m in memories if m.memory_type == memory_type]
            analysis["by_type"][memory_type] = {
                "count": len(type_memories),
                "size_mb": sum(self.estimate_memory_size(m) for m in type_memories) / (1024 * 1024),
                "avg_size_kb": sum(self.estimate_memory_size(m) for m in type_memories) / len(type_memories) / 1024
            }

        # Analyze by age
        now = datetime.now()
        age_buckets = {"week": 7, "month": 30, "quarter": 90, "year": 365}
        for bucket_name, days in age_buckets.items():
            cutoff = now - timedelta(days=days)
            old_memories = [m for m in memories if m.created_at < cutoff]
            analysis["by_age"][f"older_than_{bucket_name}"] = {
                "count": len(old_memories),
                "size_mb": sum(self.estimate_memory_size(m) for m in old_memories) / (1024 * 1024)
            }

        # Find optimization opportunities
        analysis["optimization_opportunities"] = self.find_optimization_opportunities(memories)
        return analysis

    def find_optimization_opportunities(self, memories):
        """Identify specific optimization opportunities"""
        opportunities = []

        # Large, rarely accessed memories
        large_unused = [
            m for m in memories
            if self.estimate_memory_size(m) > 10 * 1024  # > 10KB
            and m.access_count < 2
            and (datetime.now() - m.created_at).days > 30
        ]
        if large_unused:
            opportunities.append({
                "type": "compress_unused",
                "count": len(large_unused),
                "potential_savings_mb": sum(self.estimate_memory_size(m) for m in large_unused) * 0.7 / (1024 * 1024)
            })

        # Duplicate or near-duplicate memories
        duplicates = self.find_near_duplicates(memories)
        if duplicates:
            opportunities.append({
                "type": "deduplicate",
                "count": len(duplicates),
                "potential_savings_mb": sum(self.estimate_memory_size(m) for m in duplicates) / (1024 * 1024)
            })

        # Low-importance memories taking up space
        low_importance = [m for m in memories if m.importance < 0.2]
        if low_importance:
            opportunities.append({
                "type": "remove_low_importance",
                "count": len(low_importance),
                "potential_savings_mb": sum(self.estimate_memory_size(m) for m in low_importance) / (1024 * 1024)
            })

        return opportunities
```
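`find_near_duplicates` is assumed above; a minimal embedding-based sketch that keeps the older copy of each near-duplicate pair (the 0.95 threshold is an assumption to tune):

```python
import numpy as np

def find_near_duplicates(memories, threshold=0.95):
    """Return the newer member of each near-duplicate pair, as deletion candidates."""
    duplicates = []
    ordered = sorted(memories, key=lambda m: m.created_at)  # keep the older copy
    for i, older in enumerate(ordered):
        a = np.asarray(older.embedding)
        for newer in ordered[i + 1:]:
            b = np.asarray(newer.embedding)
            cosine = a @ b / (np.linalg.norm(a) * np.linalg.norm(b))
            if cosine >= threshold and newer not in duplicates:
                duplicates.append(newer)
    # O(n^2): fine for per-user audits; use an ANN index at larger scale
    return duplicates
```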
Automated Optimization:
```python
from datetime import datetime, timedelta

class AutomatedMemoryOptimizer:
    def __init__(self, storage_budget_mb=50, optimization_schedule="daily"):
        self.storage_budget = storage_budget_mb
        self.schedule = optimization_schedule

    def run_optimization(self, user_id):
        """Run automated memory optimization"""
        current_usage = self.get_storage_usage(user_id)
        if current_usage < self.storage_budget * 0.8:
            return "No optimization needed"

        # Priority-based optimization: cheapest, least destructive passes first
        optimizations = [
            self.remove_expired_memories,
            self.compress_old_interactions,
            self.deduplicate_similar_memories,
            self.archive_low_importance_memories,
            self.summarize_interaction_clusters
        ]
        for optimization_func in optimizations:
            optimization_func(user_id)
            current_usage = self.get_storage_usage(user_id)
            if current_usage <= self.storage_budget:
                break

        return f"Optimized to {current_usage:.1f}MB"

    def compress_old_interactions(self, user_id):
        """Compress old interaction memories"""
        cutoff = datetime.now() - timedelta(days=30)
        old_interactions = self.get_memories(
            user_id,
            memory_type="interaction",
            created_before=cutoff,
            access_count__lt=3
        )
        for memory in old_interactions:
            # Summarize using LLM
            summary = self.llm.summarize(
                memory.content,
                max_tokens=100,
                preserve_key_facts=True
            )
            # Replace with summary
            compressed_memory = memory.copy(
                content=summary,
                is_compressed=True,
                original_size=len(memory.content)
            )
            self.replace_memory(memory.id, compressed_memory)
```
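How `run_optimization` actually gets invoked is left open; one hedged wiring is a background loop per process, assuming a helper that yields active user IDs (a cron job or task queue works just as well):

```python
import threading
import time

def start_optimization_loop(optimizer, get_active_user_ids, interval_seconds=86400):
    """Run the optimizer for each active user once per interval (daily by default)."""
    def loop():
        while True:
            for user_id in get_active_user_ids():  # assumed helper
                try:
                    optimizer.run_optimization(user_id)
                except Exception as exc:
                    print(f"Optimization failed for {user_id}: {exc}")
            time.sleep(interval_seconds)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```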
Debugging Tools
How do I debug memory retrieval issues?
Build debugging tools into your memory system from the start:
Memory Retrieval Debugger:
```python
import time

class MemoryDebugger:
    def __init__(self, memory_manager):
        self.memory_manager = memory_manager
        self.debug_mode = True

    def debug_retrieval_pipeline(self, user_id, query):
        """Step through retrieval pipeline with detailed logging"""
        print(f"🔍 Debugging memory retrieval for query: '{query}'")
        print("-" * 60)

        # Stage 1: Initial candidate selection
        print("1. Candidate Selection:")
        candidates = self.memory_manager.get_candidate_memories(user_id)
        print(f"   Found {len(candidates)} candidate memories")
        if self.debug_mode:
            for i, candidate in enumerate(candidates[:5]):
                print(f"   {i+1}. [{candidate.memory_type}] {candidate.content[:50]}...")

        # Stage 2: Embedding generation
        print("\n2. Query Embedding:")
        query_start = time.time()
        query_embedding = self.memory_manager.generate_embedding(query)
        query_time = time.time() - query_start
        print(f"   Generated embedding in {query_time:.3f}s")
        print(f"   Embedding dimensions: {len(query_embedding)}")

        # Stage 3: Similarity computation
        print("\n3. Similarity Computation:")
        similarities = []
        for candidate in candidates:
            similarity = self.compute_similarity(query_embedding, candidate.embedding)
            similarities.append((candidate, similarity))
        similarities.sort(key=lambda x: x[1], reverse=True)
        print(f"   Computed similarities for {len(similarities)} memories")
        if self.debug_mode:
            print("   Top 5 matches:")
            for i, (memory, score) in enumerate(similarities[:5]):
                print(f"   {i+1}. Score: {score:.3f} - {memory.content[:40]}...")

        # Stage 4: Filtering and ranking
        print("\n4. Filtering & Ranking:")
        filtered = self.memory_manager.apply_filters(similarities, user_id)
        print(f"   {len(filtered)} memories after filtering")
        final_results = self.memory_manager.final_ranking(filtered, query)
        print(f"   {len(final_results)} memories in final results")

        # Stage 5: Analysis
        print("\n5. Analysis:")
        if not final_results:
            print("   ⚠️ No memories retrieved!")
            self.diagnose_empty_results(user_id, query, candidates, similarities)
        else:
            best_score = final_results[0][1] if final_results else 0
            print(f"   Best match score: {best_score:.3f}")
            if best_score < 0.5:
                print("   ⚠️ Low similarity scores - consider expanding search")

        return final_results

    def diagnose_empty_results(self, user_id, query, candidates, similarities):
        """Diagnose why no memories were retrieved"""
        print("   Diagnosis:")
        if not candidates:
            print("   - No candidate memories found")
            print("   - Check user has stored memories")
        elif not similarities or max(s[1] for s in similarities) < 0.3:
            print("   - Poor similarity scores")
            print("   - Consider different embedding model or query preprocessing")
        else:
            print("   - Memories found but filtered out")
            print("   - Check filtering criteria (importance, recency, etc.)")

    def memory_search_playground(self, user_id):
        """Interactive memory search for debugging"""
        print("Memory Search Playground")
        print("Type 'quit' to exit")
        while True:
            query = input("\nEnter search query: ")
            if query.lower() == 'quit':
                break
            results = self.debug_retrieval_pipeline(user_id, query)
            if results:
                print(f"\n📝 Retrieved {len(results)} memories:")
                for i, (memory, score) in enumerate(results):
                    print(f"{i+1}. [{score:.3f}] {memory.content}")
            else:
                print("\n❌ No memories retrieved")
```
How do I test memory system reliability?
Automated testing ensures your memory system works correctly:
Memory System Test Suite:
```python
import unittest
from unittest import TestCase

class MemorySystemTestSuite(TestCase):
    def setUp(self):
        """Set up test environment"""
        self.memory_manager = MemoryManager(test_mode=True)
        self.test_user_id = "test_user_123"

    def test_basic_storage_and_retrieval(self):
        """Test basic memory storage and retrieval"""
        # Store a memory
        content = "User prefers dark mode interface"
        memory_id = self.memory_manager.store_memory(
            self.test_user_id,
            content,
            memory_type="preference",
            importance=0.8
        )
        # Retrieve it
        retrieved = self.memory_manager.get_memory(memory_id)
        self.assertIsNotNone(retrieved)
        self.assertEqual(retrieved.content, content)

    def test_semantic_search(self):
        """Test semantic similarity search"""
        # Store related memories
        memories = [
            "User likes Italian food",
            "User enjoys pasta dishes",
            "User prefers vegetarian options",
            "User works in software engineering",
            "User lives in San Francisco"
        ]
        for content in memories:
            self.memory_manager.store_memory(self.test_user_id, content)

        # Search for food-related memories
        results = self.memory_manager.search_memories(
            self.test_user_id,
            "What food does the user like?",
            limit=3
        )
        # Check that food-related memories are returned
        food_memories = [r.content for r in results]
        self.assertIn("User likes Italian food", food_memories)
        self.assertIn("User enjoys pasta dishes", food_memories)

    def test_memory_conflict_resolution(self):
        """Test handling of conflicting memories"""
        # Store initial preference
        self.memory_manager.store_memory(
            self.test_user_id,
            "User prefers dark mode",
            memory_type="preference",
            confidence=0.8
        )
        # Store conflicting preference
        conflict_result = self.memory_manager.store_memory(
            self.test_user_id,
            "User prefers light mode",
            memory_type="preference",
            confidence=0.9
        )
        # Verify conflict was handled
        self.assertIn("conflict_resolved", conflict_result.metadata)

        # Check final state
        mode_memories = self.memory_manager.search_memories(
            self.test_user_id,
            "user interface preference"
        )
        # Should have higher confidence preference or marked conflict
        self.assertTrue(len(mode_memories) >= 1)

    def test_memory_importance_learning(self):
        """Test that memory importance updates based on usage"""
        # Store memory with initial importance
        memory_id = self.memory_manager.store_memory(
            self.test_user_id,
            "User's favorite restaurant is Joe's Pizza",
            importance=0.5
        )
        initial_memory = self.memory_manager.get_memory(memory_id)
        initial_importance = initial_memory.importance

        # Simulate multiple accesses
        for _ in range(10):
            self.memory_manager.access_memory(memory_id)

        # Run learning update
        self.memory_manager.update_memory_importance(self.test_user_id)

        # Check that importance increased
        updated_memory = self.memory_manager.get_memory(memory_id)
        self.assertGreater(updated_memory.importance, initial_importance)

    def test_storage_budget_compliance(self):
        """Test that storage stays within budget"""
        # Set small storage budget
        self.memory_manager.set_storage_budget(self.test_user_id, max_mb=1)

        # Store many memories to exceed budget
        for i in range(100):
            self.memory_manager.store_memory(
                self.test_user_id,
                f"This is test memory number {i} with some content to make it larger " * 10,
                importance=0.3
            )

        # Check storage is within budget
        current_usage = self.memory_manager.get_storage_usage(self.test_user_id)
        self.assertLessEqual(current_usage, 1.1)  # Allow 10% tolerance

    def test_memory_privacy_deletion(self):
        """Test privacy-compliant memory deletion"""
        # Store some memories
        memory_ids = []
        for i in range(5):
            memory_id = self.memory_manager.store_memory(
                self.test_user_id,
                f"Personal information {i}"
            )
            memory_ids.append(memory_id)

        # Request privacy deletion
        self.memory_manager.delete_all_user_data(self.test_user_id)

        # Verify all memories are deleted
        for memory_id in memory_ids:
            retrieved = self.memory_manager.get_memory(memory_id)
            self.assertIsNone(retrieved)

        # Verify user can start fresh
        new_memory_id = self.memory_manager.store_memory(
            self.test_user_id,
            "Fresh start memory"
        )
        self.assertIsNotNone(new_memory_id)

def run_memory_reliability_tests():
    """Run comprehensive memory system tests"""
    suite = unittest.TestLoader().loadTestsFromTestCase(MemorySystemTestSuite)
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

    if result.wasSuccessful():
        print("✅ All memory system tests passed!")
    else:
        print(f"❌ {len(result.failures)} test(s) failed")
        for failure in result.failures:
            print(f"Failed: {failure[0]}")
    return result.wasSuccessful()
```
How do I monitor memory system performance in production?
Production monitoring helps catch issues before they affect users:
Production Monitoring Setup:
```python
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

@dataclass
class MemoryMetrics:
    timestamp: datetime
    user_id: str
    operation: str
    duration_ms: float
    memory_count: int
    storage_mb: float
    cache_hit_rate: float
    error_count: int

class MemorySystemMonitor:
    def __init__(self):
        self.metrics_buffer = []
        self.alert_thresholds = {
            "avg_response_time_ms": 500,
            "error_rate_percent": 5,
            "storage_usage_mb": 100,
            "cache_hit_rate_percent": 70
        }

    def log_operation(self, user_id, operation, duration_ms, **kwargs):
        """Log memory operation for monitoring"""
        metrics = MemoryMetrics(
            timestamp=datetime.now(),
            user_id=user_id,
            operation=operation,
            duration_ms=duration_ms,
            memory_count=kwargs.get("memory_count", 0),
            storage_mb=kwargs.get("storage_mb", 0),
            cache_hit_rate=kwargs.get("cache_hit_rate", 0),
            error_count=kwargs.get("error_count", 0)
        )
        self.metrics_buffer.append(metrics)

        # Check for immediate alerts
        self.check_thresholds(metrics)

        # Flush buffer periodically
        if len(self.metrics_buffer) >= 100:
            self.flush_metrics()

    def check_thresholds(self, metrics):
        """Check if metrics exceed alert thresholds"""
        alerts = []
        if metrics.duration_ms > self.alert_thresholds["avg_response_time_ms"]:
            alerts.append(f"Slow response: {metrics.duration_ms:.1f}ms for {metrics.operation}")
        if metrics.storage_mb > self.alert_thresholds["storage_usage_mb"]:
            alerts.append(f"High storage usage: {metrics.storage_mb:.1f}MB for user {metrics.user_id}")
        if metrics.cache_hit_rate < self.alert_thresholds["cache_hit_rate_percent"]:
            alerts.append(f"Low cache hit rate: {metrics.cache_hit_rate:.1f}% for user {metrics.user_id}")

        for alert in alerts:
            logging.warning(f"Memory system alert: {alert}")

    def generate_daily_report(self):
        """Generate daily performance report"""
        if not self.metrics_buffer:
            return "No metrics available"

        # Calculate aggregated metrics
        total_operations = len(self.metrics_buffer)
        avg_response_time = sum(m.duration_ms for m in self.metrics_buffer) / total_operations
        error_count = sum(m.error_count for m in self.metrics_buffer)
        error_rate = (error_count / total_operations) * 100
        unique_users = len(set(m.user_id for m in self.metrics_buffer))
        total_storage = sum(m.storage_mb for m in self.metrics_buffer if m.storage_mb > 0)

        report = f"""
Memory System Daily Report - {datetime.now().strftime('%Y-%m-%d')}
{'=' * 60}
Performance Metrics:
- Total operations: {total_operations}
- Average response time: {avg_response_time:.1f}ms
- Error rate: {error_rate:.1f}%
- Active users: {unique_users}
- Total storage used: {total_storage:.1f}MB

Top Operations:
{self.get_top_operations()}

Performance Trends:
{self.get_performance_trends()}

Recommendations:
{self.generate_recommendations()}
"""
        return report

    def get_top_operations(self):
        """Get most common operations"""
        operation_counts = {}
        for metric in self.metrics_buffer:
            operation_counts[metric.operation] = operation_counts.get(metric.operation, 0) + 1
        sorted_ops = sorted(operation_counts.items(), key=lambda x: x[1], reverse=True)
        return "\n".join([f"- {op}: {count} times" for op, count in sorted_ops[:5]])

    def generate_recommendations(self):
        """Generate optimization recommendations"""
        recommendations = []
        avg_response_time = sum(m.duration_ms for m in self.metrics_buffer) / len(self.metrics_buffer)
        if avg_response_time > 200:
            recommendations.append("Consider adding caching or optimizing database queries")

        high_storage_users = [m for m in self.metrics_buffer if m.storage_mb > 50]
        if len(high_storage_users) > len(self.metrics_buffer) * 0.1:
            recommendations.append("Implement memory compression for high-usage users")

        if not recommendations:
            recommendations.append("System performing within normal parameters")
        return "\n".join([f"- {rec}" for rec in recommendations])
```
Usage example:
```python
monitor = MemorySystemMonitor()

class MonitoredMemoryManager:
    def __init__(self):
        self.monitor = MemorySystemMonitor()

    def store_memory(self, user_id, content, **kwargs):
        start_time = time.time()
        try:
            # Your memory storage logic
            result = self._actual_store_memory(user_id, content, **kwargs)

            # Log successful operation
            duration_ms = (time.time() - start_time) * 1000
            self.monitor.log_operation(
                user_id=user_id,
                operation="store_memory",
                duration_ms=duration_ms,
                memory_count=1,
                error_count=0
            )
            return result
        except Exception:
            # Log the failure, then re-raise the original exception
            duration_ms = (time.time() - start_time) * 1000
            self.monitor.log_operation(
                user_id=user_id,
                operation="store_memory",
                duration_ms=duration_ms,
                memory_count=0,
                error_count=1
            )
            raise
```
For additional help, check our Implementation Examples and Best Practices Guide.