Technical FAQ
Implementation Details
What embedding models should I use for memory?
The choice depends on your specific requirements:
General Purpose (Recommended for most):
- OpenAI text-embedding-3-small: Good balance of quality and speed
- OpenAI text-embedding-3-large: Higher quality, slower
- Sentence Transformers all-MiniLM-L6-v2: Free, runs locally
Domain Specific:
- E5-large: Strong performance across domains
- BGE models: Excellent for retrieval tasks
- Custom fine-tuned: For specialized domains
Implementation example:
from openai import OpenAI
import numpy as np
class MemoryEmbedder:
def __init__(self, model="text-embedding-3-small"):
self.client = OpenAI()
self.model = model
def embed_memory(self, text):
"""Convert text to embedding for storage"""
response = self.client.embeddings.create(
model=self.model,
input=text
)
return response.data[0].embedding
def find_similar_memories(self, query, memories, top_k=5):
"""Find most relevant memories using cosine similarity"""
query_embedding = self.embed_memory(query)
similarities = []
for memory in memories:
similarity = np.dot(query_embedding, memory.embedding)
similarities.append((memory, similarity))
# Return top matches
return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]Performance considerations:
- Dimensions: More dimensions = better quality but higher storage costs
- Speed: Smaller models are faster for real-time applications
- Cost: Local models eliminate API costs but need more infrastructure
How do I structure memory data effectively?
Design your memory schema with both storage and retrieval in mind:
Core Memory Structure:
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
@dataclass
class Memory:
    """Base record for a single stored memory.

    Combines the human-readable content with its vector representation and
    the bookkeeping fields (scores, timestamps, access stats) that the
    retrieval and retention logic in this FAQ relies on.
    """
    id: str
    user_id: str
    content: str                # Human-readable memory content
    embedding: List[float]      # Vector representation for similarity search
    memory_type: str            # "preference", "fact", "interaction"
    importance: float           # 0-1 relevance score
    confidence: float           # 0-1 confidence in accuracy
    created_at: datetime
    last_accessed: datetime     # Updated on retrieval; feeds recency scoring
    access_count: int           # Incremented each time the memory is retrieved
    tags: List[str]
    metadata: Dict[str, Any]    # Free-form extra attributes
@dataclass
class UserPreference(Memory):
    """A learned user preference, stored as a specialized memory."""
    preference_key: str    # e.g., "communication_style"
    preference_value: Any  # e.g., "formal"
    source: str            # How this preference was learned
@dataclass
class InteractionMemory(Memory):
    """A memory summarizing a past conversation or interaction."""
    conversation_id: str
    summary: str
    participants: List[str]
    outcome: str             # Success, failure, neutral
    context: Dict[str, Any]

Database Schema (PostgreSQL example):
-- Core memories table (the vector column type requires the pgvector extension)
CREATE TABLE memories (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1536), -- For OpenAI embeddings
    memory_type VARCHAR(50),
    importance FLOAT CHECK (importance >= 0 AND importance <= 1),
    confidence FLOAT CHECK (confidence >= 0 AND confidence <= 1),
    created_at TIMESTAMP DEFAULT NOW(),
    last_accessed TIMESTAMP DEFAULT NOW(),
    access_count INTEGER DEFAULT 0,
    tags TEXT[],
    metadata JSONB
);
-- Indexes for the common query patterns used by retrieval
CREATE INDEX idx_memories_user_id ON memories(user_id);
CREATE INDEX idx_memories_type ON memories(memory_type);
CREATE INDEX idx_memories_importance ON memories(importance DESC);
-- Approximate nearest-neighbor index for cosine similarity search (pgvector)
CREATE INDEX idx_memories_embedding ON memories USING ivfflat (embedding vector_cosine_ops);

