Database Choices for Agent Memory
Selecting the right database technology is crucial for agent memory systems. This guide compares popular options, provides implementation examples, and offers decision frameworks based on your specific requirements.
Vector Databases
Vector databases are essential for semantic search and similarity-based memory retrieval in modern agent systems.
Pinecone
Pinecone is a managed vector database service that excels at scale and simplicity.
import pinecone
from typing import List, Dict, Any, Optional
import json
from datetime import datetime
class PineconeMemoryStore:
    """Agent memory store backed by a Pinecone index.

    Construction connects to Pinecone and creates the index when it is
    missing. The data methods catch every exception, print a message and
    report failure through their return value instead of raising.
    """

    def __init__(self, api_key: str, environment: str, index_name: str, dimension: int = 384):
        """Initialise the Pinecone client and open (or create) the index.

        Args:
            api_key: Pinecone API key.
            environment: Pinecone environment name.
            index_name: Index that holds the memories.
            dimension: Vector size used if the index has to be created.
        """
        pinecone.init(api_key=api_key, environment=environment)
        self.index_name = index_name
        self.dimension = dimension

        # Create the index on first use; these metadata fields are the ones
        # search_memories() filters on.
        already_present = index_name in pinecone.list_indexes()
        if not already_present:
            pinecone.create_index(
                name=index_name,
                dimension=dimension,
                metric="cosine",
                metadata_config={
                    "indexed": ["memory_type", "importance", "timestamp"]
                },
            )
        self.index = pinecone.Index(index_name)

    def store_memory(self, memory_id: str, embedding: List[float],
                     memory_type: str, content: str, importance: float,
                     metadata: Optional[Dict] = None) -> bool:
        """Upsert one memory vector together with its metadata.

        Caller-supplied ``metadata`` is merged last, so its keys win over the
        core fields on a name clash.

        Returns:
            True when the upsert succeeded, False (after printing) otherwise.
        """
        try:
            extra_fields = metadata or {}
            record = {
                "memory_type": memory_type,
                "content": content,
                "importance": importance,
                "timestamp": datetime.now().isoformat(),
                **extra_fields,
            }
            self.index.upsert(vectors=[(memory_id, embedding, record)])
        except Exception as e:
            print(f"Failed to store memory {memory_id}: {e}")
            return False
        return True

    def search_memories(self, query_embedding: List[float],
                        memory_type: Optional[str] = None,
                        min_importance: float = 0.0,
                        top_k: int = 10) -> List[Dict]:
        """Return up to ``top_k`` memories most similar to ``query_embedding``.

        Args:
            query_embedding: Query vector.
            memory_type: When truthy, restrict matches to this memory type.
            min_importance: When greater than 0, require at least this importance.
            top_k: Maximum number of matches requested from the index.

        Returns:
            One dict per match with the core fields split out and every other
            metadata key collected under ``"metadata"``; ``[]`` on failure.
        """
        conditions = {}
        if memory_type:
            conditions["memory_type"] = {"$eq": memory_type}
        if min_importance > 0:
            conditions["importance"] = {"$gte": min_importance}

        core_fields = ("content", "memory_type", "importance", "timestamp")
        try:
            response = self.index.query(
                vector=query_embedding,
                top_k=top_k,
                include_metadata=True,
                filter=conditions or None,
            )
            found = []
            for match in response.matches:
                meta = match.metadata
                extras = {}
                for key, value in meta.items():
                    if key not in core_fields:
                        extras[key] = value
                found.append({
                    "id": match.id,
                    "score": match.score,
                    "content": meta.get("content", ""),
                    "memory_type": meta.get("memory_type"),
                    "importance": meta.get("importance"),
                    "timestamp": meta.get("timestamp"),
                    "metadata": extras,
                })
            return found
        except Exception as e:
            print(f"Failed to search memories: {e}")
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """Delete the vector stored under ``memory_id``; return success."""
        try:
            self.index.delete(ids=[memory_id])
        except Exception as e:
            print(f"Failed to delete memory {memory_id}: {e}")
            return False
        return True

    def get_stats(self) -> Dict:
        """Return the index statistics, or ``{}`` if the call fails."""
        try:
            stats = self.index.describe_index_stats()
        except Exception as e:
            print(f"Failed to get stats: {e}")
            return {}
        return stats
