LLM Integration Patterns
Integrating Large Language Models with memory systems requires careful attention to prompt engineering, context management, and processing pipelines. This guide provides implementation patterns for LLM-memory integration: a provider-agnostic client layer, a memory-enhanced chat wrapper, memory-aware prompt construction, and streaming integration with real-time memory updates.
LLM Memory Interface
Universal LLM Client
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, AsyncGenerator, Union
from dataclasses import dataclass
from enum import Enum
import asyncio
import aiohttp
import json
import time
from datetime import datetime
class LLMProvider(Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
GOOGLE = "google"
AZURE_OPENAI = "azure_openai"
COHERE = "cohere"
TOGETHER = "together"
@dataclass
class LLMMessage:
"""Standardized message format across LLM providers"""
role: str # "system", "user", "assistant"
content: str
metadata: Optional[Dict[str, Any]] = None
@dataclass
class LLMResponse:
"""Standardized response format"""
content: str
model: str
tokens_used: int
cost_estimate: Optional[float] = None
metadata: Optional[Dict[str, Any]] = None
class BaseLLMClient(ABC):
"""Base class for LLM clients"""
@abstractmethod
async def chat_completion(self, messages: List[LLMMessage],
model: str, **kwargs) -> LLMResponse:
pass
@abstractmethod
async def embedding(self, text: str, model: str) -> List[float]:
pass
class OpenAIClient(BaseLLMClient):
"""OpenAI API client with memory integration"""
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.api_key = api_key
self.base_url = base_url or "https://api.openai.com/v1"
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self.api_key}"}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def chat_completion(self, messages: List[LLMMessage],
model: str = "gpt-3.5-turbo", **kwargs) -> LLMResponse:
"""Send chat completion request"""
# Convert messages to OpenAI format
openai_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
payload = {
"model": model,
"messages": openai_messages,
**kwargs
}
async with self.session.post(
f"{self.base_url}/chat/completions",
json=payload
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"OpenAI API error: {error_text}")
data = await response.json()
return LLMResponse(
content=data["choices"][0]["message"]["content"],
model=data["model"],
tokens_used=data["usage"]["total_tokens"],
cost_estimate=self._estimate_cost(data["usage"], model),
metadata={"finish_reason": data["choices"][0]["finish_reason"]}
)
async def embedding(self, text: str, model: str = "text-embedding-ada-002") -> List[float]:
"""Generate text embedding"""
payload = {
"model": model,
"input": text
}
async with self.session.post(
f"{self.base_url}/embeddings",
json=payload
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"OpenAI embedding error: {error_text}")
data = await response.json()
return data["data"][0]["embedding"]
def _estimate_cost(self, usage: Dict, model: str) -> float:
"""Estimate API cost based on token usage"""
# Simplified cost estimation (update with current pricing)
cost_per_1k_tokens = {
"gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-4-turbo": {"input": 0.01, "output": 0.03}
}
if model in cost_per_1k_tokens:
input_cost = (usage.get("prompt_tokens", 0) / 1000) * cost_per_1k_tokens[model]["input"]
output_cost = (usage.get("completion_tokens", 0) / 1000) * cost_per_1k_tokens[model]["output"]
return input_cost + output_cost
return 0.0
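# Usage sketch (not part of the original listing; the API key and model names are
# placeholders): the client is an async context manager, so requests are issued
# inside `async with`, which owns the underlying aiohttp session.
async def _openai_client_example():
    async with OpenAIClient(api_key="sk-...") as client:
        response = await client.chat_completion(
            messages=[
                LLMMessage(role="system", content="You are a concise assistant."),
                LLMMessage(role="user", content="Summarize what episodic memory is."),
            ],
            model="gpt-3.5-turbo",
            temperature=0.3,
        )
        print(response.content, response.tokens_used, response.cost_estimate)
        # Embeddings reuse the same authenticated session
        vector = await client.embedding("episodic memory")
        print(len(vector))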
class AnthropicClient(BaseLLMClient):
"""Anthropic Claude API client"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.anthropic.com/v1"
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers={
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01"
}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def chat_completion(self, messages: List[LLMMessage],
model: str = "claude-3-sonnet-20240229", **kwargs) -> LLMResponse:
"""Send message to Claude"""
# Convert to Anthropic format
system_messages = [msg.content for msg in messages if msg.role == "system"]
conversation_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages if msg.role != "system"
]
payload = {
"model": model,
"messages": conversation_messages,
"max_tokens": kwargs.get("max_tokens", 1024),
**{k: v for k, v in kwargs.items() if k != "max_tokens"}
}
if system_messages:
payload["system"] = " ".join(system_messages)
async with self.session.post(
f"{self.base_url}/messages",
json=payload
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"Anthropic API error: {error_text}")
data = await response.json()
return LLMResponse(
content=data["content"][0]["text"],
model=data["model"],
tokens_used=data["usage"]["input_tokens"] + data["usage"]["output_tokens"],
metadata={
"stop_reason": data.get("stop_reason"),
"input_tokens": data["usage"]["input_tokens"],
"output_tokens": data["usage"]["output_tokens"]
}
)
async def embedding(self, text: str, model: str) -> List[float]:
"""Anthropic doesn't provide embeddings - use alternative"""
raise NotImplementedError("Anthropic doesn't provide embedding endpoints")
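# Note (sketch, not in the original listing): AnthropicClient.embedding raises
# NotImplementedError, so Claude-based setups typically pair it with a second
# client for embeddings (e.g. OpenAIClient above); MemoryEnhancedLLM later in this
# guide accepts such a client via its `embedding_client` parameter. Keys are placeholders.
async def _anthropic_with_separate_embeddings_example():
    async with AnthropicClient(api_key="...") as chat_client, \
               OpenAIClient(api_key="sk-...") as embed_client:
        reply = await chat_client.chat_completion(
            messages=[LLMMessage(role="user", content="Hello Claude")],
            max_tokens=256,
        )
        vector = await embed_client.embedding(reply.content)
        print(reply.content[:80], len(vector))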
class LLMClientFactory:
"""Factory for creating LLM clients"""
@staticmethod
def create_client(provider: LLMProvider, **kwargs) -> BaseLLMClient:
"""Create appropriate LLM client"""
if provider == LLMProvider.OPENAI:
return OpenAIClient(**kwargs)
elif provider == LLMProvider.ANTHROPIC:
return AnthropicClient(**kwargs)
else:
            raise ValueError(f"Unsupported provider: {provider}")
Memory-Enhanced LLM Wrapper
Intelligent Memory Integration
from typing import List, Dict, Any, Optional, Tuple
import json
import logging
from datetime import datetime, timedelta
class MemoryEnhancedLLM:
"""LLM wrapper with integrated memory system"""
def __init__(self, llm_client: BaseLLMClient, memory_manager,
embedding_client: Optional[BaseLLMClient] = None):
self.llm_client = llm_client
self.memory_manager = memory_manager
self.embedding_client = embedding_client or llm_client
self.logger = logging.getLogger(__name__)
async def chat_with_memory(self, user_input: str,
conversation_id: Optional[str] = None,
memory_types: Optional[List[str]] = None,
max_context_memories: int = 10,
model: str = "gpt-3.5-turbo") -> LLMResponse:
"""Process user input with memory context"""
# Store user input as episodic memory
await self._store_interaction_memory(
content=f"User: {user_input}",
conversation_id=conversation_id,
memory_type="episodic"
)
# Retrieve relevant memories
relevant_memories = await self._get_relevant_memories(
query=user_input,
memory_types=memory_types or ["episodic", "semantic"],
limit=max_context_memories
)
# Build context with memories
messages = await self._build_message_context(
user_input=user_input,
relevant_memories=relevant_memories,
conversation_id=conversation_id
)
# Get LLM response
response = await self.llm_client.chat_completion(
messages=messages,
model=model
)
# Store assistant response as memory
await self._store_interaction_memory(
content=f"Assistant: {response.content}",
conversation_id=conversation_id,
memory_type="episodic"
)
# Extract and store important information
await self._extract_and_store_knowledge(
user_input=user_input,
assistant_response=response.content,
conversation_id=conversation_id
)
return response
async def _store_interaction_memory(self, content: str,
conversation_id: Optional[str] = None,
memory_type: str = "episodic"):
"""Store interaction in memory"""
metadata = {
"interaction_timestamp": datetime.now().isoformat(),
"memory_source": "conversation"
}
if conversation_id:
metadata["conversation_id"] = conversation_id
memory_id = await self.memory_manager.remember(
content=content,
memory_type=memory_type,
importance=0.7, # Default importance for conversations
metadata=metadata
)
self.logger.info(f"Stored interaction memory: {memory_id}")
return memory_id
async def _get_relevant_memories(self, query: str,
memory_types: List[str],
limit: int) -> List[Dict[str, Any]]:
"""Retrieve memories relevant to the query"""
memories = await self.memory_manager.recall(
query=query,
memory_types=memory_types,
limit=limit
)
return memories
async def _build_message_context(self, user_input: str,
relevant_memories: List[Dict[str, Any]],
conversation_id: Optional[str] = None) -> List[LLMMessage]:
"""Build message context including relevant memories"""
messages = []
# System message with memory context
system_context = await self._build_system_context(relevant_memories)
messages.append(LLMMessage(role="system", content=system_context))
# Recent conversation history if available
if conversation_id:
recent_history = await self._get_recent_conversation(conversation_id)
messages.extend(recent_history)
# Current user input
messages.append(LLMMessage(role="user", content=user_input))
return messages
async def _build_system_context(self, memories: List[Dict[str, Any]]) -> str:
"""Build system context from relevant memories"""
if not memories:
return "You are a helpful assistant with access to conversation memory."
context_parts = [
"You are a helpful assistant with access to relevant memories from previous conversations.",
"",
"Relevant memories:"
]
for memory in memories:
timestamp = memory.get('timestamp', 'Unknown time')
content = memory.get('content', '')
importance = memory.get('importance', 0)
context_parts.append(
f"[{timestamp}] (importance: {importance:.2f}) {content}"
)
context_parts.extend([
"",
"Use this context to provide more personalized and informed responses. "
"Reference relevant memories when appropriate, but don't overwhelm the user with details."
])
return "\n".join(context_parts)
async def _get_recent_conversation(self, conversation_id: str,
limit: int = 10) -> List[LLMMessage]:
"""Get recent conversation history"""
# Search for recent messages in this conversation
memories = await self.memory_manager.recall(
query=f"conversation_id:{conversation_id}",
memory_types=["episodic"],
limit=limit * 2 # Get more to filter
)
# Filter and sort by timestamp
conversation_memories = [
m for m in memories
if m.get('metadata', {}).get('conversation_id') == conversation_id
]
        # Timestamps may arrive as datetimes or ISO-8601 strings; compare as
        # strings to avoid mixed-type comparison errors during sorting.
        conversation_memories.sort(
            key=lambda m: str(m.get('timestamp', '')),
            reverse=False
        )
# Convert to messages
messages = []
for memory in conversation_memories[-limit:]:
content = memory.get('content', '')
if content.startswith('User: '):
messages.append(LLMMessage(
role="user",
content=content[6:] # Remove 'User: ' prefix
))
elif content.startswith('Assistant: '):
messages.append(LLMMessage(
role="assistant",
content=content[11:] # Remove 'Assistant: ' prefix
))
return messages
async def _extract_and_store_knowledge(self, user_input: str,
assistant_response: str,
conversation_id: Optional[str] = None):
"""Extract important knowledge and store as semantic memory"""
# Use LLM to extract key information
extraction_prompt = f"""
Analyze this conversation exchange and extract any important facts, preferences,
or knowledge that should be remembered for future interactions:
User: {user_input}
Assistant: {assistant_response}
Extract:
1. Any personal preferences or information about the user
2. Important factual information discussed
3. Decisions or conclusions reached
4. Any domain-specific knowledge shared
Format as bullet points, or respond with "None" if nothing significant to remember.
"""
try:
extraction_response = await self.llm_client.chat_completion(
messages=[LLMMessage(role="user", content=extraction_prompt)],
model="gpt-3.5-turbo",
max_tokens=200
)
extracted_knowledge = extraction_response.content.strip()
if extracted_knowledge and extracted_knowledge.lower() != "none":
# Store as semantic memory
await self.memory_manager.remember(
content=extracted_knowledge,
memory_type="semantic",
importance=0.8,
metadata={
"source": "knowledge_extraction",
"conversation_id": conversation_id,
"extraction_timestamp": datetime.now().isoformat()
}
)
self.logger.info(f"Extracted and stored knowledge: {extracted_knowledge[:100]}...")
except Exception as e:
self.logger.error(f"Failed to extract knowledge: {e}")
class MemoryProcessingPipeline:
"""Pipeline for processing and enhancing memories with LLM"""
def __init__(self, llm_client: BaseLLMClient, memory_manager):
self.llm_client = llm_client
self.memory_manager = memory_manager
self.logger = logging.getLogger(__name__)
async def enhance_memory_content(self, memory_entry: Dict[str, Any]) -> Dict[str, Any]:
"""Enhance memory content with LLM analysis"""
content = memory_entry.get('content', '')
# Generate enhanced description
enhancement_prompt = f"""
Analyze this memory entry and provide:
1. A clear, concise summary
2. Key topics or themes
3. Emotional context (if applicable)
4. Importance assessment (1-10)
5. Suggested tags
Memory content: {content}
Respond in JSON format:
{{
"summary": "...",
"topics": ["topic1", "topic2"],
"emotional_context": "neutral/positive/negative/excited/etc",
"importance": 7,
"suggested_tags": ["tag1", "tag2"]
}}
"""
try:
response = await self.llm_client.chat_completion(
messages=[LLMMessage(role="user", content=enhancement_prompt)],
model="gpt-3.5-turbo",
max_tokens=300
)
# Parse JSON response
enhancement_data = json.loads(response.content)
# Update memory entry
enhanced_memory = memory_entry.copy()
enhanced_memory['metadata'] = enhanced_memory.get('metadata', {})
enhanced_memory['metadata'].update({
'ai_summary': enhancement_data.get('summary'),
'ai_topics': enhancement_data.get('topics', []),
'ai_emotional_context': enhancement_data.get('emotional_context'),
'ai_importance': enhancement_data.get('importance'),
'enhancement_timestamp': datetime.now().isoformat()
})
# Update importance if AI suggests higher importance
ai_importance = enhancement_data.get('importance', 0) / 10.0
if ai_importance > enhanced_memory.get('importance', 0):
enhanced_memory['importance'] = ai_importance
# Add suggested tags
suggested_tags = enhancement_data.get('suggested_tags', [])
existing_tags = enhanced_memory.get('tags', [])
enhanced_memory['tags'] = list(set(existing_tags + suggested_tags))
return enhanced_memory
except Exception as e:
self.logger.error(f"Failed to enhance memory: {e}")
return memory_entry
async def consolidate_memories(self, memories: List[Dict[str, Any]]) -> str:
"""Consolidate multiple related memories into semantic knowledge"""
if len(memories) < 2:
return ""
# Prepare memories for consolidation
memory_summaries = []
for i, memory in enumerate(memories):
timestamp = memory.get('timestamp', 'Unknown')
content = memory.get('content', '')
importance = memory.get('importance', 0)
memory_summaries.append(
f"{i+1}. [{timestamp}] (importance: {importance:.2f}) {content}"
)
consolidation_prompt = f"""
Consolidate these related memories into a coherent summary that captures:
1. The main patterns or themes
2. Key facts or insights
3. Any progression or development over time
4. Important relationships between the memories
Memories to consolidate:
{chr(10).join(memory_summaries)}
Provide a clear, informative summary that would be useful for future reference.
"""
try:
response = await self.llm_client.chat_completion(
messages=[LLMMessage(role="user", content=consolidation_prompt)],
model="gpt-3.5-turbo",
max_tokens=500
)
return response.content.strip()
except Exception as e:
self.logger.error(f"Failed to consolidate memories: {e}")
return ""
async def generate_insights(self, user_id: str,
time_period: timedelta = timedelta(days=7)) -> List[str]:
"""Generate insights from user's recent memories"""
# Get recent memories
cutoff_date = datetime.now() - time_period
memories = await self.memory_manager.get_memories_since(user_id, cutoff_date)
if not memories:
return []
# Prepare memory data
memory_data = []
for memory in memories:
memory_data.append(f"- {memory.get('content', '')}")
insight_prompt = f"""
Analyze these memories from the past {time_period.days} days and generate insights about:
1. Patterns in user interests or behavior
2. Recurring themes or topics
3. Changes in sentiment or priorities
4. Areas where the user might benefit from reminders or suggestions
Memories:
{chr(10).join(memory_data)}
Provide 3-5 actionable insights as a JSON array of strings.
"""
try:
response = await self.llm_client.chat_completion(
messages=[LLMMessage(role="user", content=insight_prompt)],
model="gpt-3.5-turbo",
max_tokens=400
)
insights = json.loads(response.content)
return insights if isinstance(insights, list) else []
except Exception as e:
self.logger.error(f"Failed to generate insights: {e}")
            return []
Prompt Engineering for Memory
Memory-Aware Prompt Templates
from typing import Dict, List, Any, Optional
from jinja2 import Environment, BaseLoader
from datetime import datetime
import json
class MemoryPromptTemplate:
"""Template system for memory-aware prompts"""
def __init__(self):
self.env = Environment(loader=BaseLoader())
self._register_filters()
    def _register_filters(self):
        """Register custom filters for memory processing"""
        def format_timestamp(timestamp):
            """Format timestamp for human reading"""
            if isinstance(timestamp, str):
                timestamp = datetime.fromisoformat(timestamp)
            return timestamp.strftime("%Y-%m-%d %H:%M")

        def importance_level(importance):
            """Convert importance score to descriptive level"""
            if importance >= 0.8:
                return "High"
            elif importance >= 0.6:
                return "Medium"
            else:
                return "Low"

        def truncate_content(content, max_length=200):
            """Truncate content with ellipsis"""
            if len(content) <= max_length:
                return content
            return content[:max_length - 3] + "..."

        # Jinja2's Environment.filters is a plain dict, so filters are
        # registered by assignment rather than a decorator.
        self.env.filters["format_timestamp"] = format_timestamp
        self.env.filters["importance_level"] = importance_level
        self.env.filters["truncate_content"] = truncate_content
def render_memory_context(self, memories: List[Dict[str, Any]],
context_type: str = "general") -> str:
"""Render memory context for prompts"""
if context_type == "conversational":
template_str = """
Based on our previous conversations:
{% for memory in memories %}
[{{ memory.timestamp | format_timestamp }}] {{ memory.content | truncate_content }}
{% if memory.importance %}({{ memory.importance | importance_level }} importance){% endif %}
{% endfor %}
Please continue our conversation with this context in mind.
"""
elif context_type == "factual":
template_str = """
Relevant information from your knowledge base:
{% for memory in memories %}
• {{ memory.content | truncate_content }}
{% if memory.metadata and memory.metadata.source %}
Source: {{ memory.metadata.source }}
{% endif %}
{% endfor %}
Use this information to inform your response.
"""
elif context_type == "task_oriented":
template_str = """
Previous work and context:
{% for memory in memories %}
{% if memory.memory_type == 'procedural' %}
📋 Process: {{ memory.content | truncate_content }}
{% elif memory.memory_type == 'episodic' %}
📝 Previous action: {{ memory.content | truncate_content }}
{% else %}
💡 Knowledge: {{ memory.content | truncate_content }}
{% endif %}
{% endfor %}
Continue working on this task with the above context.
"""
else: # general
template_str = """
Relevant memories:
{% for memory in memories %}
[{{ memory.timestamp | format_timestamp }}] {{ memory.content | truncate_content }}
({{ memory.memory_type }} memory, {{ memory.importance | importance_level }} importance)
{% endfor %}
"""
template = self.env.from_string(template_str)
return template.render(memories=memories).strip()
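# Usage sketch (illustrative): rendering a conversational context block from a
# small hand-written memory list; the field names match what the templates expect.
def _render_context_example():
    template = MemoryPromptTemplate()
    memories = [
        {
            "timestamp": "2024-05-01T09:30:00",
            "content": "User prefers weekly status summaries on Mondays.",
            "importance": 0.85,
            "memory_type": "semantic",
        },
    ]
    print(template.render_memory_context(memories, context_type="conversational"))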
class PromptStrategy:
"""Different strategies for incorporating memory into prompts"""
@staticmethod
def summarized_context(memories: List[Dict[str, Any]],
max_context_length: int = 500) -> str:
"""Create a summarized context from memories"""
if not memories:
return ""
# Sort by importance
sorted_memories = sorted(memories,
key=lambda m: m.get('importance', 0),
reverse=True)
context_parts = []
current_length = 0
for memory in sorted_memories:
content = memory.get('content', '')
addition = f"• {content}\n"
if current_length + len(addition) > max_context_length:
if context_parts: # Only break if we have some context
break
else:
# If even the first memory is too long, truncate it
remaining = max_context_length - current_length - 3
addition = f"• {content[:remaining]}...\n"
context_parts.append(addition)
current_length += len(addition)
return "".join(context_parts).strip()
@staticmethod
def hierarchical_context(memories: List[Dict[str, Any]]) -> str:
"""Organize memories hierarchically by type and importance"""
if not memories:
return ""
# Group by memory type
by_type = {}
for memory in memories:
memory_type = memory.get('memory_type', 'unknown')
if memory_type not in by_type:
by_type[memory_type] = []
by_type[memory_type].append(memory)
# Sort each type by importance
for memory_type in by_type:
by_type[memory_type].sort(
key=lambda m: m.get('importance', 0),
reverse=True
)
# Build hierarchical context
context_parts = []
type_labels = {
'semantic': 'Knowledge & Facts',
'episodic': 'Previous Conversations',
'procedural': 'Processes & Methods',
'working': 'Current Context'
}
for memory_type, memories_list in by_type.items():
if not memories_list:
continue
label = type_labels.get(memory_type, memory_type.title())
context_parts.append(f"{label}:")
for memory in memories_list[:5]: # Limit per type
content = memory.get('content', '')
timestamp = memory.get('timestamp', '')
importance = memory.get('importance', 0)
if len(content) > 150:
content = content[:147] + "..."
context_parts.append(f" • {content}")
if timestamp and importance:
context_parts.append(f" ({timestamp} | importance: {importance:.1f})")
context_parts.append("") # Empty line between types
return "\n".join(context_parts).strip()
@staticmethod
def temporal_context(memories: List[Dict[str, Any]],
include_timeline: bool = True) -> str:
"""Create context emphasizing temporal relationships"""
if not memories:
return ""
# Sort by timestamp
sorted_memories = sorted(
[m for m in memories if m.get('timestamp')],
key=lambda m: m.get('timestamp'),
reverse=True
)
if not sorted_memories:
return PromptStrategy.summarized_context(memories)
context_parts = []
if include_timeline:
context_parts.append("Timeline of relevant memories:")
else:
context_parts.append("Recent relevant context:")
current_date = None
for memory in sorted_memories:
timestamp_str = memory.get('timestamp', '')
content = memory.get('content', '')
try:
if isinstance(timestamp_str, str):
timestamp = datetime.fromisoformat(timestamp_str)
else:
timestamp = timestamp_str
memory_date = timestamp.strftime("%Y-%m-%d")
if include_timeline and memory_date != current_date:
if current_date is not None:
context_parts.append("")
context_parts.append(f"--- {memory_date} ---")
current_date = memory_date
time_str = timestamp.strftime("%H:%M")
truncated_content = content[:200] + "..." if len(content) > 200 else content
context_parts.append(f"{time_str}: {truncated_content}")
except Exception:
# Fallback for bad timestamps
context_parts.append(f"• {content}")
return "\n".join(context_parts)
class AdaptivePromptBuilder:
"""Builds prompts that adapt to context and user patterns"""
def __init__(self, memory_manager, llm_client):
self.memory_manager = memory_manager
self.llm_client = llm_client
self.template_engine = MemoryPromptTemplate()
async def build_adaptive_prompt(self, user_input: str,
conversation_history: List[Dict],
user_preferences: Dict[str, Any]) -> str:
"""Build prompt adapted to user and context"""
# Analyze user input to determine best memory strategy
context_analysis = await self._analyze_context_needs(user_input)
# Get relevant memories based on analysis
memories = await self._get_targeted_memories(
user_input,
context_analysis
)
# Choose prompt strategy
strategy = self._choose_prompt_strategy(
context_analysis,
len(memories),
user_preferences
)
# Build context based on strategy
if strategy == "hierarchical":
memory_context = PromptStrategy.hierarchical_context(memories)
elif strategy == "temporal":
memory_context = PromptStrategy.temporal_context(memories)
else: # summarized
memory_context = PromptStrategy.summarized_context(memories)
# Build final prompt
system_prompt_parts = []
# Base instruction
system_prompt_parts.append(
"You are a helpful assistant with access to conversation memory. "
"Provide thoughtful, context-aware responses."
)
# Memory context
if memory_context:
system_prompt_parts.extend([
"",
memory_context,
"",
"Use this context to provide more personalized and informed responses."
])
# User preferences
if user_preferences:
prefs_text = self._format_user_preferences(user_preferences)
if prefs_text:
system_prompt_parts.extend([
"",
f"User preferences: {prefs_text}"
])
return "\n".join(system_prompt_parts)
async def _analyze_context_needs(self, user_input: str) -> Dict[str, Any]:
"""Analyze what type of context would be most helpful"""
analysis_prompt = f"""
Analyze this user input and determine what type of memory context would be most helpful:
User input: "{user_input}"
Consider:
1. Is this a factual question that needs knowledge context?
2. Is this continuing a previous conversation?
3. Is this a task that needs procedural memory?
4. Is this personal and needs conversational context?
Respond with JSON:
{{
"primary_need": "factual|conversational|procedural|personal",
"memory_types_needed": ["semantic", "episodic", "procedural", "working"],
"context_depth": "light|medium|deep",
"temporal_relevance": "recent|all_time|specific_period"
}}
"""
try:
response = await self.llm_client.chat_completion(
messages=[LLMMessage(role="user", content=analysis_prompt)],
model="gpt-3.5-turbo",
max_tokens=200
)
return json.loads(response.content)
except Exception:
# Fallback analysis
return {
"primary_need": "conversational",
"memory_types_needed": ["episodic", "semantic"],
"context_depth": "medium",
"temporal_relevance": "recent"
}
async def _get_targeted_memories(self, user_input: str,
context_analysis: Dict[str, Any]) -> List[Dict]:
"""Get memories targeted to the context analysis"""
memory_types = context_analysis.get('memory_types_needed', ['episodic'])
context_depth = context_analysis.get('context_depth', 'medium')
# Determine limit based on depth
limits = {"light": 5, "medium": 10, "deep": 20}
limit = limits.get(context_depth, 10)
memories = await self.memory_manager.recall(
query=user_input,
memory_types=memory_types,
limit=limit
)
return memories
def _choose_prompt_strategy(self, context_analysis: Dict[str, Any],
memory_count: int,
user_preferences: Dict[str, Any]) -> str:
"""Choose the best prompt strategy"""
primary_need = context_analysis.get('primary_need', 'conversational')
# User preference override
preferred_style = user_preferences.get('context_style')
if preferred_style in ['hierarchical', 'temporal', 'summarized']:
return preferred_style
# Strategy based on primary need and memory count
if primary_need == 'factual' and memory_count > 8:
return "hierarchical"
elif primary_need == 'conversational':
return "temporal"
else:
return "summarized"
def _format_user_preferences(self, preferences: Dict[str, Any]) -> str:
"""Format user preferences for inclusion in prompt"""
pref_parts = []
if preferences.get('communication_style'):
pref_parts.append(f"prefers {preferences['communication_style']} communication")
if preferences.get('detail_level'):
pref_parts.append(f"likes {preferences['detail_level']} detail level")
if preferences.get('topics_of_interest'):
topics = preferences['topics_of_interest']
if isinstance(topics, list) and topics:
pref_parts.append(f"interested in {', '.join(topics[:3])}")
return "; ".join(pref_parts)Streaming and Real-time Integration
Real-time Memory Updates
import asyncio
import json
import logging
import time
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional
from datetime import datetime
class StreamingMemoryIntegration:
"""Real-time memory integration with streaming LLM responses"""
def __init__(self, llm_client: BaseLLMClient, memory_manager):
self.llm_client = llm_client
self.memory_manager = memory_manager
        self.active_streams: Dict[str, Dict[str, Any]] = {}
        self.logger = logging.getLogger(__name__)
async def streaming_chat_with_memory(self, user_input: str,
conversation_id: str,
model: str = "gpt-3.5-turbo",
on_token: Optional[Callable[[str], None]] = None,
on_memory_update: Optional[Callable[[Dict], None]] = None
) -> AsyncGenerator[str, None]:
"""Stream LLM response while updating memory in real-time"""
stream_id = f"stream_{conversation_id}_{int(time.time())}"
# Initialize stream context
self.active_streams[stream_id] = {
"user_input": user_input,
"conversation_id": conversation_id,
"response_parts": [],
"start_time": datetime.now(),
"metadata": {}
}
try:
# Store user input immediately
user_memory_id = await self.memory_manager.remember(
content=f"User: {user_input}",
memory_type="episodic",
importance=0.7,
metadata={
"conversation_id": conversation_id,
"role": "user",
"stream_id": stream_id
}
)
if on_memory_update:
on_memory_update({
"type": "user_memory_stored",
"memory_id": user_memory_id,
"content": user_input
})
# Get relevant memories for context
relevant_memories = await self.memory_manager.recall(
query=user_input,
limit=10
)
# Build streaming prompt
messages = await self._build_streaming_context(
user_input, relevant_memories, conversation_id
)
# Start streaming LLM response
response_parts = []
async for token in self._stream_llm_response(messages, model):
response_parts.append(token)
self.active_streams[stream_id]["response_parts"] = response_parts
if on_token:
on_token(token)
# Yield token to caller
yield token
# Periodically update working memory with partial response
if len(response_parts) % 50 == 0: # Every 50 tokens
partial_response = "".join(response_parts)
await self._update_working_memory(
stream_id, partial_response, conversation_id
)
# Store complete response
complete_response = "".join(response_parts)
assistant_memory_id = await self.memory_manager.remember(
content=f"Assistant: {complete_response}",
memory_type="episodic",
importance=0.7,
metadata={
"conversation_id": conversation_id,
"role": "assistant",
"stream_id": stream_id,
"response_length": len(complete_response)
}
)
if on_memory_update:
on_memory_update({
"type": "assistant_memory_stored",
"memory_id": assistant_memory_id,
"content": complete_response
})
# Extract knowledge asynchronously
asyncio.create_task(
self._extract_knowledge_async(
user_input, complete_response, conversation_id
)
)
finally:
# Cleanup stream context
if stream_id in self.active_streams:
del self.active_streams[stream_id]
async def _stream_llm_response(self, messages: List[LLMMessage],
model: str) -> AsyncGenerator[str, None]:
"""Stream tokens from LLM (implementation depends on provider)"""
# This is a simplified example - actual implementation depends on LLM provider
# For OpenAI, you'd use the streaming API
try:
# Convert messages for API call
api_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
            # Stream from an OpenAI-compatible /chat/completions endpoint;
            # assumes the wrapped client exposes .session and .base_url (as OpenAIClient does)
async with self.llm_client.session.post(
f"{self.llm_client.base_url}/chat/completions",
json={
"model": model,
"messages": api_messages,
"stream": True
}
) as response:
async for line in response.content:
if line:
line_text = line.decode('utf-8').strip()
if line_text.startswith('data: '):
data_text = line_text[6:]
if data_text == '[DONE]':
break
try:
data = json.loads(data_text)
if 'choices' in data and len(data['choices']) > 0:
delta = data['choices'][0].get('delta', {})
if 'content' in delta:
yield delta['content']
except json.JSONDecodeError:
continue
except Exception as e:
self.logger.error(f"Streaming failed: {e}")
# Fallback to non-streaming
response = await self.llm_client.chat_completion(messages, model)
yield response.content
async def _update_working_memory(self, stream_id: str,
partial_response: str,
conversation_id: str):
"""Update working memory with partial response"""
working_memory_id = f"working_{stream_id}"
await self.memory_manager.remember(
content=f"Partial response: {partial_response}",
memory_type="working",
importance=0.5,
metadata={
"conversation_id": conversation_id,
"stream_id": stream_id,
"is_partial": True,
"timestamp": datetime.now().isoformat()
},
memory_id=working_memory_id # Overwrite previous partial
)
async def _extract_knowledge_async(self, user_input: str,
response: str,
conversation_id: str):
"""Extract knowledge asynchronously without blocking stream"""
try:
# Use a simpler, faster model for extraction
extraction_prompt = f"""
Extract key facts or insights from this exchange (max 100 words):
User: {user_input}
Assistant: {response}
Focus on: facts, preferences, decisions, important information.
"""
extraction_response = await self.llm_client.chat_completion(
messages=[LLMMessage(role="user", content=extraction_prompt)],
model="gpt-3.5-turbo",
max_tokens=150
)
extracted = extraction_response.content.strip()
if extracted and len(extracted) > 10:
await self.memory_manager.remember(
content=extracted,
memory_type="semantic",
importance=0.8,
metadata={
"conversation_id": conversation_id,
"source": "async_extraction",
"extraction_timestamp": datetime.now().isoformat()
}
)
except Exception as e:
self.logger.error(f"Async knowledge extraction failed: {e}")
class MemoryWebSocketHandler:
"""WebSocket handler for real-time memory updates"""
def __init__(self, streaming_integration: StreamingMemoryIntegration):
self.streaming_integration = streaming_integration
        self.active_connections: Dict[str, Any] = {}
        self.logger = logging.getLogger(__name__)
async def handle_connection(self, websocket, user_id: str):
"""Handle WebSocket connection for real-time memory integration"""
self.active_connections[user_id] = websocket
try:
async for message in websocket:
data = json.loads(message)
if data.get("type") == "chat_message":
await self._handle_chat_message(
websocket, user_id, data
)
elif data.get("type") == "memory_query":
await self._handle_memory_query(
websocket, user_id, data
)
except Exception as e:
self.logger.error(f"WebSocket error for user {user_id}: {e}")
finally:
if user_id in self.active_connections:
del self.active_connections[user_id]
async def _handle_chat_message(self, websocket, user_id: str, data: Dict):
"""Handle streaming chat with memory"""
user_input = data.get("message", "")
conversation_id = data.get("conversation_id", f"conv_{user_id}")
# Send acknowledgment
await websocket.send(json.dumps({
"type": "message_received",
"conversation_id": conversation_id
}))
# Stream response with memory updates
async for token in self.streaming_integration.streaming_chat_with_memory(
user_input=user_input,
conversation_id=conversation_id,
on_token=lambda t: asyncio.create_task(
websocket.send(json.dumps({
"type": "token",
"content": t,
"conversation_id": conversation_id
}))
),
on_memory_update=lambda m: asyncio.create_task(
websocket.send(json.dumps({
"type": "memory_update",
**m,
"conversation_id": conversation_id
}))
)
):
pass # Tokens are sent via on_token callback
# Send completion signal
await websocket.send(json.dumps({
"type": "response_complete",
"conversation_id": conversation_id
}))
async def _handle_memory_query(self, websocket, user_id: str, data: Dict):
"""Handle memory queries via WebSocket"""
query = data.get("query", "")
memory_types = data.get("memory_types", ["episodic", "semantic"])
limit = data.get("limit", 10)
try:
memories = await self.streaming_integration.memory_manager.recall(
query=query,
memory_types=memory_types,
limit=limit
)
await websocket.send(json.dumps({
"type": "memory_results",
"query": query,
"memories": memories,
"count": len(memories)
}))
except Exception as e:
await websocket.send(json.dumps({
"type": "error",
"message": f"Memory query failed: {str(e)}"
            }))
This guide has provided implementations for connecting Large Language Models with agent memory systems, including provider-agnostic clients, memory-enhanced chat, adaptive prompting, and streaming with real-time memory updates.