Python Memory Implementation

Building production-ready agent memory systems in Python requires careful consideration of data structures, persistence layers, and integration patterns. This guide provides hands-on implementation examples using popular Python libraries and frameworks.

Core Memory Classes

Basic Memory Structure

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from datetime import datetime
import uuid
from dataclasses import dataclass, asdict
from enum import Enum


class MemoryType(Enum):
    EPISODIC = "episodic"
    SEMANTIC = "semantic"
    WORKING = "working"
    PROCEDURAL = "procedural"


@dataclass
class MemoryEntry:
    """Core memory entry structure"""
    id: str
    content: str
    memory_type: MemoryType
    timestamp: datetime
    importance: float
    metadata: Dict[str, Any]
    embedding: Optional[List[float]] = None
    tags: Optional[List[str]] = None

    def __post_init__(self):
        if not self.id:
            self.id = str(uuid.uuid4())
        if self.metadata is None:
            self.metadata = {}
        if self.tags is None:
            self.tags = []


class MemoryStore(ABC):
    """Abstract base class for memory storage"""

    @abstractmethod
    def store(self, entry: MemoryEntry) -> str:
        """Store a memory entry and return its ID"""
        pass

    @abstractmethod
    def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
        """Retrieve a memory by ID"""
        pass

    @abstractmethod
    def search(self, query: str, memory_type: Optional[MemoryType] = None,
               limit: int = 10) -> List[MemoryEntry]:
        """Search memories by content"""
        pass

    @abstractmethod
    def delete(self, memory_id: str) -> bool:
        """Delete a memory entry"""
        pass
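
To make the data model concrete, here is a minimal usage sketch (the field values are illustrative): __post_init__ generates the UUID and fills in the empty containers.

from datetime import datetime

entry = MemoryEntry(
    id="",  # generated automatically in __post_init__
    content="User prefers concise answers",
    memory_type=MemoryType.SEMANTIC,
    timestamp=datetime.now(),
    importance=0.8,
    metadata={"source": "conversation"},
    tags=["preference"],
)
print(entry.id)           # a generated UUID string
print(entry.memory_type)  # MemoryType.SEMANTIC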

In-Memory Implementation

from collections import defaultdict
from typing import Set
import threading


class InMemoryStore(MemoryStore):
    """Thread-safe in-memory memory store for development and testing"""

    def __init__(self):
        self._memories: Dict[str, MemoryEntry] = {}
        self._type_index: Dict[MemoryType, Set[str]] = defaultdict(set)
        self._tag_index: Dict[str, Set[str]] = defaultdict(set)
        self._lock = threading.RLock()

    def store(self, entry: MemoryEntry) -> str:
        with self._lock:
            self._memories[entry.id] = entry
            self._type_index[entry.memory_type].add(entry.id)
            for tag in entry.tags or []:
                self._tag_index[tag].add(entry.id)
            return entry.id

    def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
        with self._lock:
            return self._memories.get(memory_id)

    def search(self, query: str, memory_type: Optional[MemoryType] = None,
               limit: int = 10) -> List[MemoryEntry]:
        with self._lock:
            candidates = set(self._memories.keys())
            if memory_type:
                candidates &= self._type_index[memory_type]

            # Simple substring matching (replace with vector search in production)
            matches = []
            query_lower = query.lower()
            for memory_id in candidates:
                memory = self._memories[memory_id]
                if query_lower in memory.content.lower():
                    matches.append((memory, memory.importance))

            # Sort by importance and limit results
            matches.sort(key=lambda x: x[1], reverse=True)
            return [match[0] for match in matches[:limit]]

    def delete(self, memory_id: str) -> bool:
        with self._lock:
            if memory_id not in self._memories:
                return False
            memory = self._memories[memory_id]
            del self._memories[memory_id]
            self._type_index[memory.memory_type].discard(memory_id)
            for tag in memory.tags or []:
                self._tag_index[tag].discard(memory_id)
            return True

    def get_by_type(self, memory_type: MemoryType, limit: int = 50) -> List[MemoryEntry]:
        """Get memories by type, sorted by timestamp (newest first)"""
        with self._lock:
            memory_ids = self._type_index[memory_type]
            memories = [self._memories[mid] for mid in memory_ids]
            return sorted(memories, key=lambda m: m.timestamp, reverse=True)[:limit]
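
A brief usage sketch (contents illustrative) showing store, substring search scoped by type, and delete:

store = InMemoryStore()
memory_id = store.store(MemoryEntry(
    id="",
    content="Deployed service v2 to staging",
    memory_type=MemoryType.EPISODIC,
    timestamp=datetime.now(),
    importance=0.6,
    metadata={},
    tags=["deployment"],
))

# Substring search, optionally scoped to a memory type
results = store.search("staging", memory_type=MemoryType.EPISODIC)
assert results and results[0].id == memory_id

store.delete(memory_id)
assert store.retrieve(memory_id) is None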

Vector Search Integration

Using Sentence Transformers

from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import pickle
import os


class VectorMemoryStore(MemoryStore):
    """Memory store with vector similarity search"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2",
                 persistence_path: Optional[str] = None):
        self.model = SentenceTransformer(model_name)
        self.persistence_path = persistence_path
        self._memories: Dict[str, MemoryEntry] = {}
        self._embeddings: Dict[str, np.ndarray] = {}
        self._lock = threading.RLock()

        if persistence_path and os.path.exists(persistence_path):
            self._load_from_disk()

    def _generate_embedding(self, text: str) -> np.ndarray:
        """Generate an embedding for the given text"""
        return self.model.encode(text, convert_to_numpy=True)

    def store(self, entry: MemoryEntry) -> str:
        with self._lock:
            # Generate embedding if not provided
            if entry.embedding is None:
                embedding = self._generate_embedding(entry.content)
                entry.embedding = embedding.tolist()
            else:
                embedding = np.array(entry.embedding)

            self._memories[entry.id] = entry
            self._embeddings[entry.id] = embedding

            if self.persistence_path:
                self._save_to_disk()
            return entry.id

    def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
        with self._lock:
            return self._memories.get(memory_id)

    def search(self, query: str, memory_type: Optional[MemoryType] = None,
               limit: int = 10, similarity_threshold: float = 0.3) -> List[MemoryEntry]:
        with self._lock:
            if not self._embeddings:
                return []

            query_embedding = self._generate_embedding(query)
            similarities = []
            for memory_id, embedding in self._embeddings.items():
                memory = self._memories[memory_id]

                # Filter by type if specified
                if memory_type and memory.memory_type != memory_type:
                    continue

                # Calculate cosine similarity between query and stored embedding
                similarity = cosine_similarity(
                    query_embedding.reshape(1, -1),
                    embedding.reshape(1, -1)
                )[0][0]
                if similarity >= similarity_threshold:
                    similarities.append((memory, similarity))

            # Sort by similarity and return top results
            similarities.sort(key=lambda x: x[1], reverse=True)
            return [sim[0] for sim in similarities[:limit]]

    def delete(self, memory_id: str) -> bool:
        with self._lock:
            if memory_id not in self._memories:
                return False
            del self._memories[memory_id]
            del self._embeddings[memory_id]
            if self.persistence_path:
                self._save_to_disk()
            return True

    def _save_to_disk(self):
        """Save memories and embeddings to disk"""
        data = {
            'memories': {k: asdict(v) for k, v in self._memories.items()},
            'embeddings': self._embeddings,
        }
        with open(self.persistence_path, 'wb') as f:
            pickle.dump(data, f)

    def _load_from_disk(self):
        """Load memories and embeddings from disk"""
        with open(self.persistence_path, 'rb') as f:
            data = pickle.load(f)
        self._memories = {
            k: MemoryEntry(**v) for k, v in data['memories'].items()
        }
        self._embeddings = data['embeddings']
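
A usage sketch for semantic retrieval; it assumes the default all-MiniLM-L6-v2 model can be downloaded on first use, and the stored content is illustrative:

vstore = VectorMemoryStore()  # no persistence_path, so nothing is written to disk

vstore.store(MemoryEntry(
    id="", content="The user works in biotech research",
    memory_type=MemoryType.SEMANTIC, timestamp=datetime.now(),
    importance=0.9, metadata={},
))

# Semantic search: the query shares almost no keywords with the stored
# content, but the embeddings should still place them close together
hits = vstore.search("what field does the user work in?", limit=3)
for hit in hits:
    print(hit.content)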

Agent Memory Manager

Complete Memory Management System

from typing import Callable, Optional
import logging
from datetime import datetime, timedelta


class MemoryManager:
    """High-level memory management for agents"""

    def __init__(self, store: MemoryStore,
                 importance_threshold: float = 0.5,
                 max_working_memory: int = 20,
                 cleanup_interval: timedelta = timedelta(hours=24)):
        self.store = store
        self.importance_threshold = importance_threshold
        self.max_working_memory = max_working_memory
        self.cleanup_interval = cleanup_interval
        self.last_cleanup = datetime.now()
        self.logger = logging.getLogger(__name__)

    def remember(self, content: str, memory_type: MemoryType,
                 importance: float = 0.5,
                 metadata: Optional[Dict] = None,
                 tags: Optional[List[str]] = None) -> str:
        """Store a new memory"""
        entry = MemoryEntry(
            id="",  # Will be generated
            content=content,
            memory_type=memory_type,
            timestamp=datetime.now(),
            importance=importance,
            metadata=metadata or {},
            tags=tags
        )
        memory_id = self.store.store(entry)
        self.logger.info(f"Stored memory {memory_id} of type {memory_type}")

        # Trigger cleanup if needed
        if datetime.now() - self.last_cleanup > self.cleanup_interval:
            self._cleanup_memories()
        return memory_id

    def recall(self, query: str,
               memory_types: Optional[List[MemoryType]] = None,
               limit: int = 10) -> List[MemoryEntry]:
        """Recall memories matching the query"""
        if memory_types:
            all_results = []
            for memory_type in memory_types:
                results = self.store.search(query, memory_type, limit)
                all_results.extend(results)

            # Remove duplicates and sort by importance
            seen = set()
            unique_results = []
            for memory in all_results:
                if memory.id not in seen:
                    seen.add(memory.id)
                    unique_results.append(memory)
            return sorted(unique_results, key=lambda m: m.importance, reverse=True)[:limit]
        else:
            return self.store.search(query, limit=limit)

    def get_working_memory(self) -> List[MemoryEntry]:
        """Get current working memory (recent high-importance memories)"""
        if hasattr(self.store, 'get_by_type'):
            return self.store.get_by_type(MemoryType.WORKING, self.max_working_memory)
        # Fallback for stores without type indexing
        return self.store.search("", MemoryType.WORKING, self.max_working_memory)

    def consolidate_memories(self, consolidation_fn: Callable[[List[MemoryEntry]], str]):
        """Consolidate related episodic memories into semantic memory"""
        # Get recent episodic memories
        if hasattr(self.store, 'get_by_type'):
            episodic_memories = self.store.get_by_type(MemoryType.EPISODIC, 100)
        else:
            episodic_memories = self.store.search("", MemoryType.EPISODIC, 100)

        if len(episodic_memories) < 5:
            return

        # Group memories by similarity (simplified grouping)
        # In production, use more sophisticated clustering
        memory_groups = self._group_similar_memories(episodic_memories)

        for group in memory_groups:
            if len(group) >= 3:  # Consolidate groups of 3+ related memories
                consolidated_content = consolidation_fn(group)

                # Create semantic memory
                self.remember(
                    content=consolidated_content,
                    memory_type=MemoryType.SEMANTIC,
                    importance=max(m.importance for m in group),
                    metadata={
                        'consolidated_from': [m.id for m in group],
                        'consolidation_date': datetime.now().isoformat()
                    }
                )

                # Remove original episodic memories
                for memory in group:
                    self.store.delete(memory.id)

    def _group_similar_memories(self, memories: List[MemoryEntry]) -> List[List[MemoryEntry]]:
        """Group similar memories for consolidation"""
        # Simplified grouping by content overlap
        # In production, use clustering algorithms
        groups = []
        used_memories = set()

        for memory in memories:
            if memory.id in used_memories:
                continue
            group = [memory]
            used_memories.add(memory.id)

            # Find similar memories
            for other_memory in memories:
                if other_memory.id in used_memories:
                    continue
                # Simple similarity check (replace with proper similarity)
                if self._are_similar(memory.content, other_memory.content):
                    group.append(other_memory)
                    used_memories.add(other_memory.id)

            if len(group) >= 2:
                groups.append(group)
        return groups

    def _are_similar(self, content1: str, content2: str) -> bool:
        """Check if two memory contents are similar (Jaccard word overlap)"""
        words1 = set(content1.lower().split())
        words2 = set(content2.lower().split())
        intersection = len(words1 & words2)
        union = len(words1 | words2)
        return union > 0 and intersection / union > 0.3

    def _cleanup_memories(self):
        """Clean up old, low-importance memories"""
        self.logger.info("Starting memory cleanup")
        cutoff_date = datetime.now() - timedelta(days=30)
        # This is a simplified cleanup - implement retention logic based on
        # your needs (e.g. delete memories older than cutoff_date whose
        # importance falls below the threshold)
        self.last_cleanup = datetime.now()
        self.logger.info("Memory cleanup completed")
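
A usage sketch of the manager end to end; the summarize function below is a stand-in for what would normally be an LLM call, and the stored contents are illustrative:

manager = MemoryManager(InMemoryStore())

# Seed enough episodic memories to pass the consolidation threshold (5)
for i in range(6):
    manager.remember(f"User asked about Python decorators (session {i})",
                     MemoryType.EPISODIC, importance=0.6)

relevant = manager.recall("decorators", limit=5)
print(len(relevant))  # up to 5 matches, sorted by importance

# In practice consolidation_fn would call an LLM to summarize each group;
# a naive join stands in for that here
def summarize(group):
    return "Summary: " + "; ".join(m.content for m in group)

manager.consolidate_memories(summarize)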

Integration Examples

LangChain Integration

from langchain.schema import BaseMemory
from typing import Dict, List, Any


class AgentMemoryLangChain(BaseMemory):
    """LangChain memory integration"""
    # BaseMemory is a Pydantic model, so dependencies are declared as
    # fields rather than assigned in a custom __init__
    memory_manager: MemoryManager
    memory_key: str = "agent_memory"

    class Config:
        arbitrary_types_allowed = True

    @property
    def memory_variables(self) -> List[str]:
        return [self.memory_key]

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Load relevant memories based on the current input"""
        query = inputs.get("input", "")

        # Get relevant memories
        memories = self.memory_manager.recall(query, limit=5)
        working_memory = self.memory_manager.get_working_memory()

        # Format memories for the prompt
        memory_text = self._format_memories(memories + working_memory)
        return {self.memory_key: memory_text}

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]):
        """Save the conversation context to memory"""
        input_text = inputs.get("input", "")
        output_text = outputs.get("output", "")

        # Store the interaction
        interaction = f"Human: {input_text}\nAssistant: {output_text}"
        self.memory_manager.remember(
            content=interaction,
            memory_type=MemoryType.EPISODIC,
            importance=0.7,
            metadata={
                "interaction_type": "conversation",
                "input": input_text,
                "output": output_text
            }
        )

    def clear(self):
        """Clear working memory"""
        # Implementation depends on your cleanup strategy
        pass

    def _format_memories(self, memories: List[MemoryEntry]) -> str:
        """Format memories for inclusion in prompts"""
        if not memories:
            return "No relevant memories found."
        formatted = ["Relevant memories:"]
        for memory in memories:
            timestamp = memory.timestamp.strftime("%Y-%m-%d %H:%M")
            formatted.append(f"[{timestamp}] {memory.content}")
        return "\n".join(formatted)
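
A chain normally drives these hooks itself; since the chain-wiring API varies across LangChain versions, this sketch exercises the BaseMemory interface directly (values illustrative):

lc_memory = AgentMemoryLangChain(memory_manager=MemoryManager(InMemoryStore()))

# A chain would call these hooks automatically; invoking them by hand
# shows the round trip
lc_memory.save_context(
    {"input": "My name is Ada"},
    {"output": "Nice to meet you, Ada!"}
)
variables = lc_memory.load_memory_variables({"input": "What is my name?"})
print(variables["agent_memory"])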

FastAPI Integration

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
from datetime import datetime

app = FastAPI(title="Agent Memory API")

# Global memory manager instance
memory_manager: Optional[MemoryManager] = None


class MemoryRequest(BaseModel):
    content: str
    memory_type: str
    importance: float = 0.5
    metadata: Optional[Dict[str, Any]] = None
    tags: Optional[List[str]] = None


class MemoryResponse(BaseModel):
    id: str
    content: str
    memory_type: str
    timestamp: datetime
    importance: float
    metadata: Dict[str, Any]
    tags: List[str]


class SearchRequest(BaseModel):
    query: str
    memory_types: Optional[List[str]] = None
    limit: int = 10


@app.on_event("startup")
async def startup_event():
    global memory_manager
    # Initialize with vector store for production
    store = VectorMemoryStore(persistence_path="memories.pkl")
    memory_manager = MemoryManager(store)


@app.post("/memories/", response_model=str)
async def create_memory(request: MemoryRequest):
    """Store a new memory"""
    try:
        memory_type = MemoryType(request.memory_type)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid memory type")

    memory_id = memory_manager.remember(
        content=request.content,
        memory_type=memory_type,
        importance=request.importance,
        metadata=request.metadata,
        tags=request.tags
    )
    return memory_id


@app.get("/memories/{memory_id}", response_model=MemoryResponse)
async def get_memory(memory_id: str):
    """Retrieve a specific memory"""
    memory = memory_manager.store.retrieve(memory_id)
    if not memory:
        raise HTTPException(status_code=404, detail="Memory not found")
    return MemoryResponse(
        id=memory.id,
        content=memory.content,
        memory_type=memory.memory_type.value,
        timestamp=memory.timestamp,
        importance=memory.importance,
        metadata=memory.metadata,
        tags=memory.tags or []
    )


@app.post("/memories/search", response_model=List[MemoryResponse])
async def search_memories(request: SearchRequest):
    """Search for memories"""
    memory_types = None
    if request.memory_types:
        try:
            memory_types = [MemoryType(mt) for mt in request.memory_types]
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid memory type")

    memories = memory_manager.recall(
        query=request.query,
        memory_types=memory_types,
        limit=request.limit
    )
    return [
        MemoryResponse(
            id=memory.id,
            content=memory.content,
            memory_type=memory.memory_type.value,
            timestamp=memory.timestamp,
            importance=memory.importance,
            metadata=memory.metadata,
            tags=memory.tags or []
        )
        for memory in memories
    ]


@app.delete("/memories/{memory_id}")
async def delete_memory(memory_id: str):
    """Delete a memory"""
    success = memory_manager.store.delete(memory_id)
    if not success:
        raise HTTPException(status_code=404, detail="Memory not found")
    return {"message": "Memory deleted successfully"}
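
Assuming the app above is saved as a module and served locally (e.g. with uvicorn), a client round trip might look like this sketch using requests; the payload values are illustrative:

import requests

BASE = "http://localhost:8000"  # assumes the API is running locally

# Store a memory and capture its ID
memory_id = requests.post(f"{BASE}/memories/", json={
    "content": "User prefers metric units",
    "memory_type": "semantic",
    "importance": 0.8,
    "tags": ["preference"],
}).json()

# Search, then clean up
hits = requests.post(f"{BASE}/memories/search",
                     json={"query": "units", "limit": 5}).json()
print([h["content"] for h in hits])
requests.delete(f"{BASE}/memories/{memory_id}")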

Testing Strategies

Unit Tests

import unittest
import tempfile
import os


class TestMemoryStore(unittest.TestCase):
    def setUp(self):
        self.store = InMemoryStore()
        self.sample_entry = MemoryEntry(
            id="test-id",
            content="Test memory content",
            memory_type=MemoryType.EPISODIC,
            timestamp=datetime.now(),
            importance=0.8,
            metadata={"test": True}
        )

    def test_store_and_retrieve(self):
        """Test basic store and retrieve functionality"""
        memory_id = self.store.store(self.sample_entry)
        self.assertEqual(memory_id, self.sample_entry.id)

        retrieved = self.store.retrieve(memory_id)
        self.assertIsNotNone(retrieved)
        self.assertEqual(retrieved.content, self.sample_entry.content)

    def test_search_functionality(self):
        """Test search capabilities"""
        self.store.store(self.sample_entry)

        results = self.store.search("Test memory")
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0].id, self.sample_entry.id)

        # Test empty results
        empty_results = self.store.search("nonexistent")
        self.assertEqual(len(empty_results), 0)

    def test_delete_functionality(self):
        """Test memory deletion"""
        memory_id = self.store.store(self.sample_entry)
        success = self.store.delete(memory_id)
        self.assertTrue(success)

        retrieved = self.store.retrieve(memory_id)
        self.assertIsNone(retrieved)


class TestVectorMemoryStore(unittest.TestCase):
    def setUp(self):
        # Reserve a temporary path for persistence, but remove the empty
        # file so the store doesn't try to unpickle it on startup
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        temp_file.close()
        os.unlink(temp_file.name)
        self.persistence_path = temp_file.name
        self.store = VectorMemoryStore(persistence_path=self.persistence_path)

    def tearDown(self):
        if os.path.exists(self.persistence_path):
            os.unlink(self.persistence_path)

    def test_embedding_generation(self):
        """Test that embeddings are generated for stored memories"""
        entry = MemoryEntry(
            id="test-embed",
            content="This is a test for embedding generation",
            memory_type=MemoryType.SEMANTIC,
            timestamp=datetime.now(),
            importance=0.5,
            metadata={}
        )
        memory_id = self.store.store(entry)
        retrieved = self.store.retrieve(memory_id)

        self.assertIsNotNone(retrieved.embedding)
        self.assertIsInstance(retrieved.embedding, list)
        self.assertGreater(len(retrieved.embedding), 0)

    def test_similarity_search(self):
        """Test vector similarity search"""
        # Store two deliberately unrelated memories
        entry1 = MemoryEntry(
            id="sim1",
            content="Python programming language",
            memory_type=MemoryType.SEMANTIC,
            timestamp=datetime.now(),
            importance=0.7,
            metadata={}
        )
        entry2 = MemoryEntry(
            id="sim2",
            content="Cooking pasta recipes",
            memory_type=MemoryType.SEMANTIC,
            timestamp=datetime.now(),
            importance=0.6,
            metadata={}
        )
        self.store.store(entry1)
        self.store.store(entry2)

        # Search for programming-related content
        results = self.store.search("coding in Python", limit=5)

        # Should rank the programming-related memory first
        self.assertGreater(len(results), 0)
        self.assertIn("Python", results[0].content)


if __name__ == "__main__":
    unittest.main()

Integration Tests

import pytest
from fastapi.testclient import TestClient


@pytest.fixture
def test_client():
    """Create a test client backed by an in-memory store"""
    # Assumes the tests live in the same module as the FastAPI app, so the
    # route handlers see the replaced global. TestClient only runs startup
    # events when used as a context manager, so the persistent vector store
    # configured there is never created here.
    global memory_manager
    store = InMemoryStore()
    memory_manager = MemoryManager(store)
    return TestClient(app)


def test_memory_lifecycle(test_client):
    """Test complete memory lifecycle via API"""
    # Create memory
    create_response = test_client.post("/memories/", json={
        "content": "Test memory for API",
        "memory_type": "episodic",
        "importance": 0.8,
        "tags": ["test", "api"]
    })
    assert create_response.status_code == 200
    memory_id = create_response.json()

    # Retrieve memory
    get_response = test_client.get(f"/memories/{memory_id}")
    assert get_response.status_code == 200
    memory_data = get_response.json()
    assert memory_data["content"] == "Test memory for API"
    assert memory_data["memory_type"] == "episodic"

    # Search for memory
    search_response = test_client.post("/memories/search", json={
        "query": "Test memory",
        "limit": 10
    })
    assert search_response.status_code == 200
    search_results = search_response.json()
    assert len(search_results) >= 1

    # Delete memory
    delete_response = test_client.delete(f"/memories/{memory_id}")
    assert delete_response.status_code == 200

    # Verify deletion
    get_after_delete = test_client.get(f"/memories/{memory_id}")
    assert get_after_delete.status_code == 404

Production Considerations

Performance Optimization

import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncMemoryStore:
    """Async wrapper around a synchronous memory store.

    Note: this deliberately does not subclass MemoryStore - its methods are
    coroutines with different signatures, and subclassing the ABC without
    implementing every abstract method would make it uninstantiable. It
    simply delegates each call to the wrapped store on a thread pool.
    """

    def __init__(self, sync_store: MemoryStore, max_workers: int = 4):
        self.sync_store = sync_store
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def store(self, entry: MemoryEntry) -> str:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(self.executor, self.sync_store.store, entry)

    async def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(self.executor, self.sync_store.retrieve, memory_id)

    async def search(self, query: str, memory_type: Optional[MemoryType] = None,
                     limit: int = 10) -> List[MemoryEntry]:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor, self.sync_store.search, query, memory_type, limit
        )

    async def delete(self, memory_id: str) -> bool:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(self.executor, self.sync_store.delete, memory_id)


# Batch processing for efficiency
class BatchMemoryProcessor:
    """Process memories in batches for better performance"""

    def __init__(self, store: "MemoryStore | AsyncMemoryStore", batch_size: int = 100):
        self.store = store
        self.batch_size = batch_size
        self.pending_entries: List[MemoryEntry] = []
        self.processing_lock = asyncio.Lock()

    async def add_entry(self, entry: MemoryEntry):
        async with self.processing_lock:
            self.pending_entries.append(entry)
            if len(self.pending_entries) >= self.batch_size:
                await self._process_batch()

    async def _process_batch(self):
        """Process accumulated entries"""
        if not self.pending_entries:
            return
        batch = self.pending_entries.copy()
        self.pending_entries.clear()

        # Process the batch concurrently
        tasks = [
            asyncio.create_task(self._store_single(entry))
            for entry in batch
        ]
        await asyncio.gather(*tasks)

    async def _store_single(self, entry: MemoryEntry):
        if isinstance(self.store, AsyncMemoryStore):
            await self.store.store(entry)
        else:
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, self.store.store, entry)

    async def flush(self):
        """Process any remaining entries"""
        async with self.processing_lock:
            await self._process_batch()
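
A sketch of how the two pieces might compose in an ingestion path (the event strings are illustrative); note the final flush(), which handles entries left below the batch size:

async def ingest_events(events: List[str]):
    # Wrap the blocking store and batch the writes
    async_store = AsyncMemoryStore(InMemoryStore())
    processor = BatchMemoryProcessor(async_store, batch_size=50)

    for event in events:
        await processor.add_entry(MemoryEntry(
            id="", content=event, memory_type=MemoryType.EPISODIC,
            timestamp=datetime.now(), importance=0.5, metadata={},
        ))
    await processor.flush()  # don't forget entries below batch_size

asyncio.run(ingest_events(["agent started", "tool call succeeded"]))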

Error Handling and Resilience

import logging
import time
import traceback
from functools import wraps


def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
    """Decorator for retrying operations with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        wait_time = delay * (2 ** attempt)
                        time.sleep(wait_time)
                        continue
                    break
            # Log the failure and re-raise
            logging.error(f"Operation failed after {max_retries} attempts: {last_exception}")
            raise last_exception
        return wrapper
    return decorator


class ResilientMemoryManager(MemoryManager):
    """Memory manager with enhanced error handling.

    Retries happen in the private helpers; the public methods catch the
    final failure and degrade gracefully. (Catching exceptions inside the
    retried function itself would hide them from the decorator, so the
    retries would never fire.)
    """

    @retry_on_failure(max_retries=3)
    def _remember_with_retry(self, *args, **kwargs) -> str:
        return super().remember(*args, **kwargs)

    @retry_on_failure(max_retries=2)
    def _recall_with_retry(self, *args, **kwargs) -> List[MemoryEntry]:
        return super().recall(*args, **kwargs)

    def remember(self, content: str, memory_type: MemoryType,
                 importance: float = 0.5,
                 metadata: Optional[Dict] = None,
                 tags: Optional[List[str]] = None) -> Optional[str]:
        try:
            return self._remember_with_retry(content, memory_type, importance, metadata, tags)
        except Exception as e:
            self.logger.error(f"Failed to store memory: {e}")
            self.logger.error(traceback.format_exc())
            # Could implement fallback storage here
            return None

    def recall(self, query: str,
               memory_types: Optional[List[MemoryType]] = None,
               limit: int = 10) -> List[MemoryEntry]:
        try:
            return self._recall_with_retry(query, memory_types, limit)
        except Exception as e:
            self.logger.error(f"Failed to recall memories for query '{query}': {e}")
            # Return empty list as fallback
            return []
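
To see the retry and backoff behaviour in isolation, here is a toy sketch with a deliberately flaky function (not part of the memory system itself):

calls = {"n": 0}

@retry_on_failure(max_retries=3, delay=0.1)
def flaky_operation():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(flaky_operation())  # succeeds on the third attempt
print(calls["n"])         # 3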

Monitoring and Metrics

import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class MemoryMetrics:
    """Memory system metrics"""
    total_memories: int = 0
    memories_by_type: Dict[MemoryType, int] = field(default_factory=lambda: defaultdict(int))
    avg_retrieval_time: float = 0.0
    avg_storage_time: float = 0.0
    search_requests: int = 0
    failed_operations: int = 0


class MonitoredMemoryManager(MemoryManager):
    """Memory manager with built-in monitoring"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = MemoryMetrics()
        self._operation_times: Dict[str, List[float]] = defaultdict(list)

    @contextmanager
    def _measure_time(self, operation: str):
        start_time = time.time()
        try:
            yield
        finally:
            elapsed = time.time() - start_time
            self._operation_times[operation].append(elapsed)

    def remember(self, *args, **kwargs) -> str:
        with self._measure_time("storage"):
            try:
                memory_id = super().remember(*args, **kwargs)
                self.metrics.total_memories += 1
                memory_type = kwargs.get('memory_type') or args[1]
                self.metrics.memories_by_type[memory_type] += 1
                return memory_id
            except Exception:
                self.metrics.failed_operations += 1
                raise

    def recall(self, *args, **kwargs) -> List[MemoryEntry]:
        with self._measure_time("retrieval"):
            try:
                self.metrics.search_requests += 1
                return super().recall(*args, **kwargs)
            except Exception:
                self.metrics.failed_operations += 1
                raise

    def get_metrics(self) -> Dict[str, Any]:
        """Get current system metrics"""
        # Calculate averages
        if self._operation_times["storage"]:
            self.metrics.avg_storage_time = (
                sum(self._operation_times["storage"]) / len(self._operation_times["storage"])
            )
        if self._operation_times["retrieval"]:
            self.metrics.avg_retrieval_time = (
                sum(self._operation_times["retrieval"]) / len(self._operation_times["retrieval"])
            )
        return {
            "total_memories": self.metrics.total_memories,
            # Use the enum's string value so the result is JSON-friendly
            "memories_by_type": {t.value: n for t, n in self.metrics.memories_by_type.items()},
            "average_storage_time_ms": self.metrics.avg_storage_time * 1000,
            "average_retrieval_time_ms": self.metrics.avg_retrieval_time * 1000,
            "search_requests": self.metrics.search_requests,
            "failed_operations": self.metrics.failed_operations,
            "success_rate": 1.0 - (
                self.metrics.failed_operations
                / max(1, self.metrics.search_requests + self.metrics.total_memories)
            )
        }
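
A quick sketch of reading the counters back (timings will vary by machine):

manager = MonitoredMemoryManager(InMemoryStore())
manager.remember("First memory", MemoryType.EPISODIC, importance=0.5)
manager.recall("memory")

metrics = manager.get_metrics()
print(metrics["total_memories"])           # 1
print(metrics["memories_by_type"])         # {"episodic": 1}
print(metrics["success_rate"])             # 1.0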

This guide has walked through the building blocks of an agent memory system in Python: core data structures, in-memory and vector-backed stores, a high-level manager, framework integrations, tests, and operational concerns such as error handling and monitoring. The simplified pieces (substring search, word-overlap clustering, pickle persistence) are placeholders to replace with a vector database, proper clustering, and durable storage before production use; the modular design keeps those substitutions localized.