How do I implement memory retrieval efficiently?
Efficient retrieval combines multiple search strategies:
Multi-Stage Retrieval Pipeline:
class MemoryRetriever:
def __init__(self, vector_db, sql_db):
self.vector_db = vector_db
self.sql_db = sql_db
def retrieve_relevant_memories(self, query, user_id, max_memories=10):
"""Multi-stage memory retrieval"""
# Stage 1: Filter by user and basic criteria
candidates = self.sql_db.query("""
SELECT * FROM memories
WHERE user_id = %s
AND importance > 0.3
ORDER BY last_accessed DESC
LIMIT 100
""", [user_id])
# Stage 2: Semantic similarity search
if candidates:
semantic_matches = self.vector_db.similarity_search(
query=query,
candidates=candidates,
top_k=max_memories * 2
)
else:
semantic_matches = []
# Stage 3: Re-rank by relevance and recency
final_memories = self.rerank_memories(
memories=semantic_matches,
query=query,
max_results=max_memories
)
# Stage 4: Update access patterns
self.update_access_stats(final_memories)
return final_memories
def rerank_memories(self, memories, query, max_results):
"""Combine multiple signals for final ranking"""
scored_memories = []
for memory in memories:
# Combine different relevance signals
score = (
memory.semantic_similarity * 0.4 + # How well it matches the query
memory.importance * 0.3 + # Importance score
self.recency_score(memory) * 0.2 + # How recent is it
memory.confidence * 0.1 # How confident are we
)
scored_memories.append((memory, score))
# Return top results
return [
memory for memory, score in
sorted(scored_memories, key=lambda x: x[1], reverse=True)
[:max_results]
]Optimization strategies:
- Indexing: Proper database indexes for common query patterns
- Caching: Cache frequently accessed memories
- Batch processing: Group similar queries for efficiency
- Precomputed: Cache common query results
How do I handle memory updates and consistency?
Memory systems need to handle updates gracefully:
Memory Update Strategies:
class MemoryManager:
    """Applies new information to the memory store with conflict resolution."""

    def update_memory(self, user_id, new_information, context=None):
        """Store new information, reconciling it with conflicting memories."""
        # Fast path: nothing conflicts, so simply store the new memory.
        existing = self.find_conflicting_memories(user_id, new_information)
        if not existing:
            return self.create_memory(user_id, new_information)

        # Otherwise decide, per conflicting memory, how to reconcile.
        for old_memory in existing:
            resolution = self.resolve_conflict(old_memory, new_information, context)
            action = resolution.action
            if action == "replace":
                self.replace_memory(old_memory.id, new_information)
            elif action == "merge":
                self.merge_memories(old_memory, new_information)
            elif action == "keep_both":
                self.create_memory(user_id, new_information)
                # Keep the old memory around, but trust it less.
                self.update_confidence(old_memory.id, resolution.new_confidence)

    def resolve_conflict(self, old_memory, new_info, context):
        """Decide how to handle conflicting information (simple rule chain)."""
        # Rule 1: an explicit correction from the user always wins.
        if context and context.get("source") == "explicit_user_correction":
            return ConflictResolution("replace", confidence=0.9)
        # Rule 2: clearly more confident new information replaces the old.
        if new_info.confidence > old_memory.confidence + 0.2:
            return ConflictResolution("replace", confidence=new_info.confidence)
        # Rule 3: compatible facts can be merged.
        if self.are_compatible(old_memory, new_info):
            return ConflictResolution("merge", confidence=0.8)
        # Fallback: keep both, with reduced trust in the old memory.
        return ConflictResolution("keep_both", new_confidence=0.6)
