Python Memory Implementation
Building production-ready agent memory systems in Python requires careful consideration of data structures, persistence layers, and integration patterns. This guide provides hands-on implementation examples using popular Python libraries and frameworks.
Core Memory Classes
Basic Memory Structure
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import uuid
import json
from dataclasses import dataclass, asdict
from enum import Enum
class MemoryType(Enum):
EPISODIC = "episodic"
SEMANTIC = "semantic"
WORKING = "working"
PROCEDURAL = "procedural"
@dataclass
class MemoryEntry:
"""Core memory entry structure"""
id: str
content: str
memory_type: MemoryType
timestamp: datetime
importance: float
metadata: Dict[str, Any]
embedding: Optional[List[float]] = None
tags: Optional[List[str]] = None
def __post_init__(self):
if not self.id:
self.id = str(uuid.uuid4())
if self.metadata is None:
self.metadata = {}
if self.tags is None:
self.tags = []
class MemoryStore(ABC):
"""Abstract base class for memory storage"""
@abstractmethod
def store(self, entry: MemoryEntry) -> str:
"""Store a memory entry and return its ID"""
pass
@abstractmethod
def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
"""Retrieve a memory by ID"""
pass
@abstractmethod
def search(self, query: str, memory_type: Optional[MemoryType] = None,
limit: int = 10) -> List[MemoryEntry]:
"""Search memories by content"""
pass
@abstractmethod
def delete(self, memory_id: str) -> bool:
"""Delete a memory entry"""
        pass
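To make the structure concrete, here is a short usage sketch with illustrative values: constructing an entry with an empty id lets __post_init__ generate a UUID and fill in the default collections.
# Illustrative usage sketch: __post_init__ supplies the ID and defaults.
entry = MemoryEntry(
    id="",  # replaced with a generated UUID in __post_init__
    content="User prefers concise answers",
    memory_type=MemoryType.SEMANTIC,
    timestamp=datetime.now(),
    importance=0.8,
    metadata={},
)
print(entry.id)    # a freshly generated UUID string
print(entry.tags)  # [] rather than None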
In-Memory Implementation
from collections import defaultdict
import threading
from typing import Set
class InMemoryStore(MemoryStore):
"""Thread-safe in-memory memory store for development and testing"""
def __init__(self):
self._memories: Dict[str, MemoryEntry] = {}
self._type_index: Dict[MemoryType, Set[str]] = defaultdict(set)
self._tag_index: Dict[str, Set[str]] = defaultdict(set)
self._lock = threading.RLock()
def store(self, entry: MemoryEntry) -> str:
with self._lock:
self._memories[entry.id] = entry
self._type_index[entry.memory_type].add(entry.id)
for tag in entry.tags or []:
self._tag_index[tag].add(entry.id)
return entry.id
def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
with self._lock:
return self._memories.get(memory_id)
def search(self, query: str, memory_type: Optional[MemoryType] = None,
limit: int = 10) -> List[MemoryEntry]:
with self._lock:
candidates = set(self._memories.keys())
if memory_type:
candidates &= self._type_index[memory_type]
# Simple text matching (replace with vector search in production)
matches = []
query_lower = query.lower()
for memory_id in candidates:
memory = self._memories[memory_id]
if query_lower in memory.content.lower():
matches.append((memory, memory.importance))
# Sort by importance and limit results
matches.sort(key=lambda x: x[1], reverse=True)
return [match[0] for match in matches[:limit]]
def delete(self, memory_id: str) -> bool:
with self._lock:
if memory_id not in self._memories:
return False
memory = self._memories[memory_id]
del self._memories[memory_id]
self._type_index[memory.memory_type].discard(memory_id)
for tag in memory.tags or []:
self._tag_index[tag].discard(memory_id)
return True
def get_by_type(self, memory_type: MemoryType, limit: int = 50) -> List[MemoryEntry]:
"""Get memories by type, sorted by timestamp"""
with self._lock:
memory_ids = self._type_index[memory_type]
memories = [self._memories[mid] for mid in memory_ids]
            return sorted(memories, key=lambda m: m.timestamp, reverse=True)[:limit]
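A quick usage sketch of the store (content values are illustrative): store an entry, find it by keyword, then remove it.
# Illustrative round trip through the in-memory store.
store = InMemoryStore()
memory_id = store.store(MemoryEntry(
    id="", content="Deployed service v2 to staging",
    memory_type=MemoryType.EPISODIC, timestamp=datetime.now(),
    importance=0.9, metadata={}, tags=["deploy"],
))
results = store.search("staging", memory_type=MemoryType.EPISODIC)
assert results and results[0].id == memory_id
store.delete(memory_id)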
Vector Search Integration
Using Sentence Transformers
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import pickle
import os
class VectorMemoryStore(MemoryStore):
"""Memory store with vector similarity search"""
def __init__(self, model_name: str = "all-MiniLM-L6-v2",
persistence_path: Optional[str] = None):
self.model = SentenceTransformer(model_name)
self.persistence_path = persistence_path
self._memories: Dict[str, MemoryEntry] = {}
self._embeddings: Dict[str, np.ndarray] = {}
self._lock = threading.RLock()
if persistence_path and os.path.exists(persistence_path):
self._load_from_disk()
def _generate_embedding(self, text: str) -> np.ndarray:
"""Generate embedding for text"""
return self.model.encode(text, convert_to_numpy=True)
def store(self, entry: MemoryEntry) -> str:
with self._lock:
# Generate embedding if not provided
if entry.embedding is None:
embedding = self._generate_embedding(entry.content)
entry.embedding = embedding.tolist()
else:
embedding = np.array(entry.embedding)
self._memories[entry.id] = entry
self._embeddings[entry.id] = embedding
if self.persistence_path:
self._save_to_disk()
return entry.id
def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
with self._lock:
return self._memories.get(memory_id)
def search(self, query: str, memory_type: Optional[MemoryType] = None,
limit: int = 10, similarity_threshold: float = 0.3) -> List[MemoryEntry]:
with self._lock:
if not self._embeddings:
return []
query_embedding = self._generate_embedding(query)
similarities = []
for memory_id, embedding in self._embeddings.items():
memory = self._memories[memory_id]
# Filter by type if specified
if memory_type and memory.memory_type != memory_type:
continue
# Calculate similarity
similarity = cosine_similarity(
query_embedding.reshape(1, -1),
embedding.reshape(1, -1)
)[0][0]
if similarity >= similarity_threshold:
similarities.append((memory, similarity))
# Sort by similarity and return top results
similarities.sort(key=lambda x: x[1], reverse=True)
return [sim[0] for sim in similarities[:limit]]
def delete(self, memory_id: str) -> bool:
with self._lock:
if memory_id not in self._memories:
return False
del self._memories[memory_id]
del self._embeddings[memory_id]
if self.persistence_path:
self._save_to_disk()
return True
def _save_to_disk(self):
"""Save memories and embeddings to disk"""
data = {
'memories': {k: asdict(v) for k, v in self._memories.items()},
'embeddings': self._embeddings
}
with open(self.persistence_path, 'wb') as f:
pickle.dump(data, f)
def _load_from_disk(self):
"""Load memories and embeddings from disk"""
with open(self.persistence_path, 'rb') as f:
data = pickle.load(f)
self._memories = {
k: MemoryEntry(**v) for k, v in data['memories'].items()
}
        self._embeddings = data['embeddings']
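A usage sketch for the vector store; the first call downloads the sentence-transformers model, and the content here is illustrative. Unlike the keyword store, the query does not need to share words with the stored text.
# Semantic search matches on meaning rather than exact keywords.
vstore = VectorMemoryStore()  # no persistence_path: nothing written to disk
vstore.store(MemoryEntry(
    id="", content="The user's favorite language is Python",
    memory_type=MemoryType.SEMANTIC, timestamp=datetime.now(),
    importance=0.7, metadata={},
))
for hit in vstore.search("preferred programming language", limit=3):
    print(hit.content)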
Agent Memory Manager
Complete Memory Management System
from typing import Callable, Optional
import logging
from datetime import datetime, timedelta
class MemoryManager:
"""High-level memory management for agents"""
def __init__(self, store: MemoryStore,
importance_threshold: float = 0.5,
max_working_memory: int = 20,
cleanup_interval: timedelta = timedelta(hours=24)):
self.store = store
self.importance_threshold = importance_threshold
self.max_working_memory = max_working_memory
self.cleanup_interval = cleanup_interval
self.last_cleanup = datetime.now()
self.logger = logging.getLogger(__name__)
def remember(self, content: str, memory_type: MemoryType,
importance: float = 0.5, metadata: Optional[Dict] = None,
tags: Optional[List[str]] = None) -> str:
"""Store a new memory"""
entry = MemoryEntry(
id="", # Will be generated
content=content,
memory_type=memory_type,
timestamp=datetime.now(),
importance=importance,
metadata=metadata or {},
tags=tags
)
memory_id = self.store.store(entry)
self.logger.info(f"Stored memory {memory_id} of type {memory_type}")
# Trigger cleanup if needed
if datetime.now() - self.last_cleanup > self.cleanup_interval:
self._cleanup_memories()
return memory_id
def recall(self, query: str, memory_types: Optional[List[MemoryType]] = None,
limit: int = 10) -> List[MemoryEntry]:
"""Recall memories matching the query"""
if memory_types:
all_results = []
for memory_type in memory_types:
results = self.store.search(query, memory_type, limit)
all_results.extend(results)
# Remove duplicates and sort by importance
seen = set()
unique_results = []
for memory in all_results:
if memory.id not in seen:
seen.add(memory.id)
unique_results.append(memory)
return sorted(unique_results,
key=lambda m: m.importance, reverse=True)[:limit]
else:
return self.store.search(query, limit=limit)
def get_working_memory(self) -> List[MemoryEntry]:
"""Get current working memory (recent high-importance memories)"""
if hasattr(self.store, 'get_by_type'):
working_memories = self.store.get_by_type(MemoryType.WORKING,
self.max_working_memory)
else:
# Fallback for stores without type indexing
working_memories = self.store.search("", MemoryType.WORKING,
self.max_working_memory)
return working_memories
def consolidate_memories(self, consolidation_fn: Callable[[List[MemoryEntry]], str]):
"""Consolidate related memories into semantic memory"""
# Get recent episodic memories
if hasattr(self.store, 'get_by_type'):
episodic_memories = self.store.get_by_type(MemoryType.EPISODIC, 100)
else:
episodic_memories = self.store.search("", MemoryType.EPISODIC, 100)
if len(episodic_memories) < 5:
return
# Group memories by similarity (simplified grouping)
# In production, use more sophisticated clustering
memory_groups = self._group_similar_memories(episodic_memories)
for group in memory_groups:
if len(group) >= 3: # Consolidate groups of 3+ related memories
consolidated_content = consolidation_fn(group)
# Create semantic memory
self.remember(
content=consolidated_content,
memory_type=MemoryType.SEMANTIC,
importance=max(m.importance for m in group),
metadata={
'consolidated_from': [m.id for m in group],
'consolidation_date': datetime.now().isoformat()
}
)
# Remove original episodic memories
for memory in group:
self.store.delete(memory.id)
def _group_similar_memories(self, memories: List[MemoryEntry]) -> List[List[MemoryEntry]]:
"""Group similar memories for consolidation"""
# Simplified grouping by content similarity
# In production, use clustering algorithms
groups = []
used_memories = set()
for memory in memories:
if memory.id in used_memories:
continue
group = [memory]
used_memories.add(memory.id)
# Find similar memories
for other_memory in memories:
if other_memory.id in used_memories:
continue
# Simple similarity check (replace with proper similarity)
if self._are_similar(memory.content, other_memory.content):
group.append(other_memory)
used_memories.add(other_memory.id)
if len(group) >= 2:
groups.append(group)
return groups
def _are_similar(self, content1: str, content2: str) -> bool:
"""Check if two memory contents are similar"""
# Simplified similarity check
words1 = set(content1.lower().split())
words2 = set(content2.lower().split())
intersection = len(words1 & words2)
union = len(words1 | words2)
return union > 0 and intersection / union > 0.3
    def _cleanup_memories(self):
        """Clean up old, low-importance memories"""
        self.logger.info("Starting memory cleanup")
        cutoff_date = datetime.now() - timedelta(days=30)
        # Simplified policy: drop episodic memories that are both old and
        # below the importance threshold; adapt to your specific needs.
        if hasattr(self.store, 'get_by_type'):
            for memory in self.store.get_by_type(MemoryType.EPISODIC, 1000):
                if (memory.timestamp < cutoff_date
                        and memory.importance < self.importance_threshold):
                    self.store.delete(memory.id)
        self.last_cleanup = datetime.now()
        self.logger.info("Memory cleanup completed")
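Before moving on to integrations, a usage sketch of the manager; the lambda is a stand-in consolidation function, where a production system would typically summarize each group with an LLM.
# Illustrative remember/recall cycle plus a trivial consolidation pass.
manager = MemoryManager(InMemoryStore())
manager.remember("User asked about retry logic", MemoryType.EPISODIC,
                 importance=0.6, tags=["conversation"])
for memory in manager.recall("retry", limit=5):
    print(memory.importance, memory.content)
manager.consolidate_memories(lambda group: " | ".join(m.content for m in group))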
Integration Examples
LangChain Integration
from langchain.schema import BaseMemory
from typing import Dict, List, Any
class AgentMemoryLangChain(BaseMemory):
    """LangChain memory integration"""
    # BaseMemory is a Pydantic model, so collaborators are declared as
    # fields rather than assigned in a custom __init__.
    memory_manager: MemoryManager
    memory_key: str = "agent_memory"
    class Config:
        arbitrary_types_allowed = True
    @property
    def memory_variables(self) -> List[str]:
        return [self.memory_key]
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Load relevant memories based on current input"""
query = inputs.get("input", "")
# Get relevant memories
memories = self.memory_manager.recall(query, limit=5)
working_memory = self.memory_manager.get_working_memory()
# Format memories for the prompt
memory_text = self._format_memories(memories + working_memory)
return {self.memory_key: memory_text}
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]):
"""Save the conversation context to memory"""
input_text = inputs.get("input", "")
output_text = outputs.get("output", "")
# Store the interaction
interaction = f"Human: {input_text}\nAssistant: {output_text}"
self.memory_manager.remember(
content=interaction,
memory_type=MemoryType.EPISODIC,
importance=0.7,
metadata={
"interaction_type": "conversation",
"input": input_text,
"output": output_text
}
)
def clear(self):
"""Clear working memory"""
# Implementation depends on your cleanup strategy
pass
def _format_memories(self, memories: List[MemoryEntry]) -> str:
"""Format memories for inclusion in prompts"""
if not memories:
return "No relevant memories found."
formatted = ["Relevant memories:"]
for memory in memories:
timestamp = memory.timestamp.strftime("%Y-%m-%d %H:%M")
formatted.append(f"[{timestamp}] {memory.content}")
return "\n".join(formatted)FastAPI Integration
FastAPI Integration
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
from datetime import datetime
app = FastAPI(title="Agent Memory API")
# Global memory manager instance
memory_manager: Optional[MemoryManager] = None
class MemoryRequest(BaseModel):
content: str
memory_type: str
importance: float = 0.5
metadata: Optional[Dict[str, Any]] = None
tags: Optional[List[str]] = None
class MemoryResponse(BaseModel):
id: str
content: str
memory_type: str
timestamp: datetime
importance: float
metadata: Dict[str, Any]
tags: List[str]
class SearchRequest(BaseModel):
query: str
memory_types: Optional[List[str]] = None
limit: int = 10
@app.on_event("startup")
async def startup_event():
global memory_manager
# Initialize with vector store for production
store = VectorMemoryStore(persistence_path="memories.pkl")
memory_manager = MemoryManager(store)
@app.post("/memories/", response_model=str)
async def create_memory(request: MemoryRequest):
"""Store a new memory"""
try:
memory_type = MemoryType(request.memory_type)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid memory type")
memory_id = memory_manager.remember(
content=request.content,
memory_type=memory_type,
importance=request.importance,
metadata=request.metadata,
tags=request.tags
)
return memory_id
@app.get("/memories/{memory_id}", response_model=MemoryResponse)
async def get_memory(memory_id: str):
"""Retrieve a specific memory"""
memory = memory_manager.store.retrieve(memory_id)
if not memory:
raise HTTPException(status_code=404, detail="Memory not found")
return MemoryResponse(
id=memory.id,
content=memory.content,
memory_type=memory.memory_type.value,
timestamp=memory.timestamp,
importance=memory.importance,
metadata=memory.metadata,
tags=memory.tags or []
)
@app.post("/memories/search", response_model=List[MemoryResponse])
async def search_memories(request: SearchRequest):
"""Search for memories"""
memory_types = None
if request.memory_types:
try:
memory_types = [MemoryType(mt) for mt in request.memory_types]
except ValueError:
raise HTTPException(status_code=400, detail="Invalid memory type")
memories = memory_manager.recall(
query=request.query,
memory_types=memory_types,
limit=request.limit
)
return [
MemoryResponse(
id=memory.id,
content=memory.content,
memory_type=memory.memory_type.value,
timestamp=memory.timestamp,
importance=memory.importance,
metadata=memory.metadata,
tags=memory.tags or []
)
for memory in memories
]
@app.delete("/memories/{memory_id}")
async def delete_memory(memory_id: str):
"""Delete a memory"""
success = memory_manager.store.delete(memory_id)
if not success:
raise HTTPException(status_code=404, detail="Memory not found")
return {"message": "Memory deleted successfully"}Testing Strategies
Testing Strategies
Unit Tests
import unittest
import tempfile
import os
class TestMemoryStore(unittest.TestCase):
def setUp(self):
self.store = InMemoryStore()
self.sample_entry = MemoryEntry(
id="test-id",
content="Test memory content",
memory_type=MemoryType.EPISODIC,
timestamp=datetime.now(),
importance=0.8,
metadata={"test": True}
)
def test_store_and_retrieve(self):
"""Test basic store and retrieve functionality"""
memory_id = self.store.store(self.sample_entry)
self.assertEqual(memory_id, self.sample_entry.id)
retrieved = self.store.retrieve(memory_id)
self.assertIsNotNone(retrieved)
self.assertEqual(retrieved.content, self.sample_entry.content)
def test_search_functionality(self):
"""Test search capabilities"""
self.store.store(self.sample_entry)
results = self.store.search("Test memory")
self.assertEqual(len(results), 1)
self.assertEqual(results[0].id, self.sample_entry.id)
# Test empty results
empty_results = self.store.search("nonexistent")
self.assertEqual(len(empty_results), 0)
def test_delete_functionality(self):
"""Test memory deletion"""
memory_id = self.store.store(self.sample_entry)
success = self.store.delete(memory_id)
self.assertTrue(success)
retrieved = self.store.retrieve(memory_id)
self.assertIsNone(retrieved)
class TestVectorMemoryStore(unittest.TestCase):
def setUp(self):
# Create temporary file for testing persistence
self.temp_file = tempfile.NamedTemporaryFile(delete=False)
self.temp_file.close()
self.store = VectorMemoryStore(persistence_path=self.temp_file.name)
def tearDown(self):
os.unlink(self.temp_file.name)
def test_embedding_generation(self):
"""Test that embeddings are generated for stored memories"""
entry = MemoryEntry(
id="test-embed",
content="This is a test for embedding generation",
memory_type=MemoryType.SEMANTIC,
timestamp=datetime.now(),
importance=0.5,
metadata={}
)
memory_id = self.store.store(entry)
retrieved = self.store.retrieve(memory_id)
self.assertIsNotNone(retrieved.embedding)
self.assertIsInstance(retrieved.embedding, list)
self.assertGreater(len(retrieved.embedding), 0)
def test_similarity_search(self):
"""Test vector similarity search"""
# Store related memories
entry1 = MemoryEntry(
id="sim1", content="Python programming language",
memory_type=MemoryType.SEMANTIC, timestamp=datetime.now(),
importance=0.7, metadata={}
)
entry2 = MemoryEntry(
id="sim2", content="Cooking pasta recipes",
memory_type=MemoryType.SEMANTIC, timestamp=datetime.now(),
importance=0.6, metadata={}
)
self.store.store(entry1)
self.store.store(entry2)
# Search for programming-related content
results = self.store.search("coding in Python", limit=5)
# Should find the programming-related memory
self.assertGreater(len(results), 0)
self.assertIn("Python", results[0].content)
if __name__ == "__main__":
    unittest.main()
Integration Tests
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def test_client():
    """Create test client with in-memory store"""
    # Assumes these tests live in the same module as the FastAPI app;
    # from a separate test module, patch the app module's memory_manager.
    global memory_manager
    store = InMemoryStore()
    memory_manager = MemoryManager(store)
    return TestClient(app)
def test_memory_lifecycle(test_client):
"""Test complete memory lifecycle via API"""
# Create memory
create_response = test_client.post("/memories/", json={
"content": "Test memory for API",
"memory_type": "episodic",
"importance": 0.8,
"tags": ["test", "api"]
})
assert create_response.status_code == 200
memory_id = create_response.json()
# Retrieve memory
get_response = test_client.get(f"/memories/{memory_id}")
assert get_response.status_code == 200
memory_data = get_response.json()
assert memory_data["content"] == "Test memory for API"
assert memory_data["memory_type"] == "episodic"
# Search for memory
search_response = test_client.post("/memories/search", json={
"query": "Test memory",
"limit": 10
})
assert search_response.status_code == 200
search_results = search_response.json()
assert len(search_results) >= 1
# Delete memory
delete_response = test_client.delete(f"/memories/{memory_id}")
assert delete_response.status_code == 200
# Verify deletion
get_after_delete = test_client.get(f"/memories/{memory_id}")
    assert get_after_delete.status_code == 404
Production Considerations
Performance Optimization
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncMemoryStore(MemoryStore):
"""Async wrapper for memory store operations"""
def __init__(self, sync_store: MemoryStore, max_workers: int = 4):
self.sync_store = sync_store
self.executor = ThreadPoolExecutor(max_workers=max_workers)
    async def store(self, entry: MemoryEntry) -> str:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, self.sync_store.store, entry
        )
    async def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, self.sync_store.retrieve, memory_id
        )
    async def search(self, query: str, memory_type: Optional[MemoryType] = None,
                     limit: int = 10) -> List[MemoryEntry]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, self.sync_store.search, query, memory_type, limit
        )
    async def delete(self, memory_id: str) -> bool:
        # MemoryStore declares delete() as abstract, so the async wrapper
        # must implement it too or the class cannot be instantiated.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, self.sync_store.delete, memory_id
        )
# Batch processing for efficiency
class BatchMemoryProcessor:
"""Process memories in batches for better performance"""
def __init__(self, store: MemoryStore, batch_size: int = 100):
self.store = store
self.batch_size = batch_size
self.pending_entries: List[MemoryEntry] = []
self.processing_lock = asyncio.Lock()
async def add_entry(self, entry: MemoryEntry):
async with self.processing_lock:
self.pending_entries.append(entry)
if len(self.pending_entries) >= self.batch_size:
await self._process_batch()
async def _process_batch(self):
"""Process accumulated entries"""
if not self.pending_entries:
return
batch = self.pending_entries.copy()
self.pending_entries.clear()
# Process batch in parallel
tasks = [
asyncio.create_task(self._store_single(entry))
for entry in batch
]
await asyncio.gather(*tasks)
async def _store_single(self, entry: MemoryEntry):
if isinstance(self.store, AsyncMemoryStore):
await self.store.store(entry)
else:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.store.store, entry)
async def flush(self):
"""Process any remaining entries"""
async with self.processing_lock:
            await self._process_batch()
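A usage sketch combining the async wrapper and the batch processor; the batch size and entry count are illustrative. Two batches of 50 are written automatically, and flush() picks up the remaining 20.
# Construct everything inside the coroutine so asyncio primitives
# bind to the running event loop.
async def demo():
    store = AsyncMemoryStore(InMemoryStore())
    processor = BatchMemoryProcessor(store, batch_size=50)
    for i in range(120):
        await processor.add_entry(MemoryEntry(
            id="", content=f"event {i}", memory_type=MemoryType.EPISODIC,
            timestamp=datetime.now(), importance=0.3, metadata={},
        ))
    await processor.flush()  # writes the remaining 20 entries
asyncio.run(demo())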
Error Handling and Resilience
import traceback
from functools import wraps
import time
def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
"""Decorator for retrying operations with exponential backoff"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
wait_time = delay * (2 ** attempt)
time.sleep(wait_time)
continue
break
# Log the failure and re-raise
logging.error(f"Operation failed after {max_retries} attempts: {last_exception}")
raise last_exception
return wrapper
return decorator
class ResilientMemoryManager(MemoryManager):
"""Memory manager with enhanced error handling"""
@retry_on_failure(max_retries=3)
def remember(self, content: str, memory_type: MemoryType,
importance: float = 0.5, metadata: Optional[Dict] = None,
                 tags: Optional[List[str]] = None) -> str:
        try:
            return super().remember(content, memory_type, importance, metadata, tags)
        except Exception as e:
            self.logger.error(f"Failed to store memory: {e}")
            self.logger.error(traceback.format_exc())
            # Re-raise so the retry decorator can attempt the call again;
            # swallowing the exception here would disable the retries.
            raise
@retry_on_failure(max_retries=2)
def recall(self, query: str, memory_types: Optional[List[MemoryType]] = None,
limit: int = 10) -> List[MemoryEntry]:
try:
return super().recall(query, memory_types, limit)
except Exception as e:
self.logger.error(f"Failed to recall memories for query '{query}': {e}")
# Return empty list as fallback
            return []
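A small demonstration of the retry decorator on its own; flaky_store is a hypothetical function that fails twice before succeeding on the third attempt.
# The decorator sleeps delay * 2**attempt between attempts.
attempts = {"n": 0}
@retry_on_failure(max_retries=3, delay=0.1)
def flaky_store():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient backend error")
    return "stored"
print(flaky_store())  # "stored", after two retried failures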
Monitoring and Metrics
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class MemoryMetrics:
    """Memory system metrics"""
    total_memories: int = 0
    memories_by_type: Dict[MemoryType, int] = field(default_factory=lambda: defaultdict(int))
    avg_retrieval_time: float = 0.0
    avg_storage_time: float = 0.0
    search_requests: int = 0
    failed_operations: int = 0
class MonitoredMemoryManager(MemoryManager):
"""Memory manager with built-in monitoring"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metrics = MemoryMetrics()
self._operation_times: Dict[str, List[float]] = defaultdict(list)
@contextmanager
def _measure_time(self, operation: str):
start_time = time.time()
try:
yield
finally:
elapsed = time.time() - start_time
self._operation_times[operation].append(elapsed)
def remember(self, *args, **kwargs) -> str:
with self._measure_time("storage"):
try:
memory_id = super().remember(*args, **kwargs)
self.metrics.total_memories += 1
memory_type = kwargs.get('memory_type') or args[1]
self.metrics.memories_by_type[memory_type] += 1
return memory_id
except Exception as e:
self.metrics.failed_operations += 1
raise
def recall(self, *args, **kwargs) -> List[MemoryEntry]:
with self._measure_time("retrieval"):
try:
self.metrics.search_requests += 1
return super().recall(*args, **kwargs)
except Exception as e:
self.metrics.failed_operations += 1
raise
def get_metrics(self) -> Dict[str, Any]:
"""Get current system metrics"""
# Calculate averages
if self._operation_times["storage"]:
self.metrics.avg_storage_time = sum(self._operation_times["storage"]) / len(self._operation_times["storage"])
if self._operation_times["retrieval"]:
self.metrics.avg_retrieval_time = sum(self._operation_times["retrieval"]) / len(self._operation_times["retrieval"])
return {
"total_memories": self.metrics.total_memories,
"memories_by_type": dict(self.metrics.memories_by_type),
"average_storage_time_ms": self.metrics.avg_storage_time * 1000,
"average_retrieval_time_ms": self.metrics.avg_retrieval_time * 1000,
"search_requests": self.metrics.search_requests,
"failed_operations": self.metrics.failed_operations,
"success_rate": 1.0 - (self.metrics.failed_operations / max(1, self.metrics.search_requests + self.metrics.total_memories))
        }
This implementation guide provides a foundation for building agent memory systems in Python, with error handling, testing, monitoring, and integration patterns. The modular design allows for customization and scaling based on specific requirements.