
Database Choices for Agent Memory

Selecting the right database technology is crucial for agent memory systems. This guide compares popular options, provides implementation examples, and offers a decision framework based on your specific requirements.

Vector Databases

Vector databases are essential for semantic search and similarity-based memory retrieval in modern agent systems.
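
Every store in this guide assumes you already have an embedding for each memory. As a point of reference, here is a minimal sketch of producing the 384-dimensional vectors used throughout, using the sentence-transformers library (the model choice and the `embed` helper are illustrative, not part of any store's API):

from typing import List

from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2 outputs 384-dimensional vectors, matching the
# dimension=384 used by the stores below
_model = SentenceTransformer("all-MiniLM-L6-v2")

def embed(text: str) -> List[float]:
    """Turn memory content into a vector the stores below can index."""
    return _model.encode(text, normalize_embeddings=True).tolist()

# Example: embed("User asked about Python decorators") -> list of 384 floats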

Pinecone

Pinecone is a managed vector database service that excels at scale and simplicity.

import json
from datetime import datetime
from typing import List, Dict, Any, Optional

import pinecone


class PineconeMemoryStore:
    """Production-ready Pinecone integration for agent memory"""

    def __init__(self, api_key: str, environment: str,
                 index_name: str, dimension: int = 384):
        pinecone.init(api_key=api_key, environment=environment)
        self.index_name = index_name
        self.dimension = dimension

        # Create index if it doesn't exist
        if index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=index_name,
                dimension=dimension,
                metric="cosine",
                metadata_config={
                    "indexed": ["memory_type", "importance", "timestamp"]
                }
            )
        self.index = pinecone.Index(index_name)

    def store_memory(self, memory_id: str, embedding: List[float],
                     memory_type: str, content: str, importance: float,
                     metadata: Optional[Dict] = None) -> bool:
        """Store memory with vector embedding"""
        try:
            vector_metadata = {
                "memory_type": memory_type,
                "content": content,
                "importance": importance,
                "timestamp": datetime.now().isoformat(),
                **(metadata or {})
            }
            self.index.upsert(vectors=[(memory_id, embedding, vector_metadata)])
            return True
        except Exception as e:
            print(f"Failed to store memory {memory_id}: {e}")
            return False

    def search_memories(self, query_embedding: List[float],
                        memory_type: Optional[str] = None,
                        min_importance: float = 0.0,
                        top_k: int = 10) -> List[Dict]:
        """Search for similar memories"""
        filter_dict = {}
        if memory_type:
            filter_dict["memory_type"] = {"$eq": memory_type}
        if min_importance > 0:
            filter_dict["importance"] = {"$gte": min_importance}

        try:
            results = self.index.query(
                vector=query_embedding,
                top_k=top_k,
                include_metadata=True,
                filter=filter_dict if filter_dict else None
            )
            return [{
                "id": match.id,
                "score": match.score,
                "content": match.metadata.get("content", ""),
                "memory_type": match.metadata.get("memory_type"),
                "importance": match.metadata.get("importance"),
                "timestamp": match.metadata.get("timestamp"),
                "metadata": {k: v for k, v in match.metadata.items()
                             if k not in ["content", "memory_type",
                                          "importance", "timestamp"]}
            } for match in results.matches]
        except Exception as e:
            print(f"Failed to search memories: {e}")
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """Delete a specific memory"""
        try:
            self.index.delete(ids=[memory_id])
            return True
        except Exception as e:
            print(f"Failed to delete memory {memory_id}: {e}")
            return False

    def get_stats(self) -> Dict:
        """Get index statistics"""
        try:
            return self.index.describe_index_stats()
        except Exception as e:
            print(f"Failed to get stats: {e}")
            return {}


# Usage example
"""
store = PineconeMemoryStore(
    api_key="your-api-key",
    environment="your-environment",
    index_name="agent-memories",
    dimension=384
)

# Store a memory
success = store.store_memory(
    memory_id="mem_001",
    embedding=[0.1, 0.2, ...],  # 384-dimensional vector
    memory_type="episodic",
    content="User asked about Python decorators",
    importance=0.8,
    metadata={"topic": "programming", "language": "python"}
)

# Search for memories
results = store.search_memories(
    query_embedding=[0.1, 0.2, ...],
    memory_type="episodic",
    min_importance=0.5,
    top_k=5
)
"""
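
Note that the class above targets the classic `pinecone.init(...)` client. If you are on pinecone-client 3.x or later, initialization is object-based instead; a rough sketch of the equivalent setup (the cloud/region values are placeholders for your project):

from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="your-api-key")

# create_index takes a spec object in the v3+ client
if "agent-memories" not in pc.list_indexes().names():
    pc.create_index(
        name="agent-memories",
        dimension=384,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

index = pc.Index("agent-memories")  # upsert()/query() work as in the class above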

Qdrant

Qdrant is an open-source vector database with excellent performance and flexibility.

from datetime import datetime
from typing import List, Dict, Any, Optional
import uuid

from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams, PointStruct


class QdrantMemoryStore:
    """Qdrant vector database integration"""

    def __init__(self, host: str = "localhost", port: int = 6333,
                 collection_name: str = "agent_memories",
                 dimension: int = 384):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name
        self.dimension = dimension

        # Create collection if it doesn't exist
        try:
            self.client.get_collection(collection_name)
        except Exception:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=dimension, distance=Distance.COSINE),
            )

    def store_memory(self, memory_id: str, embedding: List[float],
                     content: str, memory_type: str, importance: float,
                     metadata: Optional[Dict] = None) -> bool:
        """Store memory with metadata"""
        try:
            point = PointStruct(
                id=memory_id,
                vector=embedding,
                payload={
                    "content": content,
                    "memory_type": memory_type,
                    "importance": importance,
                    "timestamp": datetime.now().isoformat(),
                    **(metadata or {})
                }
            )
            self.client.upsert(
                collection_name=self.collection_name,
                points=[point]
            )
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    def search_memories(self, query_embedding: List[float],
                        memory_type: Optional[str] = None,
                        min_importance: float = 0.0,
                        limit: int = 10) -> List[Dict]:
        """Search for similar memories with filtering"""
        # Build filter conditions
        filter_conditions = []
        if memory_type:
            filter_conditions.append(
                models.FieldCondition(
                    key="memory_type",
                    match=models.MatchValue(value=memory_type)
                )
            )
        if min_importance > 0:
            filter_conditions.append(
                models.FieldCondition(
                    key="importance",
                    range=models.Range(gte=min_importance)
                )
            )

        filter_clause = None
        if filter_conditions:
            filter_clause = models.Filter(must=filter_conditions)

        try:
            results = self.client.search(
                collection_name=self.collection_name,
                query_vector=query_embedding,
                query_filter=filter_clause,
                limit=limit,
                with_payload=True
            )
            return [{
                "id": hit.id,
                "score": hit.score,
                "content": hit.payload.get("content"),
                "memory_type": hit.payload.get("memory_type"),
                "importance": hit.payload.get("importance"),
                "timestamp": hit.payload.get("timestamp"),
                "metadata": {k: v for k, v in hit.payload.items()
                             if k not in ["content", "memory_type",
                                          "importance", "timestamp"]}
            } for hit in results]
        except Exception as e:
            print(f"Search failed: {e}")
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """Delete a specific memory"""
        try:
            self.client.delete(
                collection_name=self.collection_name,
                points_selector=models.PointIdsList(
                    points=[memory_id]
                )
            )
            return True
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    def batch_store(self, memories: List[Dict]) -> bool:
        """Store multiple memories efficiently"""
        try:
            points = []
            for memory in memories:
                point = PointStruct(
                    id=memory["id"],
                    vector=memory["embedding"],
                    payload={
                        "content": memory["content"],
                        "memory_type": memory["memory_type"],
                        "importance": memory["importance"],
                        "timestamp": memory.get("timestamp", datetime.now().isoformat()),
                        **memory.get("metadata", {})
                    }
                )
                points.append(point)

            self.client.upsert(
                collection_name=self.collection_name,
                points=points
            )
            return True
        except Exception as e:
            print(f"Batch store failed: {e}")
            return False


# Docker Compose for Qdrant
"""
version: '3.8'
services:
  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
    volumes:
      - ./qdrant_data:/qdrant/storage
    environment:
      - QDRANT__SERVICE__HTTP_PORT=6333
"""
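
For symmetry with the Pinecone example, a usage sketch (the `embed()` helper from the embedding sketch above is assumed; note that Qdrant point IDs must be UUIDs or unsigned integers):

import uuid

store = QdrantMemoryStore(host="localhost", port=6333)

memory_id = str(uuid.uuid4())  # Qdrant requires UUID or unsigned-int IDs
store.store_memory(
    memory_id=memory_id,
    embedding=embed("User prefers concise answers"),
    content="User prefers concise answers",
    memory_type="semantic",
    importance=0.7,
    metadata={"session_id": "sess_123"},
)

results = store.search_memories(
    query_embedding=embed("how does the user like responses?"),
    memory_type="semantic",
    min_importance=0.5,
    limit=5,
)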

Weaviate

Weaviate combines vector search with graph capabilities and automatic vectorization.

import weaviate
from datetime import datetime, timezone
from typing import List, Dict, Optional, Any


class WeaviateMemoryStore:
    """Weaviate vector database with automatic vectorization"""

    def __init__(self, url: str = "http://localhost:8080",
                 api_key: Optional[str] = None,
                 openai_api_key: Optional[str] = None):
        # Configure authentication
        auth_config = None
        if api_key:
            auth_config = weaviate.AuthApiKey(api_key=api_key)

        # Configure headers for OpenAI integration
        headers = {}
        if openai_api_key:
            headers["X-OpenAI-Api-Key"] = openai_api_key

        self.client = weaviate.Client(
            url=url,
            auth_client_secret=auth_config,
            additional_headers=headers
        )
        self._setup_schema()

    def _setup_schema(self):
        """Setup the memory schema in Weaviate"""
        schema = {
            "class": "AgentMemory",
            "description": "Agent memory entries with automatic vectorization",
            "vectorizer": "text2vec-openai",
            "moduleConfig": {
                "text2vec-openai": {
                    "model": "ada",
                    "type": "text"
                }
            },
            "properties": [
                {
                    "name": "content",
                    "dataType": ["text"],
                    "description": "The memory content",
                },
                {
                    "name": "memoryType",
                    "dataType": ["string"],
                    "description": "Type of memory (episodic, semantic, working, procedural)",
                },
                {
                    "name": "importance",
                    "dataType": ["number"],
                    "description": "Importance score (0.0 - 1.0)",
                },
                {
                    "name": "timestamp",
                    "dataType": ["date"],
                    "description": "When the memory was created",
                },
                {
                    "name": "tags",
                    "dataType": ["string[]"],
                    "description": "Tags associated with the memory",
                },
                {
                    "name": "metadata",
                    "dataType": ["object"],  # requires a Weaviate release with object-type support
                    "description": "Additional metadata",
                }
            ]
        }

        # Create schema if it doesn't exist
        if not self.client.schema.exists("AgentMemory"):
            self.client.schema.create_class(schema)

    def store_memory(self, memory_id: str, content: str, memory_type: str,
                     importance: float, tags: Optional[List[str]] = None,
                     metadata: Optional[Dict] = None) -> bool:
        """Store memory (vectorization happens automatically)"""
        try:
            self.client.data_object.create(
                {
                    "content": content,
                    "memoryType": memory_type,
                    "importance": importance,
                    # Weaviate date fields expect RFC 3339 values with a timezone
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "tags": tags or [],
                    "metadata": metadata or {}
                },
                "AgentMemory",
                uuid=memory_id
            )
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    def search_memories(self, query: str,
                        memory_type: Optional[str] = None,
                        min_importance: float = 0.0,
                        limit: int = 10) -> List[Dict]:
        """Search memories using natural language query"""
        # Build the where filter
        where_filter = {
            "path": ["importance"],
            "operator": "GreaterThanEqual",
            "valueNumber": min_importance
        }
        if memory_type:
            memory_type_filter = {
                "path": ["memoryType"],
                "operator": "Equal",
                "valueString": memory_type
            }
            where_filter = {
                "operator": "And",
                "operands": [where_filter, memory_type_filter]
            }

        try:
            result = (
                self.client.query
                .get("AgentMemory", ["content", "memoryType", "importance",
                                     "timestamp", "tags", "metadata"])
                .with_near_text({"concepts": [query]})
                .with_where(where_filter)
                .with_limit(limit)
                .with_additional(["id", "certainty"])
                .do()
            )

            memories = []
            if result.get("data", {}).get("Get", {}).get("AgentMemory"):
                for item in result["data"]["Get"]["AgentMemory"]:
                    memories.append({
                        "id": item["_additional"]["id"],
                        "score": item["_additional"]["certainty"],
                        "content": item["content"],
                        "memory_type": item["memoryType"],
                        "importance": item["importance"],
                        "timestamp": item["timestamp"],
                        "tags": item["tags"],
                        "metadata": item["metadata"]
                    })
            return memories
        except Exception as e:
            print(f"Search failed: {e}")
            return []

    def search_by_concept(self, concepts: List[str],
                          limit: int = 10) -> List[Dict]:
        """Search by multiple concepts"""
        try:
            result = (
                self.client.query
                .get("AgentMemory", ["content", "memoryType", "importance", "timestamp"])
                .with_near_text({"concepts": concepts})
                .with_limit(limit)
                .with_additional(["id", "certainty"])
                .do()
            )
            return self._parse_search_results(result)
        except Exception as e:
            print(f"Concept search failed: {e}")
            return []

    def _parse_search_results(self, result: Dict) -> List[Dict]:
        """Normalize a raw GraphQL response into memory dicts"""
        items = result.get("data", {}).get("Get", {}).get("AgentMemory") or []
        return [{
            "id": item["_additional"]["id"],
            "score": item["_additional"]["certainty"],
            "content": item.get("content"),
            "memory_type": item.get("memoryType"),
            "importance": item.get("importance"),
            "timestamp": item.get("timestamp")
        } for item in items]

    def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory by ID"""
        try:
            self.client.data_object.delete(memory_id, "AgentMemory")
            return True
        except Exception as e:
            print(f"Delete failed: {e}")
            return False


# Docker Compose for Weaviate
"""
version: '3.4'
services:
  weaviate:
    command:
      - --host
      - 0.0.0.0
      - --port
      - '8080'
      - --scheme
      - http
    image: semitechnologies/weaviate:1.21.8
    ports:
      - 8080:8080
    volumes:
      - weaviate_data:/var/lib/weaviate
    restart: on-failure:0
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-openai'
      ENABLE_MODULES: 'text2vec-openai'
      CLUSTER_HOSTNAME: 'node1'
volumes:
  weaviate_data:
"""
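
A usage sketch for the Weaviate store. Because the `text2vec-openai` module vectorizes content server-side, no embedding is passed in, and object IDs must be valid UUIDs (all values below are placeholders):

import uuid

store = WeaviateMemoryStore(
    url="http://localhost:8080",
    openai_api_key="sk-...",  # consumed by the text2vec-openai module
)

store.store_memory(
    memory_id=str(uuid.uuid4()),  # Weaviate object IDs must be valid UUIDs
    content="User asked about Python decorators",
    memory_type="episodic",
    importance=0.8,
    tags=["programming", "python"],
)

results = store.search_memories(
    query="questions about Python language features",
    memory_type="episodic",
    min_importance=0.5,
    limit=5,
)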

Traditional Databases

PostgreSQL with pgvector

PostgreSQL with the pgvector extension provides vector capabilities in a familiar relational database.

-- Setup pgvector extension and tables
CREATE EXTENSION vector;

CREATE TABLE agent_memories (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    memory_type VARCHAR(20) NOT NULL
        CHECK (memory_type IN ('episodic', 'semantic', 'working', 'procedural')),
    importance REAL NOT NULL CHECK (importance >= 0 AND importance <= 1),
    embedding vector(384),  -- 384-dimensional vectors
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    tags TEXT[],
    metadata JSONB DEFAULT '{}'
);

-- Create indexes for efficient search
CREATE INDEX ON agent_memories USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
CREATE INDEX ON agent_memories (memory_type);
CREATE INDEX ON agent_memories (importance);
CREATE INDEX ON agent_memories (timestamp);
CREATE INDEX ON agent_memories USING GIN (tags);
CREATE INDEX ON agent_memories USING GIN (metadata);

-- Full-text search setup
ALTER TABLE agent_memories ADD COLUMN content_tsvector tsvector;
CREATE INDEX ON agent_memories USING GIN (content_tsvector);

CREATE OR REPLACE FUNCTION update_content_tsvector() RETURNS trigger AS $$
BEGIN
    NEW.content_tsvector := to_tsvector('english', NEW.content);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tsvector_update_trigger
    BEFORE INSERT OR UPDATE ON agent_memories
    FOR EACH ROW EXECUTE FUNCTION update_content_tsvector();
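
Because this schema indexes both the embedding and a tsvector, the two retrieval signals can also be blended in a single query. A hedged sketch follows ($1 is the query embedding, $2 the query text; the 0.7/0.3 weights are illustrative, not tuned values):

-- Hybrid retrieval: blend vector similarity with full-text rank
SELECT id, content,
       0.7 * (1 - (embedding <=> $1))
     + 0.3 * ts_rank(content_tsvector, plainto_tsquery('english', $2))
         AS hybrid_score
FROM agent_memories
WHERE importance >= 0.5
ORDER BY hybrid_score DESC
LIMIT 10;
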
import asyncpg
import json
from typing import List, Dict, Optional, Any

# pgvector's asyncpg integration registers codecs so Python lists/arrays
# bind to vector columns (pip install pgvector)
from pgvector.asyncpg import register_vector


class PostgreSQLMemoryStore:
    """PostgreSQL with pgvector for agent memory storage"""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool: Optional[asyncpg.Pool] = None

    async def initialize(self):
        """Initialize connection pool"""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            command_timeout=60,
            init=register_vector  # enable vector parameter encoding per connection
        )

    async def store_memory(self, memory_id: str, content: str,
                           memory_type: str, importance: float,
                           embedding: List[float],
                           tags: Optional[List[str]] = None,
                           metadata: Optional[Dict] = None) -> bool:
        """Store memory with vector embedding"""
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO agent_memories
                        (id, content, memory_type, importance, embedding, tags, metadata)
                    VALUES ($1, $2, $3, $4, $5, $6, $7)
                    ON CONFLICT (id) DO UPDATE SET
                        content = EXCLUDED.content,
                        memory_type = EXCLUDED.memory_type,
                        importance = EXCLUDED.importance,
                        embedding = EXCLUDED.embedding,
                        tags = EXCLUDED.tags,
                        metadata = EXCLUDED.metadata,
                        timestamp = NOW()
                """, memory_id, content, memory_type, importance,
                    embedding, tags or [], json.dumps(metadata or {}))
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    async def search_by_vector(self, query_embedding: List[float],
                               memory_type: Optional[str] = None,
                               min_importance: float = 0.0,
                               limit: int = 10) -> List[Dict]:
        """Vector similarity search"""
        conditions = ["importance >= $2"]
        params = [query_embedding, min_importance]
        param_count = 2

        if memory_type:
            param_count += 1
            conditions.append(f"memory_type = ${param_count}")
            params.append(memory_type)

        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT id, content, memory_type, importance, timestamp, tags, metadata,
                   1 - (embedding <=> $1) as similarity
            FROM agent_memories
            WHERE {where_clause}
            ORDER BY embedding <=> $1
            LIMIT ${param_count + 1}
        """
        params.append(limit)

        try:
            async with self.pool.acquire() as conn:
                rows = await conn.fetch(query, *params)
                return [{
                    "id": str(row["id"]),
                    "content": row["content"],
                    "memory_type": row["memory_type"],
                    "importance": row["importance"],
                    "timestamp": row["timestamp"].isoformat(),
                    "tags": row["tags"],
                    "metadata": json.loads(row["metadata"]) if row["metadata"] else {},
                    "similarity": float(row["similarity"])
                } for row in rows]
        except Exception as e:
            print(f"Vector search failed: {e}")
            return []

    async def search_by_text(self, query: str,
                             memory_type: Optional[str] = None,
                             limit: int = 10) -> List[Dict]:
        """Full-text search with ranking"""
        conditions = ["content_tsvector @@ plainto_tsquery('english', $1)"]
        params = [query]
        param_count = 1

        if memory_type:
            param_count += 1
            conditions.append(f"memory_type = ${param_count}")
            params.append(memory_type)

        where_clause = " AND ".join(conditions)
        query_sql = f"""
            SELECT id, content, memory_type, importance, timestamp, tags, metadata,
                   ts_rank(content_tsvector, plainto_tsquery('english', $1)) as rank
            FROM agent_memories
            WHERE {where_clause}
            ORDER BY rank DESC, importance DESC
            LIMIT ${param_count + 1}
        """
        params.append(limit)

        try:
            async with self.pool.acquire() as conn:
                rows = await conn.fetch(query_sql, *params)
                return [{
                    "id": str(row["id"]),
                    "content": row["content"],
                    "memory_type": row["memory_type"],
                    "importance": row["importance"],
                    "timestamp": row["timestamp"].isoformat(),
                    "tags": row["tags"],
                    "metadata": json.loads(row["metadata"]) if row["metadata"] else {},
                    "rank": float(row["rank"])
                } for row in rows]
        except Exception as e:
            print(f"Text search failed: {e}")
            return []

    async def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory by ID"""
        try:
            async with self.pool.acquire() as conn:
                result = await conn.execute(
                    "DELETE FROM agent_memories WHERE id = $1", memory_id
                )
                return result == "DELETE 1"
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    async def get_memory_stats(self) -> Dict[str, Any]:
        """Get database statistics"""
        try:
            async with self.pool.acquire() as conn:
                result = await conn.fetchrow("""
                    SELECT
                        COUNT(*) as total_memories,
                        COUNT(*) FILTER (WHERE memory_type = 'episodic') as episodic_count,
                        COUNT(*) FILTER (WHERE memory_type = 'semantic') as semantic_count,
                        COUNT(*) FILTER (WHERE memory_type = 'working') as working_count,
                        COUNT(*) FILTER (WHERE memory_type = 'procedural') as procedural_count,
                        AVG(importance) as avg_importance,
                        MAX(timestamp) as latest_memory,
                        MIN(timestamp) as earliest_memory
                    FROM agent_memories
                """)
                return dict(result) if result else {}
        except Exception as e:
            print(f"Stats query failed: {e}")
            return {}


# Example usage with connection pooling
"""
import asyncio

async def main():
    store = PostgreSQLMemoryStore("postgresql://user:pass@localhost/memorydb")
    await store.initialize()

    # Store a memory
    success = await store.store_memory(
        memory_id="mem_001",
        content="User asked about database choices for agents",
        memory_type="episodic",
        importance=0.8,
        embedding=[0.1, 0.2, ...],  # 384-dimensional vector
        tags=["database", "agent-memory"],
        metadata={"session_id": "sess_123"}
    )

    # Search by vector similarity
    similar = await store.search_by_vector(
        query_embedding=[0.1, 0.2, ...],
        memory_type="episodic",
        limit=5
    )

    # Search by text
    text_results = await store.search_by_text(
        query="database choices",
        limit=5
    )

if __name__ == "__main__":
    asyncio.run(main())
"""

MongoDB Atlas

MongoDB Atlas provides vector search capabilities suitable for agent memory systems.

from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any

from pymongo import MongoClient
from pymongo.collection import Collection


class MongoMemoryStore:
    """MongoDB Atlas with vector search for agent memory"""

    def __init__(self, connection_string: str, database_name: str = "agent_memory"):
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.collection: Collection = self.db.memories
        self._setup_indexes()

    def _setup_indexes(self):
        """Setup database indexes for optimal performance"""
        # Create compound indexes
        self.collection.create_index([
            ("memory_type", 1),
            ("importance", -1),
            ("timestamp", -1)
        ])
        self.collection.create_index([("timestamp", -1)])
        self.collection.create_index([("tags", 1)])
        self.collection.create_index([("metadata.session_id", 1)])

        # Create text index for full-text search
        self.collection.create_index([("content", "text")])

        # Vector search index (requires Atlas Search configuration)
        # This needs to be set up in MongoDB Atlas UI:
        """
        {
          "mappings": {
            "fields": {
              "embedding": {
                "dimensions": 384,
                "similarity": "cosine",
                "type": "knnVector"
              },
              "memory_type": { "type": "string" },
              "importance": { "type": "number" }
            }
          }
        }
        """

    def store_memory(self, memory_id: str, content: str, memory_type: str,
                     importance: float, embedding: List[float],
                     tags: Optional[List[str]] = None,
                     metadata: Optional[Dict] = None) -> bool:
        """Store memory document"""
        try:
            document = {
                "_id": memory_id,
                "content": content,
                "memory_type": memory_type,
                "importance": importance,
                "embedding": embedding,
                "timestamp": datetime.now(),
                "tags": tags or [],
                "metadata": metadata or {}
            }
            self.collection.replace_one(
                {"_id": memory_id},
                document,
                upsert=True
            )
            return True
        except Exception as e:
            print(f"Failed to store memory: {e}")
            return False

    def search_by_vector(self, query_embedding: List[float],
                         memory_type: Optional[str] = None,
                         min_importance: float = 0.0,
                         limit: int = 10) -> List[Dict]:
        """Vector similarity search using Atlas Search"""
        # Build the aggregation pipeline
        pipeline = []

        # Vector search stage
        search_stage = {
            "$vectorSearch": {
                "index": "vector_index",  # Name configured in Atlas
                "path": "embedding",
                "queryVector": query_embedding,
                "numCandidates": 100,
                "limit": limit
            }
        }

        # Add filters if specified
        if memory_type or min_importance > 0:
            filter_conditions = {}
            if memory_type:
                filter_conditions["memory_type"] = {"$eq": memory_type}
            if min_importance > 0:
                filter_conditions["importance"] = {"$gte": min_importance}
            search_stage["$vectorSearch"]["filter"] = filter_conditions

        pipeline.append(search_stage)

        # Add score projection
        pipeline.append({
            "$addFields": {
                "score": {"$meta": "vectorSearchScore"}
            }
        })

        try:
            results = list(self.collection.aggregate(pipeline))
            return [{
                "id": str(doc["_id"]),
                "content": doc["content"],
                "memory_type": doc["memory_type"],
                "importance": doc["importance"],
                "timestamp": doc["timestamp"].isoformat(),
                "tags": doc["tags"],
                "metadata": doc["metadata"],
                "score": doc.get("score", 0.0)
            } for doc in results]
        except Exception as e:
            print(f"Vector search failed: {e}")
            return []

    def search_by_text(self, query: str,
                       memory_type: Optional[str] = None,
                       limit: int = 10) -> List[Dict]:
        """Full-text search"""
        search_filter = {"$text": {"$search": query}}
        if memory_type:
            search_filter["memory_type"] = memory_type

        try:
            results = self.collection.find(
                search_filter,
                {"score": {"$meta": "textScore"}}
            ).sort([("score", {"$meta": "textScore"})]).limit(limit)

            return [{
                "id": str(doc["_id"]),
                "content": doc["content"],
                "memory_type": doc["memory_type"],
                "importance": doc["importance"],
                "timestamp": doc["timestamp"].isoformat(),
                "tags": doc["tags"],
                "metadata": doc["metadata"],
                "score": doc.get("score", 0.0)
            } for doc in results]
        except Exception as e:
            print(f"Text search failed: {e}")
            return []

    def aggregate_memories(self, pipeline: List[Dict]) -> List[Dict]:
        """Run custom aggregation queries"""
        try:
            return list(self.collection.aggregate(pipeline))
        except Exception as e:
            print(f"Aggregation failed: {e}")
            return []

    def get_memory_analytics(self) -> Dict[str, Any]:
        """Get memory analytics using aggregation"""
        pipeline = [
            {
                "$group": {
                    "_id": "$memory_type",
                    "count": {"$sum": 1},
                    "avg_importance": {"$avg": "$importance"},
                    "latest": {"$max": "$timestamp"}
                }
            },
            {
                "$group": {
                    "_id": None,
                    "by_type": {
                        "$push": {
                            "type": "$_id",
                            "count": "$count",
                            "avg_importance": "$avg_importance",
                            "latest": "$latest"
                        }
                    },
                    "total_memories": {"$sum": "$count"}
                }
            }
        ]
        try:
            result = list(self.collection.aggregate(pipeline))
            return result[0] if result else {}
        except Exception as e:
            print(f"Analytics query failed: {e}")
            return {}

    def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory"""
        try:
            result = self.collection.delete_one({"_id": memory_id})
            return result.deleted_count == 1
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    def cleanup_old_memories(self, days: int = 30,
                             importance_threshold: float = 0.3) -> int:
        """Clean up old, low-importance memories"""
        cutoff_date = datetime.now() - timedelta(days=days)
        try:
            result = self.collection.delete_many({
                "timestamp": {"$lt": cutoff_date},
                "importance": {"$lt": importance_threshold}
            })
            return result.deleted_count
        except Exception as e:
            print(f"Cleanup failed: {e}")
            return 0

Decision Framework

Choosing the Right Database

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional, Any


class DatabaseType(Enum):
    VECTOR_MANAGED = "vector_managed"    # Pinecone, etc.
    VECTOR_SELFHOST = "vector_selfhost"  # Qdrant, Weaviate
    RELATIONAL = "relational"            # PostgreSQL + pgvector
    DOCUMENT = "document"                # MongoDB Atlas


@dataclass
class RequirementProfile:
    """Define your system requirements"""
    expected_scale: str                    # "small", "medium", "large", "enterprise"
    vector_search_importance: int          # 1-5 scale
    budget_sensitivity: int                # 1-5 scale (5 = very sensitive)
    operational_complexity_tolerance: int  # 1-5 scale
    data_compliance_requirements: bool
    existing_infrastructure: Optional[str] = None
    team_expertise: Optional[List[str]] = None  # ["postgresql", "mongodb", "vector_db", etc.]


class DatabaseRecommendationEngine:
    """Recommend database choice based on requirements"""

    def __init__(self):
        self.database_profiles = {
            DatabaseType.VECTOR_MANAGED: {
                "pros": [
                    "Managed service - no ops overhead",
                    "Excellent vector search performance",
                    "Built-in scaling",
                    "Quick to get started"
                ],
                "cons": [
                    "Vendor lock-in",
                    "Higher costs at scale",
                    "Limited customization",
                    "Potential compliance issues"
                ],
                "best_for": ["rapid_prototyping", "small_to_medium_scale", "vector_first"],
                "cost_profile": "medium_to_high",
                "complexity": 1
            },
            DatabaseType.VECTOR_SELFHOST: {
                "pros": [
                    "Full control and customization",
                    "Cost effective at scale",
                    "No vendor lock-in",
                    "Advanced vector features"
                ],
                "cons": [
                    "Requires operational expertise",
                    "Setup and maintenance overhead",
                    "Scaling complexity"
                ],
                "best_for": ["medium_to_large_scale", "cost_optimization", "flexibility"],
                "cost_profile": "low_to_medium",
                "complexity": 3
            },
            DatabaseType.RELATIONAL: {
                "pros": [
                    "Familiar technology stack",
                    "ACID compliance",
                    "Rich query capabilities",
                    "Mature ecosystem"
                ],
                "cons": [
                    "Vector search not native",
                    "Complex for pure vector workloads",
                    "May need hybrid approach"
                ],
                "best_for": ["existing_postgresql", "structured_data", "complex_queries"],
                "cost_profile": "low",
                "complexity": 2
            },
            DatabaseType.DOCUMENT: {
                "pros": [
                    "Flexible schema",
                    "Rich aggregation framework",
                    "Vector search available",
                    "Horizontal scaling"
                ],
                "cons": [
                    "Vector search newer feature",
                    "Atlas dependency for vector search",
                    "Learning curve"
                ],
                "best_for": ["flexible_schema", "existing_mongodb", "analytics"],
                "cost_profile": "medium",
                "complexity": 2
            }
        }

    def recommend(self, requirements: RequirementProfile) -> Dict[str, Any]:
        """Generate database recommendation based on requirements"""
        scores = {}

        for db_type, profile in self.database_profiles.items():
            score = 0
            reasoning = []

            # Scale considerations
            if requirements.expected_scale == "small":
                if db_type == DatabaseType.VECTOR_MANAGED:
                    score += 3
                    reasoning.append("Managed service ideal for small scale")
            elif requirements.expected_scale in ["large", "enterprise"]:
                if db_type in [DatabaseType.VECTOR_SELFHOST, DatabaseType.DOCUMENT]:
                    score += 3
                    reasoning.append("Better scaling economics for large deployments")

            # Vector search importance
            if requirements.vector_search_importance >= 4:
                if db_type in [DatabaseType.VECTOR_MANAGED, DatabaseType.VECTOR_SELFHOST]:
                    score += 4
                    reasoning.append("Native vector search capabilities essential")
                elif db_type == DatabaseType.RELATIONAL:
                    score -= 1
                    reasoning.append("Vector search is extension, not native")

            # Budget sensitivity
            budget_impact = {
                DatabaseType.VECTOR_MANAGED: -requirements.budget_sensitivity,
                DatabaseType.VECTOR_SELFHOST: requirements.budget_sensitivity,
                DatabaseType.RELATIONAL: requirements.budget_sensitivity + 1,
                DatabaseType.DOCUMENT: 0
            }
            score += budget_impact.get(db_type, 0)

            # Operational complexity
            complexity_penalty = profile["complexity"] * (5 - requirements.operational_complexity_tolerance)
            score -= complexity_penalty / 2

            # Existing infrastructure bonus
            if requirements.existing_infrastructure:
                if (requirements.existing_infrastructure == "postgresql"
                        and db_type == DatabaseType.RELATIONAL):
                    score += 4
                    reasoning.append("Leverages existing PostgreSQL infrastructure")
                elif (requirements.existing_infrastructure == "mongodb"
                        and db_type == DatabaseType.DOCUMENT):
                    score += 4
                    reasoning.append("Leverages existing MongoDB infrastructure")

            # Team expertise bonus
            if requirements.team_expertise:
                expertise_bonus = {
                    "postgresql": {DatabaseType.RELATIONAL: 3},
                    "mongodb": {DatabaseType.DOCUMENT: 3},
                    "vector_db": {
                        DatabaseType.VECTOR_MANAGED: 2,
                        DatabaseType.VECTOR_SELFHOST: 3
                    },
                    "devops": {DatabaseType.VECTOR_SELFHOST: 2}
                }
                for expertise in requirements.team_expertise:
                    if expertise in expertise_bonus:
                        for bonus_db, bonus_score in expertise_bonus[expertise].items():
                            if bonus_db == db_type:
                                score += bonus_score
                                reasoning.append(f"Team has {expertise} expertise")

            scores[db_type] = {
                "score": score,
                "reasoning": reasoning,
                "profile": profile
            }

        # Sort by score
        sorted_recommendations = sorted(
            scores.items(),
            key=lambda x: x[1]["score"],
            reverse=True
        )

        return {
            "top_recommendation": sorted_recommendations[0],
            "all_options": sorted_recommendations,
            "requirements": requirements
        }


# Usage Example
"""
requirements = RequirementProfile(
    expected_scale="medium",
    vector_search_importance=5,
    budget_sensitivity=3,
    operational_complexity_tolerance=2,
    data_compliance_requirements=True,
    existing_infrastructure="postgresql",
    team_expertise=["postgresql", "python"]
)

engine = DatabaseRecommendationEngine()
recommendation = engine.recommend(requirements)

print(f"Top recommendation: {recommendation['top_recommendation'][0]}")
print(f"Score: {recommendation['top_recommendation'][1]['score']}")
print("Reasoning:")
for reason in recommendation['top_recommendation'][1]['reasoning']:
    print(f"  - {reason}")
"""

Migration and Hybrid Strategies

Database Migration Tools

import asyncio
import logging
from typing import AsyncGenerator, Dict, Any, List, Optional


class MemoryMigrationTool:
    """Tool for migrating between different memory storage systems"""

    def __init__(self, source_store, target_store, batch_size: int = 100):
        self.source_store = source_store
        self.target_store = target_store
        self.batch_size = batch_size
        self.logger = logging.getLogger(__name__)

    async def migrate_all(self, filter_criteria: Optional[Dict] = None) -> Dict[str, Any]:
        """Migrate all memories with optional filtering"""
        migration_stats = {
            "total_processed": 0,
            "successful_migrations": 0,
            "failed_migrations": 0,
            "errors": []
        }

        try:
            async for batch in self._get_memory_batches(filter_criteria):
                batch_results = await self._migrate_batch(batch)
                migration_stats["total_processed"] += len(batch)
                migration_stats["successful_migrations"] += batch_results["successful"]
                migration_stats["failed_migrations"] += batch_results["failed"]
                migration_stats["errors"].extend(batch_results["errors"])
                self.logger.info(f"Processed batch: {len(batch)} memories")
        except Exception as e:
            self.logger.error(f"Migration failed: {e}")
            migration_stats["errors"].append(str(e))

        return migration_stats

    async def _get_memory_batches(self, filter_criteria: Optional[Dict]) -> AsyncGenerator[List[Dict], None]:
        """Get memories in batches for migration"""
        offset = 0
        while True:
            # This would need to be implemented based on source store type
            if hasattr(self.source_store, 'get_batch'):
                batch = await self.source_store.get_batch(
                    offset=offset,
                    limit=self.batch_size,
                    filter_criteria=filter_criteria
                )
            else:
                # Fallback for stores without batch support
                break

            if not batch:
                break

            yield batch
            offset += len(batch)

    async def _migrate_batch(self, batch: List[Dict]) -> Dict[str, Any]:
        """Migrate a batch of memories"""
        results = {"successful": 0, "failed": 0, "errors": []}

        for memory in batch:
            try:
                # Transform memory format if needed
                transformed_memory = self._transform_memory_format(memory)

                # Store in target system
                success = await self.target_store.store_memory(**transformed_memory)
                if success:
                    results["successful"] += 1
                else:
                    results["failed"] += 1
                    results["errors"].append(
                        f"Failed to store memory {memory.get('id', 'unknown')}")
            except Exception as e:
                results["failed"] += 1
                results["errors"].append(
                    f"Error processing memory {memory.get('id', 'unknown')}: {str(e)}")

        return results

    def _transform_memory_format(self, memory: Dict) -> Dict:
        """Transform memory format between different stores"""
        # Basic transformation - extend based on specific store requirements
        transformed = {
            "memory_id": memory.get("id") or memory.get("_id"),
            "content": memory.get("content"),
            "memory_type": memory.get("memory_type"),
            "importance": memory.get("importance"),
            "embedding": memory.get("embedding"),
            "tags": memory.get("tags"),
            "metadata": memory.get("metadata")
        }
        # Remove None values
        return {k: v for k, v in transformed.items() if v is not None}


# Hybrid storage for gradual migration
class HybridMemoryStore:
    """Use multiple stores simultaneously during migration"""

    def __init__(self, primary_store, secondary_store,
                 write_to_both: bool = True,
                 read_preference: str = "primary"):
        self.primary_store = primary_store
        self.secondary_store = secondary_store
        self.write_to_both = write_to_both
        self.read_preference = read_preference  # "primary", "secondary", "both"

    async def store_memory(self, *args, **kwargs) -> bool:
        """Store memory in one or both stores"""
        primary_success = False
        secondary_success = False

        try:
            primary_success = await self.primary_store.store_memory(*args, **kwargs)
        except Exception as e:
            logging.error(f"Primary store failed: {e}")

        if self.write_to_both:
            try:
                secondary_success = await self.secondary_store.store_memory(*args, **kwargs)
            except Exception as e:
                logging.error(f"Secondary store failed: {e}")
            return primary_success or secondary_success

        return primary_success

    async def search_memories(self, *args, **kwargs) -> List[Dict]:
        """Search memories based on read preference"""
        if self.read_preference == "primary":
            try:
                return await self.primary_store.search_memories(*args, **kwargs)
            except Exception:
                return await self.secondary_store.search_memories(*args, **kwargs)
        elif self.read_preference == "secondary":
            try:
                return await self.secondary_store.search_memories(*args, **kwargs)
            except Exception:
                return await self.primary_store.search_memories(*args, **kwargs)
        else:  # both
            primary_results = []
            secondary_results = []

            try:
                primary_results = await self.primary_store.search_memories(*args, **kwargs)
            except Exception as e:
                logging.error(f"Primary search failed: {e}")

            try:
                secondary_results = await self.secondary_store.search_memories(*args, **kwargs)
            except Exception as e:
                logging.error(f"Secondary search failed: {e}")

            # Merge and deduplicate results
            return self._merge_search_results(primary_results, secondary_results)

    def _merge_search_results(self, primary: List[Dict],
                              secondary: List[Dict]) -> List[Dict]:
        """Merge and deduplicate search results"""
        seen_ids = set()
        merged = []

        # Add primary results first (higher priority)
        for result in primary:
            memory_id = result.get("id")
            if memory_id and memory_id not in seen_ids:
                seen_ids.add(memory_id)
                merged.append(result)

        # Add secondary results if not already seen
        for result in secondary:
            memory_id = result.get("id")
            if memory_id and memory_id not in seen_ids:
                seen_ids.add(memory_id)
                merged.append(result)

        # Sort by score/relevance if available
        if merged and "score" in merged[0]:
            merged.sort(key=lambda x: x.get("score", 0), reverse=True)

        return merged
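
A wiring sketch showing how the two classes combine for a gradual cutover. It assumes both stores expose the async `store_memory`/`search_memories` interface used above (the synchronous stores earlier in this guide would need an async wrapper), so treat it as a shape, not a drop-in script:

import asyncio

async def run_migration():
    source = PostgreSQLMemoryStore("postgresql://user:pass@localhost/memorydb")
    await source.initialize()
    target = ...  # an async-compatible target store of your choice

    # Phase 1: dual-write so new memories land in both systems
    hybrid = HybridMemoryStore(
        primary_store=source,
        secondary_store=target,
        write_to_both=True,
        read_preference="primary",
    )

    # Phase 2: backfill historical memories in batches
    migrator = MemoryMigrationTool(source, target, batch_size=100)
    stats = await migrator.migrate_all()
    print(f"Migrated {stats['successful_migrations']} memories, "
          f"{stats['failed_migrations']} failures")

    # Phase 3: flip read_preference to "secondary", then retire the source

asyncio.run(run_migration())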

This guide has covered practical implementations for the major database types used in agent memory systems, along with a decision framework and migration strategies, so you can choose and implement the right solution for your specific requirements.