# Usage example
"""
store = PineconeMemoryStore(
api_key="your-api-key",
environment="your-environment",
index_name="agent-memories",
dimension=384
)
# Store a memory
success = store.store_memory(
memory_id="mem_001",
embedding=[0.1, 0.2, ...], # 384-dimensional vector
memory_type="episodic",
content="User asked about Python decorators",
importance=0.8,
metadata={"topic": "programming", "language": "python"}
)
# Search for memories
results = store.search_memories(
query_embedding=[0.1, 0.2, ...],
memory_type="episodic",
min_importance=0.5,
top_k=5
)
"""
Qdrant
Qdrant is an open-source vector database with excellent performance and flexibility.
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams, PointStruct
from typing import List, Dict, Any, Optional
import uuid
class QdrantMemoryStore:
    """Agent memory store backed by a Qdrant collection.

    Construction connects to Qdrant and creates the collection when looking
    it up fails. The data methods catch every exception, print a message and
    report failure through their return value instead of raising.
    """

    def __init__(self, host: str = "localhost", port: int = 6333,
                 collection_name: str = "agent_memories", dimension: int = 384):
        """Connect to Qdrant and make sure ``collection_name`` exists.

        Args:
            host: Qdrant host name.
            port: Qdrant HTTP port.
            collection_name: Collection that holds the memories.
            dimension: Vector size used if the collection has to be created.
        """
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name
        self.dimension = dimension

        # Create collection if it doesn't exist. `except Exception` rather
        # than a bare `except:` so Ctrl-C / SystemExit are not swallowed here.
        try:
            self.client.get_collection(collection_name)
        except Exception:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=dimension, distance=Distance.COSINE),
            )

    @staticmethod
    def _build_payload(content: str, memory_type: str, importance: float,
                       timestamp: str, metadata: Optional[Dict]) -> Dict[str, Any]:
        """Assemble the point payload; extra metadata keys are merged last."""
        return {
            "content": content,
            "memory_type": memory_type,
            "importance": importance,
            "timestamp": timestamp,
            **(metadata or {}),
        }

    def store_memory(self, memory_id: str, embedding: List[float],
                     content: str, memory_type: str, importance: float,
                     metadata: Optional[Dict] = None) -> bool:
        """Upsert a single memory point.

        Returns:
            True when the upsert succeeded, False (after printing) otherwise.
        """
        try:
            point = PointStruct(
                id=memory_id,
                vector=embedding,
                payload=self._build_payload(
                    content, memory_type, importance,
                    datetime.now().isoformat(), metadata,
                ),
            )
            self.client.upsert(
                collection_name=self.collection_name,
                points=[point],
            )
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    def search_memories(self, query_embedding: List[float],
                        memory_type: Optional[str] = None,
                        min_importance: float = 0.0,
                        limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` memories most similar to ``query_embedding``.

        Args:
            query_embedding: Query vector.
            memory_type: When truthy, restrict hits to this memory type.
            min_importance: When greater than 0, require at least this importance.
            limit: Maximum number of hits requested.

        Returns:
            One dict per hit with the core fields split out and every other
            payload key collected under ``"metadata"``; ``[]`` on failure.
        """
        # Build filter conditions
        filter_conditions = []
        if memory_type:
            filter_conditions.append(
                models.FieldCondition(
                    key="memory_type",
                    match=models.MatchValue(value=memory_type),
                )
            )
        if min_importance > 0:
            filter_conditions.append(
                models.FieldCondition(
                    key="importance",
                    range=models.Range(gte=min_importance),
                )
            )
        filter_clause = models.Filter(must=filter_conditions) if filter_conditions else None

        core_fields = ("content", "memory_type", "importance", "timestamp")
        try:
            results = self.client.search(
                collection_name=self.collection_name,
                query_vector=query_embedding,
                query_filter=filter_clause,
                limit=limit,
                with_payload=True,
            )
            return [{
                "id": hit.id,
                "score": hit.score,
                "content": hit.payload.get("content"),
                "memory_type": hit.payload.get("memory_type"),
                "importance": hit.payload.get("importance"),
                "timestamp": hit.payload.get("timestamp"),
                "metadata": {k: v for k, v in hit.payload.items()
                             if k not in core_fields},
            } for hit in results]
        except Exception as e:
            print(f"Search failed: {e}")
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """Delete the point stored under ``memory_id``; return success."""
        try:
            self.client.delete(
                collection_name=self.collection_name,
                points_selector=models.PointIdsList(
                    points=[memory_id]
                ),
            )
            return True
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    def batch_store(self, memories: List[Dict]) -> bool:
        """Upsert several memories in one request.

        Each dict must provide ``id``, ``embedding``, ``content``,
        ``memory_type`` and ``importance``; ``timestamp`` and ``metadata`` are
        optional. An empty batch is a no-op that returns True without
        contacting the server.

        Returns:
            True when the whole batch was stored, False (after printing) if
            building or upserting any point failed.
        """
        if not memories:
            return True
        try:
            points = [
                PointStruct(
                    id=memory["id"],
                    vector=memory["embedding"],
                    payload=self._build_payload(
                        memory["content"],
                        memory["memory_type"],
                        memory["importance"],
                        memory.get("timestamp", datetime.now().isoformat()),
                        # tolerate an explicit metadata=None entry
                        memory.get("metadata"),
                    ),
                )
                for memory in memories
            ]
            self.client.upsert(
                collection_name=self.collection_name,
                points=points,
            )
            return True
        except Exception as e:
            print(f"Batch store failed: {e}")
            return False
# Docker Compose for Qdrant
"""
version: '3.8'
services:
qdrant:
image: qdrant/qdrant
ports:
- "6333:6333"
volumes:
- ./qdrant_data:/qdrant/storage
environment:
- QDRANT__SERVICE__HTTP_PORT=6333
"""
Weaviate
Weaviate combines vector search with graph capabilities and automatic vectorization.
import weaviate
from weaviate.classes.config import Configure, Property, DataType
import json
from typing import List, Dict, Optional, Any
class WeaviateMemoryStore:
    """Agent memory store backed by a Weaviate ``AgentMemory`` class.

    Vectors are produced by the ``text2vec-openai`` module configured in the
    schema, so callers pass text rather than embeddings. The data methods
    catch every exception, print a message and report failure through their
    return value instead of raising.
    """

    def __init__(self, url: str = "http://localhost:8080",
                 api_key: Optional[str] = None,
                 openai_api_key: Optional[str] = None):
        """Create the client and make sure the schema exists.

        Args:
            url: Weaviate endpoint.
            api_key: Optional Weaviate API key.
            openai_api_key: Optional OpenAI key forwarded in the
                ``X-OpenAI-Api-Key`` header for the vectorizer module.
        """
        # Configure authentication
        auth_config = None
        if api_key:
            auth_config = weaviate.AuthApiKey(api_key=api_key)

        # Configure headers for OpenAI integration
        headers = {}
        if openai_api_key:
            headers["X-OpenAI-Api-Key"] = openai_api_key

        self.client = weaviate.Client(
            url=url,
            auth_client_secret=auth_config,
            additional_headers=headers,
        )
        self._setup_schema()

    def _setup_schema(self):
        """Create the ``AgentMemory`` class unless it already exists."""
        schema = {
            "class": "AgentMemory",
            "description": "Agent memory entries with automatic vectorization",
            "vectorizer": "text2vec-openai",
            "moduleConfig": {
                "text2vec-openai": {
                    "model": "ada",
                    "type": "text"
                }
            },
            "properties": [
                {
                    "name": "content",
                    "dataType": ["text"],
                    "description": "The memory content",
                },
                {
                    "name": "memoryType",
                    "dataType": ["string"],
                    "description": "Type of memory (episodic, semantic, working, procedural)",
                },
                {
                    "name": "importance",
                    "dataType": ["number"],
                    "description": "Importance score (0.0 - 1.0)",
                },
                {
                    "name": "timestamp",
                    "dataType": ["date"],
                    "description": "When the memory was created",
                },
                {
                    "name": "tags",
                    "dataType": ["string[]"],
                    "description": "Tags associated with the memory",
                },
                {
                    "name": "metadata",
                    "dataType": ["object"],
                    "description": "Additional metadata",
                }
            ]
        }
        # Create schema if it doesn't exist
        if not self.client.schema.exists("AgentMemory"):
            self.client.schema.create_class(schema)

    @staticmethod
    def _parse_search_results(result: Dict[str, Any]) -> List[Dict]:
        """Flatten a GraphQL ``Get`` response into a list of memory dicts.

        Fields that were not requested in the query (for example ``tags`` and
        ``metadata`` in ``search_by_concept``) fall back to an empty value
        instead of raising. A response without ``data.Get.AgentMemory``
        yields ``[]``.
        """
        data = result.get("data") or {}
        items = (data.get("Get") or {}).get("AgentMemory") or []
        memories = []
        for item in items:
            additional = item.get("_additional") or {}
            memories.append({
                "id": additional.get("id"),
                "score": additional.get("certainty"),
                "content": item.get("content"),
                "memory_type": item.get("memoryType"),
                "importance": item.get("importance"),
                "timestamp": item.get("timestamp"),
                "tags": item.get("tags", []),
                "metadata": item.get("metadata", {}),
            })
        return memories

    def store_memory(self, memory_id: str, content: str, memory_type: str,
                     importance: float, tags: Optional[List[str]] = None,
                     metadata: Optional[Dict] = None) -> bool:
        """Store one memory; vectorization happens inside Weaviate.

        Returns:
            True when the object was created, False (after printing) otherwise.
        """
        # Local import: this snippet's import block does not bring datetime in.
        from datetime import datetime, timezone

        try:
            self.client.data_object.create(
                {
                    "content": content,
                    "memoryType": memory_type,
                    "importance": importance,
                    # Weaviate `date` values must be RFC 3339, i.e. carry a
                    # UTC offset; a naive isoformat() string is rejected.
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "tags": tags or [],
                    "metadata": metadata or {}
                },
                "AgentMemory",
                uuid=memory_id
            )
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    def search_memories(self, query: str, memory_type: Optional[str] = None,
                        min_importance: float = 0.0, limit: int = 10) -> List[Dict]:
        """Search memories with a natural-language query.

        Args:
            query: Text passed to ``nearText`` as the single concept.
            memory_type: When truthy, restrict results to this memory type.
            min_importance: Lower bound applied to ``importance`` (always sent).
            limit: Maximum number of results.

        Returns:
            Parsed memory dicts, or ``[]`` on failure.
        """
        # Build the where filter
        where_filter = {"path": ["importance"], "operator": "GreaterThanEqual", "valueNumber": min_importance}
        if memory_type:
            memory_type_filter = {"path": ["memoryType"], "operator": "Equal", "valueString": memory_type}
            where_filter = {
                "operator": "And",
                "operands": [where_filter, memory_type_filter]
            }
        try:
            result = (
                self.client.query
                .get("AgentMemory", ["content", "memoryType", "importance", "timestamp", "tags", "metadata"])
                .with_near_text({"concepts": [query]})
                .with_where(where_filter)
                .with_limit(limit)
                .with_additional(["id", "certainty"])
                .do()
            )
            return self._parse_search_results(result)
        except Exception as e:
            print(f"Search failed: {e}")
            return []

    def search_by_concept(self, concepts: List[str], limit: int = 10) -> List[Dict]:
        """Search by several concepts at once, without any filter.

        Returns:
            Parsed memory dicts (``tags``/``metadata`` are not requested and
            come back empty), or ``[]`` on failure.
        """
        try:
            result = (
                self.client.query
                .get("AgentMemory", ["content", "memoryType", "importance", "timestamp"])
                .with_near_text({"concepts": concepts})
                .with_limit(limit)
                .with_additional(["id", "certainty"])
                .do()
            )
            return self._parse_search_results(result)
        except Exception as e:
            print(f"Concept search failed: {e}")
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """Delete the object stored under ``memory_id``; return success."""
        try:
            self.client.data_object.delete(memory_id, "AgentMemory")
            return True
        except Exception as e:
            print(f"Delete failed: {e}")
            return False
# Docker Compose for Weaviate
"""
version: '3.4'
services:
weaviate:
command:
- --host
- 0.0.0.0
- --port
- '8080'
- --scheme
- http
image: weaviate/weaviate:1.21.8
ports:
- 8080:8080
volumes:
- weaviate_data:/var/lib/weaviate
restart: on-failure:0
environment:
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
DEFAULT_VECTORIZER_MODULE: 'text2vec-openai'
ENABLE_MODULES: 'text2vec-openai'
CLUSTER_HOSTNAME: 'node1'
volumes:
weaviate_data:
"""
Traditional Databases
PostgreSQL with pgvector
PostgreSQL with the pgvector extension provides vector capabilities in a familiar relational database.
-- Setup pgvector extension and tables
-- IF NOT EXISTS: the extension is normally installed once per database, so a
-- plain CREATE EXTENSION would fail on any database where it is already present.
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE agent_memories (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    memory_type VARCHAR(20) NOT NULL CHECK (memory_type IN ('episodic', 'semantic', 'working', 'procedural')),
    importance REAL NOT NULL CHECK (importance >= 0 AND importance <= 1),
    embedding vector(384), -- 384-dimensional vectors
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    tags TEXT[],
    metadata JSONB DEFAULT '{}'
);

-- Create indexes for efficient search
-- vector_cosine_ops matches the cosine-distance operator (<=>) used by the
-- similarity queries against this table.
CREATE INDEX ON agent_memories USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
CREATE INDEX ON agent_memories (memory_type);
CREATE INDEX ON agent_memories (importance);
CREATE INDEX ON agent_memories (timestamp);
CREATE INDEX ON agent_memories USING GIN (tags);
CREATE INDEX ON agent_memories USING GIN (metadata);

-- Full-text search setup
-- content_tsvector is maintained by the trigger function below, so writers
-- never set it directly.
ALTER TABLE agent_memories ADD COLUMN content_tsvector tsvector;
CREATE INDEX ON agent_memories USING GIN (content_tsvector);

CREATE OR REPLACE FUNCTION update_content_tsvector() RETURNS trigger AS $$
BEGIN
    NEW.content_tsvector := to_tsvector('english', NEW.content);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER tsvector_update_trigger
BEFORE INSERT OR UPDATE ON agent_memories
FOR EACH ROW EXECUTE FUNCTION update_content_tsvector();
import asyncpg
import json
from typing import List, Dict, Optional, Any
import numpy as np
from datetime import datetime
class PostgreSQLMemoryStore:
    """Agent memory store on PostgreSQL with the pgvector extension.

    ``initialize()`` must be awaited before any other method so that the
    connection pool exists. The data methods catch every exception, print a
    message and report failure through their return value instead of raising.
    """

    def __init__(self, connection_string: str):
        """Remember the DSN; the pool itself is created by ``initialize()``."""
        self.connection_string = connection_string
        self.pool: Optional[asyncpg.Pool] = None

    async def initialize(self):
        """Initialize connection pool"""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            command_timeout=60
        )

    @staticmethod
    def _to_pgvector(embedding: List[float]) -> str:
        """Render an embedding as the pgvector text literal ``[x1,x2,...]``.

        asyncpg has no codec for the extension type ``vector`` and exchanges
        it in text form, so a Python list cannot be bound to such a parameter
        directly.
        """
        return "[" + ",".join(str(float(x)) for x in embedding) + "]"

    @staticmethod
    def _row_to_dict(row, score_key: str) -> Dict[str, Any]:
        """Convert a result row into the public dict shape.

        ``score_key`` names the extra ranking column (``similarity`` or
        ``rank``), which is copied through as a float.
        """
        return {
            "id": str(row["id"]),
            "content": row["content"],
            "memory_type": row["memory_type"],
            "importance": row["importance"],
            "timestamp": row["timestamp"].isoformat(),
            "tags": row["tags"],
            "metadata": json.loads(row["metadata"]) if row["metadata"] else {},
            score_key: float(row[score_key]),
        }

    async def store_memory(self, memory_id: str, content: str,
                           memory_type: str, importance: float,
                           embedding: List[float], tags: Optional[List[str]] = None,
                           metadata: Optional[Dict] = None) -> bool:
        """Insert a memory, or overwrite the row with the same id.

        On conflict every column is replaced and ``timestamp`` is reset to
        ``NOW()``.

        Returns:
            True when the statement succeeded, False (after printing) otherwise.
        """
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO agent_memories
                    (id, content, memory_type, importance, embedding, tags, metadata)
                    VALUES ($1, $2, $3, $4, $5::vector, $6, $7)
                    ON CONFLICT (id)
                    DO UPDATE SET
                    content = EXCLUDED.content,
                    memory_type = EXCLUDED.memory_type,
                    importance = EXCLUDED.importance,
                    embedding = EXCLUDED.embedding,
                    tags = EXCLUDED.tags,
                    metadata = EXCLUDED.metadata,
                    timestamp = NOW()
                    """, memory_id, content, memory_type, importance,
                    self._to_pgvector(embedding), tags or [],
                    json.dumps(metadata or {}))
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    async def search_by_vector(self, query_embedding: List[float],
                               memory_type: Optional[str] = None,
                               min_importance: float = 0.0,
                               limit: int = 10) -> List[Dict]:
        """Return the ``limit`` memories closest to ``query_embedding``.

        Results are ordered by cosine distance and carry a ``similarity`` of
        ``1 - distance``. ``memory_type`` (when truthy) and ``min_importance``
        narrow the candidate set.
        """
        # Placeholders are numbered by hand because the WHERE clause is
        # assembled dynamically; only placeholder indexes are interpolated,
        # never values.
        conditions = ["importance >= $2"]
        params = [self._to_pgvector(query_embedding), min_importance]
        param_count = 2
        if memory_type:
            param_count += 1
            conditions.append(f"memory_type = ${param_count}")
            params.append(memory_type)
        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT id, content, memory_type, importance, timestamp, tags, metadata,
            1 - (embedding <=> $1::vector) as similarity
            FROM agent_memories
            WHERE {where_clause}
            ORDER BY embedding <=> $1::vector
            LIMIT ${param_count + 1}
            """
        params.append(limit)
        try:
            async with self.pool.acquire() as conn:
                rows = await conn.fetch(query, *params)
            return [self._row_to_dict(row, "similarity") for row in rows]
        except Exception as e:
            print(f"Vector search failed: {e}")
            return []

    async def search_by_text(self, query: str, memory_type: Optional[str] = None,
                             limit: int = 10) -> List[Dict]:
        """Full-text search over ``content_tsvector``, best rank first.

        Ties on ``rank`` are broken by descending importance.
        """
        conditions = ["content_tsvector @@ plainto_tsquery('english', $1)"]
        params = [query]
        param_count = 1
        if memory_type:
            param_count += 1
            conditions.append(f"memory_type = ${param_count}")
            params.append(memory_type)
        where_clause = " AND ".join(conditions)
        query_sql = f"""
            SELECT id, content, memory_type, importance, timestamp, tags, metadata,
            ts_rank(content_tsvector, plainto_tsquery('english', $1)) as rank
            FROM agent_memories
            WHERE {where_clause}
            ORDER BY rank DESC, importance DESC
            LIMIT ${param_count + 1}
            """
        params.append(limit)
        try:
            async with self.pool.acquire() as conn:
                rows = await conn.fetch(query_sql, *params)
            return [self._row_to_dict(row, "rank") for row in rows]
        except Exception as e:
            print(f"Text search failed: {e}")
            return []

    async def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory by id; True only if exactly one row was removed."""
        try:
            async with self.pool.acquire() as conn:
                result = await conn.execute(
                    "DELETE FROM agent_memories WHERE id = $1", memory_id
                )
            # execute() returns the command status tag of the statement.
            return result == "DELETE 1"
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    async def get_memory_stats(self) -> Dict[str, Any]:
        """Return total/per-type counts, average importance and time range.

        Returns ``{}`` when the query fails.
        """
        try:
            async with self.pool.acquire() as conn:
                result = await conn.fetchrow("""
                    SELECT
                    COUNT(*) as total_memories,
                    COUNT(*) FILTER (WHERE memory_type = 'episodic') as episodic_count,
                    COUNT(*) FILTER (WHERE memory_type = 'semantic') as semantic_count,
                    COUNT(*) FILTER (WHERE memory_type = 'working') as working_count,
                    COUNT(*) FILTER (WHERE memory_type = 'procedural') as procedural_count,
                    AVG(importance) as avg_importance,
                    MAX(timestamp) as latest_memory,
                    MIN(timestamp) as earliest_memory
                    FROM agent_memories
                    """)
            return dict(result) if result else {}
        except Exception as e:
            print(f"Stats query failed: {e}")
            return {}
# Example usage with connection pooling
"""
import asyncio
async def main():
store = PostgreSQLMemoryStore("postgresql://user:pass@localhost/memorydb")
await store.initialize()
# Store a memory
success = await store.store_memory(
memory_id="123e4567-e89b-12d3-a456-426614174000",  # must be a valid UUID: agent_memories.id is a UUID column
content="User asked about database choices for agents",
memory_type="episodic",
importance=0.8,
embedding=[0.1, 0.2, ...], # 384-dimensional vector
tags=["database", "agent-memory"],
metadata={"session_id": "sess_123"}
)
# Search by vector similarity
similar = await store.search_by_vector(
query_embedding=[0.1, 0.2, ...],
memory_type="episodic",
limit=5
)
# Search by text
text_results = await store.search_by_text(
query="database choices",
limit=5
)
if __name__ == "__main__":
asyncio.run(main())
"""
MongoDB with Vector Search
MongoDB Atlas provides vector search capabilities suitable for agent memory systems.
from pymongo import MongoClient
from pymongo.collection import Collection
from datetime import datetime
import json
from typing import List, Dict, Optional, Any
class MongoMemoryStore:
    """Agent memory store on MongoDB, with Atlas vector search for similarity.

    Memories live in the ``memories`` collection of the chosen database and
    use the caller-supplied memory id as ``_id``. The data methods catch every
    exception, print a message and report failure through their return value
    instead of raising.
    """

    def __init__(self, connection_string: str, database_name: str = "agent_memory"):
        """Connect, select the ``memories`` collection and create its indexes."""
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.collection: Collection = self.db.memories
        self._setup_indexes()

    def _setup_indexes(self):
        """Setup database indexes for optimal performance"""
        # Create compound indexes
        self.collection.create_index([
            ("memory_type", 1),
            ("importance", -1),
            ("timestamp", -1)
        ])
        self.collection.create_index([("timestamp", -1)])
        self.collection.create_index([("tags", 1)])
        self.collection.create_index([("metadata.session_id", 1)])
        # Create text index for full-text search
        self.collection.create_index([("content", "text")])
        # Vector search index (requires Atlas Search configuration)
        # This needs to be set up in MongoDB Atlas UI:
        # NOTE(review): the definition below is kept as written in the guide;
        # confirm it matches the index type expected by the $vectorSearch
        # stage used in search_by_vector().
        """
        {
        "mappings": {
        "fields": {
        "embedding": {
        "dimensions": 384,
        "similarity": "cosine",
        "type": "knnVector"
        },
        "memory_type": {
        "type": "string"
        },
        "importance": {
        "type": "number"
        }
        }
        }
        }
        """

    @staticmethod
    def _format_doc(doc: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a stored document into the public result shape."""
        return {
            "id": str(doc["_id"]),
            "content": doc["content"],
            "memory_type": doc["memory_type"],
            "importance": doc["importance"],
            "timestamp": doc["timestamp"].isoformat(),
            "tags": doc["tags"],
            "metadata": doc["metadata"],
            "score": doc.get("score", 0.0)
        }

    def store_memory(self, memory_id: str, content: str, memory_type: str,
                     importance: float, embedding: List[float],
                     tags: Optional[List[str]] = None,
                     metadata: Optional[Dict] = None) -> bool:
        """Insert the memory, or replace the document with the same ``_id``.

        Returns:
            True when the write succeeded, False (after printing) otherwise.
        """
        try:
            document = {
                "_id": memory_id,
                "content": content,
                "memory_type": memory_type,
                "importance": importance,
                "embedding": embedding,
                "timestamp": datetime.now(),
                "tags": tags or [],
                "metadata": metadata or {}
            }
            self.collection.replace_one(
                {"_id": memory_id},
                document,
                upsert=True
            )
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    def search_by_vector(self, query_embedding: List[float],
                         memory_type: Optional[str] = None,
                         min_importance: float = 0.0,
                         limit: int = 10) -> List[Dict]:
        """Vector similarity search through a ``$vectorSearch`` stage.

        Args:
            query_embedding: Query vector.
            memory_type: When truthy, pre-filter on this memory type.
            min_importance: When greater than 0, pre-filter on importance.
            limit: Maximum number of results.

        Returns:
            Formatted documents with a ``score`` field, or ``[]`` on failure.
        """
        # Vector search stage
        search_stage = {
            "$vectorSearch": {
                "index": "vector_index",  # Name configured in Atlas
                "path": "embedding",
                "queryVector": query_embedding,
                "numCandidates": 100,
                "limit": limit
            }
        }
        # Add filters if specified
        if memory_type or min_importance > 0:
            filter_conditions = {}
            if memory_type:
                filter_conditions["memory_type"] = {"$eq": memory_type}
            if min_importance > 0:
                filter_conditions["importance"] = {"$gte": min_importance}
            search_stage["$vectorSearch"]["filter"] = filter_conditions

        pipeline = [
            search_stage,
            # Add score projection
            {
                "$addFields": {
                    "score": {"$meta": "vectorSearchScore"}
                }
            },
        ]
        try:
            results = list(self.collection.aggregate(pipeline))
            return [self._format_doc(doc) for doc in results]
        except Exception as e:
            print(f"Vector search failed: {e}")
            return []

    def search_by_text(self, query: str, memory_type: Optional[str] = None,
                       limit: int = 10) -> List[Dict]:
        """Full-text search on ``content``, ordered by text score."""
        search_filter = {"$text": {"$search": query}}
        if memory_type:
            search_filter["memory_type"] = memory_type
        try:
            results = self.collection.find(
                search_filter,
                {"score": {"$meta": "textScore"}}
            ).sort([("score", {"$meta": "textScore"})]).limit(limit)
            return [self._format_doc(doc) for doc in results]
        except Exception as e:
            print(f"Text search failed: {e}")
            return []

    def aggregate_memories(self, pipeline: List[Dict]) -> List[Dict]:
        """Run a caller-supplied aggregation pipeline; ``[]`` on failure."""
        try:
            return list(self.collection.aggregate(pipeline))
        except Exception as e:
            print(f"Aggregation failed: {e}")
            return []

    def get_memory_analytics(self) -> Dict[str, Any]:
        """Return per-type counts/averages plus the overall total.

        The first ``$group`` aggregates per memory type; the second folds
        those rows into a single document. Returns ``{}`` when the collection
        is empty or the query fails.
        """
        pipeline = [
            {
                "$group": {
                    "_id": "$memory_type",
                    "count": {"$sum": 1},
                    "avg_importance": {"$avg": "$importance"},
                    "latest": {"$max": "$timestamp"}
                }
            },
            {
                "$group": {
                    "_id": None,
                    "by_type": {
                        "$push": {
                            "type": "$_id",
                            "count": "$count",
                            "avg_importance": "$avg_importance",
                            "latest": "$latest"
                        }
                    },
                    "total_memories": {"$sum": "$count"}
                }
            }
        ]
        try:
            result = list(self.collection.aggregate(pipeline))
            return result[0] if result else {}
        except Exception as e:
            print(f"Analytics query failed: {e}")
            return {}

    def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory; True only if exactly one document was removed."""
        try:
            result = self.collection.delete_one({"_id": memory_id})
            return result.deleted_count == 1
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    def cleanup_old_memories(self, days: int = 30,
                             importance_threshold: float = 0.3) -> int:
        """Delete memories that are both old and unimportant.

        Args:
            days: Memories with a timestamp older than this many days qualify.
            importance_threshold: Only memories strictly below this importance
                are removed.

        Returns:
            Number of deleted documents, or 0 when the delete fails.
        """
        # Local import: the file only imports `datetime` from the datetime
        # module, so the bare name `timedelta` raised NameError here.
        from datetime import timedelta

        cutoff_date = datetime.now() - timedelta(days=days)
        try:
            result = self.collection.delete_many({
                "timestamp": {"$lt": cutoff_date},
                "importance": {"$lt": importance_threshold}
            })
            return result.deleted_count
        except Exception as e:
            print(f"Cleanup failed: {e}")
            return 0
# Decision Framework
Choosing the Right Database
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional
class DatabaseType(Enum):
    """Storage-backend families the recommendation engine chooses between."""

    # Hosted vector services (Pinecone, etc.)
    VECTOR_MANAGED = "vector_managed"
    # Self-operated vector engines (Qdrant, Weaviate)
    VECTOR_SELFHOST = "vector_selfhost"
    # PostgreSQL + pgvector
    RELATIONAL = "relational"
    # MongoDB Atlas
    DOCUMENT = "document"
@dataclass
class RequirementProfile:
    """System requirements fed into the database recommendation.

    The three integer fields are 1-5 scales. ``existing_infrastructure`` and
    ``team_expertise`` are optional and default to ``None`` (not provided).
    """

    # "small", "medium", "large" or "enterprise"
    expected_scale: str
    # 1-5 scale
    vector_search_importance: int
    # 1-5 scale (5 = very sensitive)
    budget_sensitivity: int
    # 1-5 scale
    operational_complexity_tolerance: int
    data_compliance_requirements: bool
    existing_infrastructure: Optional[str] = None
    # e.g. ["postgresql", "mongodb", "vector_db", ...]; annotated Optional
    # because the default is None rather than a list.
    team_expertise: Optional[List[str]] = None
class DatabaseRecommendationEngine:
    """Score each database family against a requirement profile.

    ``database_profiles`` holds a descriptive profile (pros, cons, best-fit
    tags, cost profile, complexity) for every ``DatabaseType`` member;
    ``recommend()`` turns a profile of requirements into a ranked list.
    """

    def __init__(self):
        """Build the static per-database profiles."""
        managed_profile = {
            "pros": [
                "Managed service - no ops overhead",
                "Excellent vector search performance",
                "Built-in scaling",
                "Quick to get started"
            ],
            "cons": [
                "Vendor lock-in",
                "Higher costs at scale",
                "Limited customization",
                "Potential compliance issues"
            ],
            "best_for": ["rapid_prototyping", "small_to_medium_scale", "vector_first"],
            "cost_profile": "medium_to_high",
            "complexity": 1
        }
        selfhost_profile = {
            "pros": [
                "Full control and customization",
                "Cost effective at scale",
                "No vendor lock-in",
                "Advanced vector features"
            ],
            "cons": [
                "Requires operational expertise",
                "Setup and maintenance overhead",
                "Scaling complexity"
            ],
            "best_for": ["medium_to_large_scale", "cost_optimization", "flexibility"],
            "cost_profile": "low_to_medium",
            "complexity": 3
        }
        relational_profile = {
            "pros": [
                "Familiar technology stack",
                "ACID compliance",
                "Rich query capabilities",
                "Mature ecosystem"
            ],
            "cons": [
                "Vector search not native",
                "Complex for pure vector workloads",
                "May need hybrid approach"
            ],
            "best_for": ["existing_postgresql", "structured_data", "complex_queries"],
            "cost_profile": "low",
            "complexity": 2
        }
        document_profile = {
            "pros": [
                "Flexible schema",
                "Rich aggregation framework",
                "Vector search available",
                "Horizontal scaling"
            ],
            "cons": [
                "Vector search newer feature",
                "Atlas dependency for vector search",
                "Learning curve"
            ],
            "best_for": ["flexible_schema", "existing_mongodb", "analytics"],
            "cost_profile": "medium",
            "complexity": 2
        }
        # Insertion order matters: it is the tie-break order of recommend().
        self.database_profiles = {
            DatabaseType.VECTOR_MANAGED: managed_profile,
            DatabaseType.VECTOR_SELFHOST: selfhost_profile,
            DatabaseType.RELATIONAL: relational_profile,
            DatabaseType.DOCUMENT: document_profile,
        }

    def recommend(self, requirements: "RequirementProfile") -> Dict[str, Any]:
        """Rank every database family for ``requirements``.

        Each family collects points for scale fit, vector-search importance,
        budget sensitivity, existing infrastructure and team expertise, and
        loses half of ``complexity * (5 - operational_complexity_tolerance)``.

        Returns:
            Dict with ``top_recommendation`` (the best ``(type, details)``
            pair), ``all_options`` (all pairs, best first; ties keep profile
            order) and ``requirements`` (the input object).
        """
        vector_native = (DatabaseType.VECTOR_MANAGED, DatabaseType.VECTOR_SELFHOST)
        scales_cheaply = (DatabaseType.VECTOR_SELFHOST, DatabaseType.DOCUMENT)
        sensitivity = requirements.budget_sensitivity
        budget_delta = {
            DatabaseType.VECTOR_MANAGED: -sensitivity,
            DatabaseType.VECTOR_SELFHOST: sensitivity,
            DatabaseType.RELATIONAL: sensitivity + 1,
            DatabaseType.DOCUMENT: 0,
        }
        skill_bonus = {
            "postgresql": {DatabaseType.RELATIONAL: 3},
            "mongodb": {DatabaseType.DOCUMENT: 3},
            "vector_db": {
                DatabaseType.VECTOR_MANAGED: 2,
                DatabaseType.VECTOR_SELFHOST: 3,
            },
            "devops": {DatabaseType.VECTOR_SELFHOST: 2},
        }

        ranked = {}
        for candidate, profile in self.database_profiles.items():
            total = 0
            why = []

            # Scale considerations
            scale = requirements.expected_scale
            if scale == "small":
                if candidate == DatabaseType.VECTOR_MANAGED:
                    total += 3
                    why.append("Managed service ideal for small scale")
            elif scale in ("large", "enterprise"):
                if candidate in scales_cheaply:
                    total += 3
                    why.append("Better scaling economics for large deployments")

            # Vector search importance
            if requirements.vector_search_importance >= 4:
                if candidate in vector_native:
                    total += 4
                    why.append("Native vector search capabilities essential")
                elif candidate == DatabaseType.RELATIONAL:
                    total -= 1
                    why.append("Vector search is extension, not native")

            # Budget sensitivity
            total += budget_delta.get(candidate, 0)

            # Operational complexity: the lower the tolerance, the harder a
            # complex system is penalised.
            intolerance = 5 - requirements.operational_complexity_tolerance
            total -= (profile["complexity"] * intolerance) / 2

            # Existing infrastructure bonus
            infra = requirements.existing_infrastructure
            if infra:
                if infra == "postgresql" and candidate == DatabaseType.RELATIONAL:
                    total += 4
                    why.append("Leverages existing PostgreSQL infrastructure")
                elif infra == "mongodb" and candidate == DatabaseType.DOCUMENT:
                    total += 4
                    why.append("Leverages existing MongoDB infrastructure")

            # Team expertise bonus
            if requirements.team_expertise:
                for expertise in requirements.team_expertise:
                    if expertise not in skill_bonus:
                        continue
                    for favoured, points in skill_bonus[expertise].items():
                        if favoured == candidate:
                            total += points
                            why.append(f"Team has {expertise} expertise")

            ranked[candidate] = {
                "score": total,
                "reasoning": why,
                "profile": profile,
            }

        # Sort by score, best first (stable, so ties keep profile order)
        ordered = list(ranked.items())
        ordered.sort(key=lambda pair: pair[1]["score"], reverse=True)
        return {
            "top_recommendation": ordered[0],
            "all_options": ordered,
            "requirements": requirements,
        }
# Usage Example
"""
requirements = RequirementProfile(
expected_scale="medium",
vector_search_importance=5,
budget_sensitivity=3,
operational_complexity_tolerance=2,
data_compliance_requirements=True,
existing_infrastructure="postgresql",
team_expertise=["postgresql", "python"]
)
engine = DatabaseRecommendationEngine()
recommendation = engine.recommend(requirements)
print(f"Top recommendation: {recommendation['top_recommendation'][0]}")
print(f"Score: {recommendation['top_recommendation'][1]['score']}")
print("Reasoning:")
for reason in recommendation['top_recommendation'][1]['reasoning']:
print(f" - {reason}")
"""
Migration and Hybrid Strategies
Database Migration Tools
import asyncio
from typing import AsyncGenerator, Dict, Any
import logging
class MemoryMigrationTool:
    """Copy memories from one store to another in batches.

    The source must expose ``get_batch(offset=, limit=, filter_criteria=)``;
    the target must expose ``store_memory(**fields)``. Both may be either
    plain or ``async`` methods.
    """

    def __init__(self, source_store, target_store, batch_size: int = 100):
        """Remember both stores and the batch size used when reading."""
        self.source_store = source_store
        self.target_store = target_store
        self.batch_size = batch_size
        self.logger = logging.getLogger(__name__)

    @staticmethod
    async def _resolve(value):
        """Await ``value`` if it is awaitable, otherwise return it unchanged.

        Lets the tool drive synchronous stores (whose methods return a plain
        value) and asynchronous ones (whose methods return a coroutine)
        through the same code path.
        """
        import inspect

        if inspect.isawaitable(value):
            return await value
        return value

    async def migrate_all(self, filter_criteria: Optional[Dict] = None) -> Dict[str, Any]:
        """Migrate every memory the source returns for ``filter_criteria``.

        Returns:
            Dict with ``total_processed``, ``successful_migrations``,
            ``failed_migrations`` and ``errors`` (list of messages). A failure
            while reading batches stops the run and is appended to ``errors``.
        """
        migration_stats = {
            "total_processed": 0,
            "successful_migrations": 0,
            "failed_migrations": 0,
            "errors": []
        }
        try:
            async for batch in self._get_memory_batches(filter_criteria):
                batch_results = await self._migrate_batch(batch)
                migration_stats["total_processed"] += len(batch)
                migration_stats["successful_migrations"] += batch_results["successful"]
                migration_stats["failed_migrations"] += batch_results["failed"]
                migration_stats["errors"].extend(batch_results["errors"])
                self.logger.info(f"Processed batch: {len(batch)} memories")
        except Exception as e:
            self.logger.error(f"Migration failed: {e}")
            migration_stats["errors"].append(str(e))
        return migration_stats

    async def _get_memory_batches(self, filter_criteria: Optional[Dict]) -> AsyncGenerator[List[Dict], None]:
        """Yield successive batches from the source until it returns none."""
        if not hasattr(self.source_store, 'get_batch'):
            # Nothing can be read from such a store; say so instead of
            # silently reporting an empty, "successful" migration.
            self.logger.warning(
                "Source store %s has no get_batch(); nothing to migrate",
                type(self.source_store).__name__,
            )
            return

        offset = 0
        while True:
            batch = await self._resolve(self.source_store.get_batch(
                offset=offset,
                limit=self.batch_size,
                filter_criteria=filter_criteria
            ))
            if not batch:
                break
            yield batch
            offset += len(batch)

    async def _migrate_batch(self, batch: List[Dict]) -> Dict[str, Any]:
        """Store each memory of ``batch`` in the target and tally the outcome.

        A falsy return value or an exception from the target counts as one
        failure and adds a message to ``errors``; the rest of the batch is
        still processed.
        """
        results = {"successful": 0, "failed": 0, "errors": []}
        for memory in batch:
            try:
                # Transform memory format if needed
                transformed_memory = self._transform_memory_format(memory)
                # Store in target system (sync or async store)
                success = await self._resolve(
                    self.target_store.store_memory(**transformed_memory)
                )
                if success:
                    results["successful"] += 1
                else:
                    results["failed"] += 1
                    results["errors"].append(f"Failed to store memory {memory.get('id', 'unknown')}")
            except Exception as e:
                results["failed"] += 1
                results["errors"].append(f"Error processing memory {memory.get('id', 'unknown')}: {str(e)}")
        return results

    def _transform_memory_format(self, memory: Dict) -> Dict:
        """Map a source record onto ``store_memory`` keyword arguments.

        The id is taken from ``id`` or, failing that, ``_id``. Fields whose
        value is ``None`` are left out so the target's defaults apply.
        """
        # Basic transformation - extend based on specific store requirements
        transformed = {
            "memory_id": memory.get("id") or memory.get("_id"),
            "content": memory.get("content"),
            "memory_type": memory.get("memory_type"),
            "importance": memory.get("importance"),
            "embedding": memory.get("embedding"),
            "tags": memory.get("tags"),
            "metadata": memory.get("metadata")
        }
        # Remove None values
        return {k: v for k, v in transformed.items() if v is not None}
# Hybrid storage for gradual migration
class HybridMemoryStore:
    """Use multiple stores simultaneously during migration.

    Writes go to the primary store and, when ``write_to_both`` is set, to the
    secondary as well. Reads follow ``read_preference``. The wrapped stores
    may expose either plain or ``async`` methods.
    """

    def __init__(self, primary_store, secondary_store,
                 write_to_both: bool = True, read_preference: str = "primary"):
        """Remember both stores and the read/write policy.

        Args:
            primary_store: Store that is always written to.
            secondary_store: Store written to only when ``write_to_both``.
            write_to_both: Mirror every write into the secondary store.
            read_preference: ``"primary"``, ``"secondary"`` or ``"both"``;
                any other value is treated like ``"both"``.
        """
        self.primary_store = primary_store
        self.secondary_store = secondary_store
        self.write_to_both = write_to_both
        self.read_preference = read_preference  # "primary", "secondary", "both"

    @staticmethod
    async def _resolve(value):
        """Await ``value`` if it is awaitable, otherwise return it unchanged.

        Without this, awaiting the plain ``bool``/``list`` returned by a
        synchronous store raises TypeError after the call already ran, which
        made successful writes look like failures.
        """
        import inspect

        if inspect.isawaitable(value):
            return await value
        return value

    async def store_memory(self, *args, **kwargs) -> bool:
        """Store memory in one or both stores.

        Returns:
            With ``write_to_both``: True if at least one store accepted the
            write. Otherwise: the primary store's result. A store that raises
            is logged and counted as a failed write.
        """
        primary_success = False
        secondary_success = False
        try:
            primary_success = await self._resolve(
                self.primary_store.store_memory(*args, **kwargs)
            )
        except Exception as e:
            logging.error(f"Primary store failed: {e}")
        if self.write_to_both:
            try:
                secondary_success = await self._resolve(
                    self.secondary_store.store_memory(*args, **kwargs)
                )
            except Exception as e:
                logging.error(f"Secondary store failed: {e}")
            return primary_success or secondary_success
        return primary_success

    async def search_memories(self, *args, **kwargs) -> List[Dict]:
        """Search memories based on read preference.

        ``"primary"`` / ``"secondary"`` query the preferred store and fall
        back to the other one only if the preferred store raises. Any other
        preference queries both and merges the results.
        """
        if self.read_preference == "primary":
            preferred, fallback = self.primary_store, self.secondary_store
        elif self.read_preference == "secondary":
            preferred, fallback = self.secondary_store, self.primary_store
        else:  # both
            primary_results = []
            secondary_results = []
            try:
                primary_results = await self._resolve(
                    self.primary_store.search_memories(*args, **kwargs)
                )
            except Exception as e:
                logging.error(f"Primary search failed: {e}")
            try:
                secondary_results = await self._resolve(
                    self.secondary_store.search_memories(*args, **kwargs)
                )
            except Exception as e:
                logging.error(f"Secondary search failed: {e}")
            # Merge and deduplicate results
            return self._merge_search_results(primary_results, secondary_results)

        try:
            return await self._resolve(preferred.search_memories(*args, **kwargs))
        except Exception:
            return await self._resolve(fallback.search_memories(*args, **kwargs))

    def _merge_search_results(self, primary: List[Dict], secondary: List[Dict]) -> List[Dict]:
        """Merge and deduplicate search results.

        Primary results win on duplicate ids. Results without an ``id`` are
        dropped. If the first merged result has a ``score``, the list is
        sorted by score, highest first.
        """
        seen_ids = set()
        merged = []
        # Primary first so it takes priority on duplicates.
        for result in list(primary) + list(secondary):
            memory_id = result.get("id")
            # `is not None` rather than truthiness: integer id 0 is a valid id.
            if memory_id is not None and memory_id not in seen_ids:
                seen_ids.add(memory_id)
                merged.append(result)
        # Sort by score/relevance if available
        if merged and "score" in merged[0]:
            merged.sort(key=lambda x: x.get("score", 0), reverse=True)
        return merged
# This comprehensive database guide provides practical implementations for all
# major database types suitable for agent memory systems, along with decision
# frameworks and migration strategies to help you choose and implement the
# right solution for your specific requirements.