# Memory consistency checks
class MemoryValidator:
def validate_memory_consistency(self, user_id):
"""Check for and report memory conflicts"""
memories = self.get_all_memories(user_id)
conflicts = []
for i, memory1 in enumerate(memories):
for memory2 in memories[i+1:]:
if self.are_conflicting(memory1, memory2):
conflicts.append({
"memory1": memory1,
"memory2": memory2,
"conflict_type": self.get_conflict_type(memory1, memory2)
})
return conflictsHow do I optimize memory storage costs?
Memory storage can get expensive quickly. Here are optimization strategies:
Storage Cost Optimization:
class MemoryOptimizer:
def __init__(self, storage_budget_mb=100):
self.storage_budget = storage_budget_mb * 1024 * 1024 # bytes
self.compression_ratio = 0.3 # Typical compression ratio
def optimize_storage(self, user_id):
"""Optimize memory storage within budget constraints"""
# 1. Get current storage usage
current_memories = self.get_memories(user_id)
current_size = sum(self.estimate_memory_size(m) for m in current_memories)
if current_size <= self.storage_budget:
return # Within budget
# 2. Score memories for retention
scored_memories = self.score_memories_for_retention(current_memories)
# 3. Remove or compress low-value memories
retained_memories = []
running_size = 0
for memory, score in scored_memories:
estimated_size = self.estimate_memory_size(memory)
if running_size + estimated_size <= self.storage_budget:
retained_memories.append(memory)
running_size += estimated_size
elif score > 0.7: # High-value memories get compressed
compressed = self.compress_memory(memory)
if running_size + compressed.size <= self.storage_budget:
retained_memories.append(compressed)
running_size += compressed.size
# 4. Update storage
self.replace_memories(user_id, retained_memories)
def score_memories_for_retention(self, memories):
"""Score memories based on multiple factors"""
scored = []
for memory in memories:
score = (
memory.importance * 0.4 + # Explicit importance
self.access_frequency_score(memory) * 0.3 + # How often accessed
self.recency_score(memory) * 0.2 + # How recent
memory.confidence * 0.1 # How confident
)
scored.append((memory, score))
return sorted(scored, key=lambda x: x[1], reverse=True)
def compress_memory(self, memory):
"""Compress memory by summarizing or reducing precision"""
if memory.memory_type == "interaction":
# Summarize long interactions
summary = self.summarize_interaction(memory.content)
return memory.copy(content=summary)
elif memory.memory_type == "preference":
# Keep preferences as-is (small anyway)
return memory
else:
# Reduce embedding precision for other types
compressed_embedding = self.reduce_precision(memory.embedding)
return memory.copy(embedding=compressed_embedding)Cost optimization strategies:
- Tiered storage: Keep recent/important memories in fast storage, archive others
- Compression: Summarize old interactions, reduce embedding precision
- Garbage collection: Remove outdated or low-confidence memories
- Deduplication: Merge similar memories
How do I implement memory learning and adaptation?
Memory systems should improve over time:
Learning Implementation:
class MemoryLearner:
def __init__(self):
self.feedback_window = 30 # Days to consider for learning
def learn_from_interactions(self, user_id):
"""Update memory importance based on usage patterns"""
# Get recent interactions and access patterns
recent_memories = self.get_recent_memories(user_id, days=self.feedback_window)
access_stats = self.get_access_statistics(user_id, days=self.feedback_window)
for memory in recent_memories:
# Update importance based on usage
new_importance = self.calculate_learned_importance(memory, access_stats)
self.update_memory_importance(memory.id, new_importance)
def calculate_learned_importance(self, memory, access_stats):
"""Calculate new importance score based on learning signals"""
current_importance = memory.importance
# Learning signals
access_frequency = access_stats.get(memory.id, {}).get("frequency", 0)
user_feedback = self.get_user_feedback_score(memory.id)
context_relevance = self.get_context_relevance_score(memory.id)
# Combine signals with learning rate
learning_rate = 0.1
learned_component = (
access_frequency * 0.4 +
user_feedback * 0.4 +
context_relevance * 0.2
)
new_importance = (
current_importance * (1 - learning_rate) +
learned_component * learning_rate
)
return max(0.0, min(1.0, new_importance))
def adapt_memory_structure(self, user_id):
"""Adapt memory organization based on usage patterns"""
# Identify frequently co-accessed memories
memory_clusters = self.find_memory_clusters(user_id)
# Create summary memories for large clusters
for cluster in memory_clusters:
if len(cluster.memories) > 5:
summary = self.create_cluster_summary(cluster)
self.store_summary_memory(user_id, summary, cluster)Advanced Topics
How do I implement memory sharing between agents?
For multi-agent systems, memory sharing enables collaboration:
Shared Memory Architecture:
class SharedMemoryManager:
def __init__(self):
self.shared_memories = {} # Cross-agent memories
self.agent_private_memories = {} # Agent-specific memories
self.memory_permissions = {} # Access control
def share_memory(self, from_agent, to_agent, memory, permission_level="read"):
"""Share memory between agents with proper permissions"""
# Validate permissions
if not self.can_share(from_agent, memory, permission_level):
raise PermissionError("Cannot share this memory")
# Create shared memory reference
shared_ref = {
"original_memory_id": memory.id,
"shared_by": from_agent,
"permission_level": permission_level,
"shared_at": datetime.now()
}
# Add to recipient's shared memory space
if to_agent not in self.shared_memories:
self.shared_memories[to_agent] = []
self.shared_memories[to_agent].append(shared_ref)
def get_accessible_memories(self, agent_id, query):
"""Get both private and shared memories for an agent"""
# Get agent's private memories
private_memories = self.get_private_memories(agent_id, query)
# Get shared memories
shared_memories = []
if agent_id in self.shared_memories:
for shared_ref in self.shared_memories[agent_id]:
if self.is_memory_accessible(shared_ref, query):
original_memory = self.get_memory(shared_ref["original_memory_id"])
shared_memories.append(original_memory)
return private_memories + shared_memoriesHow do I handle multi-modal memories (text, images, actions)?
Modern agents need to remember more than just text:
Multi-Modal Memory System:
from typing import Union, List
from dataclasses import dataclass
@dataclass
class MultiModalMemory:
    """A memory bundling several modalities under one unified embedding."""
    # NOTE(review): relies on Dict/Any/datetime imported in the earlier
    # snippet; the import shown just above only brings in Union and List.
    id: str
    user_id: str
    modalities: Dict[str, Any]      # e.g. {"text": ..., "image": ..., "action": ...}
    unified_embedding: List[float]  # Cross-modal embedding used for search
    memory_type: str
    created_at: datetime
class MultiModalMemoryManager:
def __init__(self):
self.text_embedder = TextEmbedder()
self.image_embedder = ImageEmbedder()
self.action_embedder = ActionEmbedder()
self.cross_modal_embedder = CrossModalEmbedder()
def store_multimodal_memory(self, user_id, content):
"""Store memory with multiple modalities"""
modalities = {}
embeddings = []
# Process each modality
if "text" in content:
modalities["text"] = content["text"]
text_emb = self.text_embedder.embed(content["text"])
embeddings.append(text_emb)
if "image" in content:
modalities["image"] = content["image"]
img_emb = self.image_embedder.embed(content["image"])
embeddings.append(img_emb)
if "action" in content:
modalities["action"] = content["action"]
action_emb = self.action_embedder.embed(content["action"])
embeddings.append(action_emb)
# Create unified embedding
unified_embedding = self.cross_modal_embedder.fuse(embeddings)
# Store memory
memory = MultiModalMemory(
id=generate_id(),
user_id=user_id,
modalities=modalities,
unified_embedding=unified_embedding,
memory_type="multimodal",
created_at=datetime.now()
)
return self.store_memory(memory)
def search_multimodal_memory(self, user_id, query):
"""Search across all modalities"""
# Determine query modality and embed accordingly
if self.is_text_query(query):
query_embedding = self.text_embedder.embed(query)
elif self.is_image_query(query):
query_embedding = self.image_embedder.embed(query)
elif self.is_action_query(query):
query_embedding = self.action_embedder.embed(query)
# Search using unified embeddings
matches = self.vector_search(user_id, query_embedding)
return matchesHow do I implement memory compression and archiving?
For long-running agents, memory compression becomes essential:
Hierarchical Memory Compression:
class MemoryArchiver:
def __init__(self):
self.compression_levels = {
"recent": 7, # Keep full detail for 1 week
"summary": 30, # Keep summaries for 1 month
"archive": 365 # Keep compressed archives for 1 year
}
def compress_old_memories(self, user_id):
"""Compress memories based on age and access patterns"""
now = datetime.now()
memories = self.get_all_memories(user_id)
for memory in memories:
age_days = (now - memory.created_at).days
if age_days > self.compression_levels["archive"]:
# Very old - consider deletion or heavy compression
self.archive_or_delete(memory)
elif age_days > self.compression_levels["summary"]:
# Old - compress to summary
if not memory.is_compressed:
self.compress_to_summary(memory)
elif age_days > self.compression_levels["recent"]:
# Moderate age - light compression
self.apply_light_compression(memory)
def compress_to_summary(self, memory):
"""Create a summary version of the memory"""
if memory.memory_type == "interaction":
# Summarize conversation
summary = self.llm_summarize(
memory.content,
max_tokens=100,
preserve_sentiment=True
)
compressed_memory = memory.copy(
content=summary,
is_compressed=True,
compression_ratio=len(summary) / len(memory.content)
)
elif memory.memory_type == "preference":
# Preferences don't compress well, keep as-is
return memory
else:
# Generic compression
compressed_memory = self.generic_compress(memory)
self.replace_memory(memory.id, compressed_memory)
def create_memory_digest(self, user_id, time_period="week"):
"""Create periodic digests of memory activity"""
if time_period == "week":
start_date = datetime.now() - timedelta(days=7)
elif time_period == "month":
start_date = datetime.now() - timedelta(days=30)
memories = self.get_memories_since(user_id, start_date)
# Group by themes
themes = self.cluster_memories_by_theme(memories)
# Create digest
digest = {
"period": time_period,
"start_date": start_date,
"end_date": datetime.now(),
"total_memories": len(memories),
"themes": []
}
for theme, theme_memories in themes.items():
theme_summary = {
"theme": theme,
"memory_count": len(theme_memories),
"key_insights": self.extract_insights(theme_memories),
"representative_memories": theme_memories[:3]
}
digest["themes"].append(theme_summary)
# Store digest as a special memory type
self.store_memory(user_id, {
"content": digest,
"memory_type": "digest",
"importance": 0.8
})
return digestFor implementation help, see our Implementation Guide. For troubleshooting specific issues, check the Troubleshooting FAQ.