LLM Integration Patterns

Integrating Large Language Models with memory systems requires careful attention to prompt engineering, context management, and processing pipelines. This guide walks through reference implementations for LLM-memory integration: a provider-agnostic client layer, a memory-enhanced chat wrapper, memory-aware prompt construction, and streaming integration with real-time memory updates.
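Throughout the guide, the code assumes a memory_manager object exposing async remember, recall, and get_memories_since methods. The sketch below is a hypothetical interface inferred from how the later snippets call it, not the API of any particular memory library; adapt the names and signatures to your own implementation.

# Hypothetical interface the examples in this guide call into.
# Method names and signatures are inferred from usage below, not from a specific library.
from datetime import datetime
from typing import Any, Dict, List, Optional, Protocol


class MemoryManager(Protocol):
    async def remember(self,
                       content: str,
                       memory_type: str = "episodic",
                       importance: float = 0.5,
                       metadata: Optional[Dict[str, Any]] = None,
                       memory_id: Optional[str] = None) -> str:
        """Persist a memory entry and return its identifier."""
        ...

    async def recall(self,
                     query: str,
                     memory_types: Optional[List[str]] = None,
                     limit: int = 10) -> List[Dict[str, Any]]:
        """Return memory entries relevant to the query."""
        ...

    async def get_memories_since(self,
                                 user_id: str,
                                 cutoff: datetime) -> List[Dict[str, Any]]:
        """Return memories created after the cutoff timestamp."""
        ...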

LLM Memory Interface

Universal LLM Client

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, AsyncGenerator, Union
from dataclasses import dataclass
from enum import Enum
import asyncio
import aiohttp
import json
import time
from datetime import datetime


class LLMProvider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    AZURE_OPENAI = "azure_openai"
    COHERE = "cohere"
    TOGETHER = "together"


@dataclass
class LLMMessage:
    """Standardized message format across LLM providers"""
    role: str  # "system", "user", "assistant"
    content: str
    metadata: Optional[Dict[str, Any]] = None


@dataclass
class LLMResponse:
    """Standardized response format"""
    content: str
    model: str
    tokens_used: int
    cost_estimate: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None


class BaseLLMClient(ABC):
    """Base class for LLM clients"""

    @abstractmethod
    async def chat_completion(self, messages: List[LLMMessage],
                              model: str, **kwargs) -> LLMResponse:
        pass

    @abstractmethod
    async def embedding(self, text: str, model: str) -> List[float]:
        pass


class OpenAIClient(BaseLLMClient):
    """OpenAI API client with memory integration"""

    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.api_key = api_key
        self.base_url = base_url or "https://api.openai.com/v1"
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def chat_completion(self, messages: List[LLMMessage],
                              model: str = "gpt-3.5-turbo", **kwargs) -> LLMResponse:
        """Send chat completion request"""
        # Convert messages to OpenAI format
        openai_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]

        payload = {
            "model": model,
            "messages": openai_messages,
            **kwargs
        }

        async with self.session.post(
            f"{self.base_url}/chat/completions",
            json=payload
        ) as response:
            if response.status != 200:
                error_text = await response.text()
                raise Exception(f"OpenAI API error: {error_text}")

            data = await response.json()

            return LLMResponse(
                content=data["choices"][0]["message"]["content"],
                model=data["model"],
                tokens_used=data["usage"]["total_tokens"],
                cost_estimate=self._estimate_cost(data["usage"], model),
                metadata={"finish_reason": data["choices"][0]["finish_reason"]}
            )

    async def embedding(self, text: str, model: str = "text-embedding-ada-002") -> List[float]:
        """Generate text embedding"""
        payload = {
            "model": model,
            "input": text
        }

        async with self.session.post(
            f"{self.base_url}/embeddings",
            json=payload
        ) as response:
            if response.status != 200:
                error_text = await response.text()
                raise Exception(f"OpenAI embedding error: {error_text}")

            data = await response.json()
            return data["data"][0]["embedding"]

    def _estimate_cost(self, usage: Dict, model: str) -> float:
        """Estimate API cost based on token usage"""
        # Simplified cost estimation (update with current pricing)
        cost_per_1k_tokens = {
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03}
        }

        if model in cost_per_1k_tokens:
            input_cost = (usage.get("prompt_tokens", 0) / 1000) * cost_per_1k_tokens[model]["input"]
            output_cost = (usage.get("completion_tokens", 0) / 1000) * cost_per_1k_tokens[model]["output"]
            return input_cost + output_cost

        return 0.0


class AnthropicClient(BaseLLMClient):
    """Anthropic Claude API client"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1"
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers={
                "x-api-key": self.api_key,
                "anthropic-version": "2023-06-01"
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def chat_completion(self, messages: List[LLMMessage],
                              model: str = "claude-3-sonnet-20240229", **kwargs) -> LLMResponse:
        """Send message to Claude"""
        # Convert to Anthropic format
        system_messages = [msg.content for msg in messages if msg.role == "system"]
        conversation_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in messages if msg.role != "system"
        ]

        payload = {
            "model": model,
            "messages": conversation_messages,
            "max_tokens": kwargs.get("max_tokens", 1024),
            **{k: v for k, v in kwargs.items() if k != "max_tokens"}
        }

        if system_messages:
            payload["system"] = " ".join(system_messages)

        async with self.session.post(
            f"{self.base_url}/messages",
            json=payload
        ) as response:
            if response.status != 200:
                error_text = await response.text()
                raise Exception(f"Anthropic API error: {error_text}")

            data = await response.json()

            return LLMResponse(
                content=data["content"][0]["text"],
                model=data["model"],
                tokens_used=data["usage"]["input_tokens"] + data["usage"]["output_tokens"],
                metadata={
                    "stop_reason": data.get("stop_reason"),
                    "input_tokens": data["usage"]["input_tokens"],
                    "output_tokens": data["usage"]["output_tokens"]
                }
            )

    async def embedding(self, text: str, model: str) -> List[float]:
        """Anthropic doesn't provide embeddings - use alternative"""
        raise NotImplementedError("Anthropic doesn't provide embedding endpoints")


class LLMClientFactory:
    """Factory for creating LLM clients"""

    @staticmethod
    def create_client(provider: LLMProvider, **kwargs) -> BaseLLMClient:
        """Create appropriate LLM client"""
        if provider == LLMProvider.OPENAI:
            return OpenAIClient(**kwargs)
        elif provider == LLMProvider.ANTHROPIC:
            return AnthropicClient(**kwargs)
        else:
            raise ValueError(f"Unsupported provider: {provider}")
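As a usage sketch (not part of the client implementation above), the factory and async context manager can be combined as follows; the OPENAI_API_KEY environment variable is an assumption for illustration.

# Usage sketch for the client layer above; OPENAI_API_KEY is an assumed env var.
import asyncio
import os


async def main() -> None:
    client = LLMClientFactory.create_client(
        LLMProvider.OPENAI,
        api_key=os.environ["OPENAI_API_KEY"],
    )

    # The clients are async context managers, so the aiohttp session is cleaned up.
    async with client:
        response = await client.chat_completion(
            messages=[
                LLMMessage(role="system", content="You are a concise assistant."),
                LLMMessage(role="user", content="Summarize what an embedding is."),
            ],
            model="gpt-3.5-turbo",
        )
        print(response.content, response.tokens_used, response.cost_estimate)


if __name__ == "__main__":
    asyncio.run(main())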

Memory-Enhanced LLM Wrapper

Intelligent Memory Integration

from typing import List, Dict, Any, Optional, Tuple
import json
import logging
from datetime import datetime, timedelta


class MemoryEnhancedLLM:
    """LLM wrapper with integrated memory system"""

    def __init__(self, llm_client: BaseLLMClient, memory_manager,
                 embedding_client: Optional[BaseLLMClient] = None):
        self.llm_client = llm_client
        self.memory_manager = memory_manager
        self.embedding_client = embedding_client or llm_client
        self.logger = logging.getLogger(__name__)

    async def chat_with_memory(self, user_input: str,
                               conversation_id: Optional[str] = None,
                               memory_types: Optional[List[str]] = None,
                               max_context_memories: int = 10,
                               model: str = "gpt-3.5-turbo") -> LLMResponse:
        """Process user input with memory context"""
        # Store user input as episodic memory
        await self._store_interaction_memory(
            content=f"User: {user_input}",
            conversation_id=conversation_id,
            memory_type="episodic"
        )

        # Retrieve relevant memories
        relevant_memories = await self._get_relevant_memories(
            query=user_input,
            memory_types=memory_types or ["episodic", "semantic"],
            limit=max_context_memories
        )

        # Build context with memories
        messages = await self._build_message_context(
            user_input=user_input,
            relevant_memories=relevant_memories,
            conversation_id=conversation_id
        )

        # Get LLM response
        response = await self.llm_client.chat_completion(
            messages=messages,
            model=model
        )

        # Store assistant response as memory
        await self._store_interaction_memory(
            content=f"Assistant: {response.content}",
            conversation_id=conversation_id,
            memory_type="episodic"
        )

        # Extract and store important information
        await self._extract_and_store_knowledge(
            user_input=user_input,
            assistant_response=response.content,
            conversation_id=conversation_id
        )

        return response

    async def _store_interaction_memory(self, content: str,
                                        conversation_id: Optional[str] = None,
                                        memory_type: str = "episodic"):
        """Store interaction in memory"""
        metadata = {
            "interaction_timestamp": datetime.now().isoformat(),
            "memory_source": "conversation"
        }

        if conversation_id:
            metadata["conversation_id"] = conversation_id

        memory_id = await self.memory_manager.remember(
            content=content,
            memory_type=memory_type,
            importance=0.7,  # Default importance for conversations
            metadata=metadata
        )

        self.logger.info(f"Stored interaction memory: {memory_id}")
        return memory_id

    async def _get_relevant_memories(self, query: str,
                                     memory_types: List[str],
                                     limit: int) -> List[Dict[str, Any]]:
        """Retrieve memories relevant to the query"""
        memories = await self.memory_manager.recall(
            query=query,
            memory_types=memory_types,
            limit=limit
        )
        return memories

    async def _build_message_context(self, user_input: str,
                                     relevant_memories: List[Dict[str, Any]],
                                     conversation_id: Optional[str] = None) -> List[LLMMessage]:
        """Build message context including relevant memories"""
        messages = []

        # System message with memory context
        system_context = await self._build_system_context(relevant_memories)
        messages.append(LLMMessage(role="system", content=system_context))

        # Recent conversation history if available
        if conversation_id:
            recent_history = await self._get_recent_conversation(conversation_id)
            messages.extend(recent_history)

        # Current user input
        messages.append(LLMMessage(role="user", content=user_input))

        return messages

    async def _build_system_context(self, memories: List[Dict[str, Any]]) -> str:
        """Build system context from relevant memories"""
        if not memories:
            return "You are a helpful assistant with access to conversation memory."

        context_parts = [
            "You are a helpful assistant with access to relevant memories from previous conversations.",
            "",
            "Relevant memories:"
        ]

        for memory in memories:
            timestamp = memory.get('timestamp', 'Unknown time')
            content = memory.get('content', '')
            importance = memory.get('importance', 0)

            context_parts.append(
                f"[{timestamp}] (importance: {importance:.2f}) {content}"
            )

        context_parts.extend([
            "",
            "Use this context to provide more personalized and informed responses. "
            "Reference relevant memories when appropriate, but don't overwhelm the user with details."
        ])

        return "\n".join(context_parts)

    async def _get_recent_conversation(self, conversation_id: str,
                                       limit: int = 10) -> List[LLMMessage]:
        """Get recent conversation history"""
        # Search for recent messages in this conversation
        memories = await self.memory_manager.recall(
            query=f"conversation_id:{conversation_id}",
            memory_types=["episodic"],
            limit=limit * 2  # Get more to filter
        )

        # Filter and sort by timestamp
        conversation_memories = [
            m for m in memories
            if m.get('metadata', {}).get('conversation_id') == conversation_id
        ]

        conversation_memories.sort(
            key=lambda m: m.get('timestamp', datetime.min),
            reverse=False
        )

        # Convert to messages
        messages = []
        for memory in conversation_memories[-limit:]:
            content = memory.get('content', '')
            if content.startswith('User: '):
                messages.append(LLMMessage(
                    role="user",
                    content=content[6:]  # Remove 'User: ' prefix
                ))
            elif content.startswith('Assistant: '):
                messages.append(LLMMessage(
                    role="assistant",
                    content=content[11:]  # Remove 'Assistant: ' prefix
                ))

        return messages

    async def _extract_and_store_knowledge(self, user_input: str,
                                           assistant_response: str,
                                           conversation_id: Optional[str] = None):
        """Extract important knowledge and store as semantic memory"""
        # Use LLM to extract key information
        extraction_prompt = f"""
        Analyze this conversation exchange and extract any important facts, preferences,
        or knowledge that should be remembered for future interactions:

        User: {user_input}
        Assistant: {assistant_response}

        Extract:
        1. Any personal preferences or information about the user
        2. Important factual information discussed
        3. Decisions or conclusions reached
        4. Any domain-specific knowledge shared

        Format as bullet points, or respond with "None" if nothing significant to remember.
        """

        try:
            extraction_response = await self.llm_client.chat_completion(
                messages=[LLMMessage(role="user", content=extraction_prompt)],
                model="gpt-3.5-turbo",
                max_tokens=200
            )

            extracted_knowledge = extraction_response.content.strip()

            if extracted_knowledge and extracted_knowledge.lower() != "none":
                # Store as semantic memory
                await self.memory_manager.remember(
                    content=extracted_knowledge,
                    memory_type="semantic",
                    importance=0.8,
                    metadata={
                        "source": "knowledge_extraction",
                        "conversation_id": conversation_id,
                        "extraction_timestamp": datetime.now().isoformat()
                    }
                )

                self.logger.info(f"Extracted and stored knowledge: {extracted_knowledge[:100]}...")

        except Exception as e:
            self.logger.error(f"Failed to extract knowledge: {e}")


class MemoryProcessingPipeline:
    """Pipeline for processing and enhancing memories with LLM"""

    def __init__(self, llm_client: BaseLLMClient, memory_manager):
        self.llm_client = llm_client
        self.memory_manager = memory_manager
        self.logger = logging.getLogger(__name__)

    async def enhance_memory_content(self, memory_entry: Dict[str, Any]) -> Dict[str, Any]:
        """Enhance memory content with LLM analysis"""
        content = memory_entry.get('content', '')

        # Generate enhanced description
        enhancement_prompt = f"""
        Analyze this memory entry and provide:
        1. A clear, concise summary
        2. Key topics or themes
        3. Emotional context (if applicable)
        4. Importance assessment (1-10)
        5. Suggested tags

        Memory content: {content}

        Respond in JSON format:
        {{
            "summary": "...",
            "topics": ["topic1", "topic2"],
            "emotional_context": "neutral/positive/negative/excited/etc",
            "importance": 7,
            "suggested_tags": ["tag1", "tag2"]
        }}
        """

        try:
            response = await self.llm_client.chat_completion(
                messages=[LLMMessage(role="user", content=enhancement_prompt)],
                model="gpt-3.5-turbo",
                max_tokens=300
            )

            # Parse JSON response
            enhancement_data = json.loads(response.content)

            # Update memory entry
            enhanced_memory = memory_entry.copy()
            enhanced_memory['metadata'] = enhanced_memory.get('metadata', {})
            enhanced_memory['metadata'].update({
                'ai_summary': enhancement_data.get('summary'),
                'ai_topics': enhancement_data.get('topics', []),
                'ai_emotional_context': enhancement_data.get('emotional_context'),
                'ai_importance': enhancement_data.get('importance'),
                'enhancement_timestamp': datetime.now().isoformat()
            })

            # Update importance if AI suggests higher importance
            ai_importance = enhancement_data.get('importance', 0) / 10.0
            if ai_importance > enhanced_memory.get('importance', 0):
                enhanced_memory['importance'] = ai_importance

            # Add suggested tags
            suggested_tags = enhancement_data.get('suggested_tags', [])
            existing_tags = enhanced_memory.get('tags', [])
            enhanced_memory['tags'] = list(set(existing_tags + suggested_tags))

            return enhanced_memory

        except Exception as e:
            self.logger.error(f"Failed to enhance memory: {e}")
            return memory_entry

    async def consolidate_memories(self, memories: List[Dict[str, Any]]) -> str:
        """Consolidate multiple related memories into semantic knowledge"""
        if len(memories) < 2:
            return ""

        # Prepare memories for consolidation
        memory_summaries = []
        for i, memory in enumerate(memories):
            timestamp = memory.get('timestamp', 'Unknown')
            content = memory.get('content', '')
            importance = memory.get('importance', 0)

            memory_summaries.append(
                f"{i+1}. [{timestamp}] (importance: {importance:.2f}) {content}"
            )

        consolidation_prompt = f"""
        Consolidate these related memories into a coherent summary that captures:
        1. The main patterns or themes
        2. Key facts or insights
        3. Any progression or development over time
        4. Important relationships between the memories

        Memories to consolidate:
        {chr(10).join(memory_summaries)}

        Provide a clear, informative summary that would be useful for future reference.
        """

        try:
            response = await self.llm_client.chat_completion(
                messages=[LLMMessage(role="user", content=consolidation_prompt)],
                model="gpt-3.5-turbo",
                max_tokens=500
            )

            return response.content.strip()

        except Exception as e:
            self.logger.error(f"Failed to consolidate memories: {e}")
            return ""

    async def generate_insights(self, user_id: str,
                                time_period: timedelta = timedelta(days=7)) -> List[str]:
        """Generate insights from user's recent memories"""
        # Get recent memories
        cutoff_date = datetime.now() - time_period
        memories = await self.memory_manager.get_memories_since(user_id, cutoff_date)

        if not memories:
            return []

        # Prepare memory data
        memory_data = []
        for memory in memories:
            memory_data.append(f"- {memory.get('content', '')}")

        insight_prompt = f"""
        Analyze these memories from the past {time_period.days} days and generate insights about:
        1. Patterns in user interests or behavior
        2. Recurring themes or topics
        3. Changes in sentiment or priorities
        4. Areas where the user might benefit from reminders or suggestions

        Memories:
        {chr(10).join(memory_data)}

        Provide 3-5 actionable insights as a JSON array of strings.
        """

        try:
            response = await self.llm_client.chat_completion(
                messages=[LLMMessage(role="user", content=insight_prompt)],
                model="gpt-3.5-turbo",
                max_tokens=400
            )

            insights = json.loads(response.content)
            return insights if isinstance(insights, list) else []

        except Exception as e:
            self.logger.error(f"Failed to generate insights: {e}")
            return []
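A brief usage sketch for the wrapper follows, assuming an OpenAI client and any memory manager implementing the interface sketched at the start of this guide; the function name, conversation ID, and prompt text are illustrative only.

# Wiring the memory-enhanced wrapper together (a sketch; `memory_manager` is any object
# implementing the remember/recall interface sketched earlier, not a specific library).
import asyncio
import os


async def demo(memory_manager) -> None:
    async with OpenAIClient(api_key=os.environ["OPENAI_API_KEY"]) as client:
        assistant = MemoryEnhancedLLM(llm_client=client, memory_manager=memory_manager)

        response = await assistant.chat_with_memory(
            user_input="I prefer short answers. What's a good first step for learning Rust?",
            conversation_id="conv-123",
            memory_types=["episodic", "semantic"],
            max_context_memories=5,
        )
        print(response.content)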

Prompt Engineering for Memory

Memory-Aware Prompt Templates

from typing import Dict, List, Any, Optional
import json
from jinja2 import Environment, BaseLoader
from datetime import datetime


class MemoryPromptTemplate:
    """Template system for memory-aware prompts"""

    def __init__(self):
        self.env = Environment(loader=BaseLoader())
        self._register_filters()

    def _register_filters(self):
        """Register custom filters for memory processing"""

        def format_timestamp(timestamp):
            """Format timestamp for human reading"""
            if isinstance(timestamp, str):
                timestamp = datetime.fromisoformat(timestamp)
            return timestamp.strftime("%Y-%m-%d %H:%M")

        def importance_level(importance):
            """Convert importance score to descriptive level"""
            if importance >= 0.8:
                return "High"
            elif importance >= 0.6:
                return "Medium"
            else:
                return "Low"

        def truncate_content(content, max_length=200):
            """Truncate content with ellipsis"""
            if len(content) <= max_length:
                return content
            return content[:max_length-3] + "..."

        # Environment.filters is a plain dict, so custom filters are registered by assignment
        self.env.filters["format_timestamp"] = format_timestamp
        self.env.filters["importance_level"] = importance_level
        self.env.filters["truncate_content"] = truncate_content

    def render_memory_context(self, memories: List[Dict[str, Any]],
                              context_type: str = "general") -> str:
        """Render memory context for prompts"""
        if context_type == "conversational":
            template_str = """
Based on our previous conversations:

{% for memory in memories %}
[{{ memory.timestamp | format_timestamp }}] {{ memory.content | truncate_content }}
{% if memory.importance %}({{ memory.importance | importance_level }} importance){% endif %}
{% endfor %}

Please continue our conversation with this context in mind.
"""
        elif context_type == "factual":
            template_str = """
Relevant information from your knowledge base:

{% for memory in memories %}
• {{ memory.content | truncate_content }}
{% if memory.metadata and memory.metadata.source %}
  Source: {{ memory.metadata.source }}
{% endif %}
{% endfor %}

Use this information to inform your response.
"""
        elif context_type == "task_oriented":
            template_str = """
Previous work and context:

{% for memory in memories %}
{% if memory.memory_type == 'procedural' %}
📋 Process: {{ memory.content | truncate_content }}
{% elif memory.memory_type == 'episodic' %}
📝 Previous action: {{ memory.content | truncate_content }}
{% else %}
💡 Knowledge: {{ memory.content | truncate_content }}
{% endif %}
{% endfor %}

Continue working on this task with the above context.
"""
        else:  # general
            template_str = """
Relevant memories:

{% for memory in memories %}
[{{ memory.timestamp | format_timestamp }}] {{ memory.content | truncate_content }}
({{ memory.memory_type }} memory, {{ memory.importance | importance_level }} importance)
{% endfor %}
"""

        template = self.env.from_string(template_str)
        return template.render(memories=memories).strip()


class PromptStrategy:
    """Different strategies for incorporating memory into prompts"""

    @staticmethod
    def summarized_context(memories: List[Dict[str, Any]],
                           max_context_length: int = 500) -> str:
        """Create a summarized context from memories"""
        if not memories:
            return ""

        # Sort by importance
        sorted_memories = sorted(memories, key=lambda m: m.get('importance', 0), reverse=True)

        context_parts = []
        current_length = 0

        for memory in sorted_memories:
            content = memory.get('content', '')
            addition = f"• {content}\n"

            if current_length + len(addition) > max_context_length:
                if context_parts:  # Only break if we have some context
                    break
                else:
                    # If even the first memory is too long, truncate it
                    remaining = max_context_length - current_length - 3
                    addition = f"• {content[:remaining]}...\n"

            context_parts.append(addition)
            current_length += len(addition)

        return "".join(context_parts).strip()

    @staticmethod
    def hierarchical_context(memories: List[Dict[str, Any]]) -> str:
        """Organize memories hierarchically by type and importance"""
        if not memories:
            return ""

        # Group by memory type
        by_type = {}
        for memory in memories:
            memory_type = memory.get('memory_type', 'unknown')
            if memory_type not in by_type:
                by_type[memory_type] = []
            by_type[memory_type].append(memory)

        # Sort each type by importance
        for memory_type in by_type:
            by_type[memory_type].sort(
                key=lambda m: m.get('importance', 0),
                reverse=True
            )

        # Build hierarchical context
        context_parts = []

        type_labels = {
            'semantic': 'Knowledge & Facts',
            'episodic': 'Previous Conversations',
            'procedural': 'Processes & Methods',
            'working': 'Current Context'
        }

        for memory_type, memories_list in by_type.items():
            if not memories_list:
                continue

            label = type_labels.get(memory_type, memory_type.title())
            context_parts.append(f"{label}:")

            for memory in memories_list[:5]:  # Limit per type
                content = memory.get('content', '')
                timestamp = memory.get('timestamp', '')
                importance = memory.get('importance', 0)

                if len(content) > 150:
                    content = content[:147] + "..."

                context_parts.append(f"  • {content}")
                if timestamp and importance:
                    context_parts.append(f"    ({timestamp} | importance: {importance:.1f})")

            context_parts.append("")  # Empty line between types

        return "\n".join(context_parts).strip()

    @staticmethod
    def temporal_context(memories: List[Dict[str, Any]],
                         include_timeline: bool = True) -> str:
        """Create context emphasizing temporal relationships"""
        if not memories:
            return ""

        # Sort by timestamp
        sorted_memories = sorted(
            [m for m in memories if m.get('timestamp')],
            key=lambda m: m.get('timestamp'),
            reverse=True
        )

        if not sorted_memories:
            return PromptStrategy.summarized_context(memories)

        context_parts = []
        if include_timeline:
            context_parts.append("Timeline of relevant memories:")
        else:
            context_parts.append("Recent relevant context:")

        current_date = None
        for memory in sorted_memories:
            timestamp_str = memory.get('timestamp', '')
            content = memory.get('content', '')

            try:
                if isinstance(timestamp_str, str):
                    timestamp = datetime.fromisoformat(timestamp_str)
                else:
                    timestamp = timestamp_str

                memory_date = timestamp.strftime("%Y-%m-%d")

                if include_timeline and memory_date != current_date:
                    if current_date is not None:
                        context_parts.append("")
                    context_parts.append(f"--- {memory_date} ---")
                    current_date = memory_date

                time_str = timestamp.strftime("%H:%M")
                truncated_content = content[:200] + "..." if len(content) > 200 else content
                context_parts.append(f"{time_str}: {truncated_content}")

            except Exception:
                # Fallback for bad timestamps
                context_parts.append(f"• {content}")

        return "\n".join(context_parts)


class AdaptivePromptBuilder:
    """Builds prompts that adapt to context and user patterns"""

    def __init__(self, memory_manager, llm_client):
        self.memory_manager = memory_manager
        self.llm_client = llm_client
        self.template_engine = MemoryPromptTemplate()

    async def build_adaptive_prompt(self, user_input: str,
                                    conversation_history: List[Dict],
                                    user_preferences: Dict[str, Any]) -> str:
        """Build prompt adapted to user and context"""
        # Analyze user input to determine best memory strategy
        context_analysis = await self._analyze_context_needs(user_input)

        # Get relevant memories based on analysis
        memories = await self._get_targeted_memories(
            user_input, context_analysis
        )

        # Choose prompt strategy
        strategy = self._choose_prompt_strategy(
            context_analysis, len(memories), user_preferences
        )

        # Build context based on strategy
        if strategy == "hierarchical":
            memory_context = PromptStrategy.hierarchical_context(memories)
        elif strategy == "temporal":
            memory_context = PromptStrategy.temporal_context(memories)
        else:  # summarized
            memory_context = PromptStrategy.summarized_context(memories)

        # Build final prompt
        system_prompt_parts = []

        # Base instruction
        system_prompt_parts.append(
            "You are a helpful assistant with access to conversation memory. "
            "Provide thoughtful, context-aware responses."
        )

        # Memory context
        if memory_context:
            system_prompt_parts.extend([
                "",
                memory_context,
                "",
                "Use this context to provide more personalized and informed responses."
            ])

        # User preferences
        if user_preferences:
            prefs_text = self._format_user_preferences(user_preferences)
            if prefs_text:
                system_prompt_parts.extend([
                    "",
                    f"User preferences: {prefs_text}"
                ])

        return "\n".join(system_prompt_parts)

    async def _analyze_context_needs(self, user_input: str) -> Dict[str, Any]:
        """Analyze what type of context would be most helpful"""
        analysis_prompt = f"""
        Analyze this user input and determine what type of memory context would be most helpful:

        User input: "{user_input}"

        Consider:
        1. Is this a factual question that needs knowledge context?
        2. Is this continuing a previous conversation?
        3. Is this a task that needs procedural memory?
        4. Is this personal and needs conversational context?

        Respond with JSON:
        {{
            "primary_need": "factual|conversational|procedural|personal",
            "memory_types_needed": ["semantic", "episodic", "procedural", "working"],
            "context_depth": "light|medium|deep",
            "temporal_relevance": "recent|all_time|specific_period"
        }}
        """

        try:
            response = await self.llm_client.chat_completion(
                messages=[LLMMessage(role="user", content=analysis_prompt)],
                model="gpt-3.5-turbo",
                max_tokens=200
            )

            return json.loads(response.content)

        except Exception:
            # Fallback analysis
            return {
                "primary_need": "conversational",
                "memory_types_needed": ["episodic", "semantic"],
                "context_depth": "medium",
                "temporal_relevance": "recent"
            }

    async def _get_targeted_memories(self, user_input: str,
                                     context_analysis: Dict[str, Any]) -> List[Dict]:
        """Get memories targeted to the context analysis"""
        memory_types = context_analysis.get('memory_types_needed', ['episodic'])
        context_depth = context_analysis.get('context_depth', 'medium')

        # Determine limit based on depth
        limits = {"light": 5, "medium": 10, "deep": 20}
        limit = limits.get(context_depth, 10)

        memories = await self.memory_manager.recall(
            query=user_input,
            memory_types=memory_types,
            limit=limit
        )

        return memories

    def _choose_prompt_strategy(self, context_analysis: Dict[str, Any],
                                memory_count: int,
                                user_preferences: Dict[str, Any]) -> str:
        """Choose the best prompt strategy"""
        primary_need = context_analysis.get('primary_need', 'conversational')

        # User preference override
        preferred_style = user_preferences.get('context_style')
        if preferred_style in ['hierarchical', 'temporal', 'summarized']:
            return preferred_style

        # Strategy based on primary need and memory count
        if primary_need == 'factual' and memory_count > 8:
            return "hierarchical"
        elif primary_need == 'conversational':
            return "temporal"
        else:
            return "summarized"

    def _format_user_preferences(self, preferences: Dict[str, Any]) -> str:
        """Format user preferences for inclusion in prompt"""
        pref_parts = []

        if preferences.get('communication_style'):
            pref_parts.append(f"prefers {preferences['communication_style']} communication")

        if preferences.get('detail_level'):
            pref_parts.append(f"likes {preferences['detail_level']} detail level")

        if preferences.get('topics_of_interest'):
            topics = preferences['topics_of_interest']
            if isinstance(topics, list) and topics:
                pref_parts.append(f"interested in {', '.join(topics[:3])}")

        return "; ".join(pref_parts)
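To see what these strategies produce, the following minimal sketch renders a pair of fabricated memory records through PromptStrategy and MemoryPromptTemplate; the sample data is invented purely for illustration.

# Rendering memory context with the strategies above, using fabricated records.
from datetime import datetime

sample_memories = [
    {
        "content": "User mentioned they are preparing a conference talk on vector databases.",
        "memory_type": "episodic",
        "importance": 0.8,
        "timestamp": datetime(2024, 5, 2, 14, 30).isoformat(),
    },
    {
        "content": "User prefers bullet-point summaries over long paragraphs.",
        "memory_type": "semantic",
        "importance": 0.9,
        "timestamp": datetime(2024, 4, 28, 9, 5).isoformat(),
    },
]

# Plain strategies: importance-sorted summary and type-grouped hierarchy.
print(PromptStrategy.summarized_context(sample_memories, max_context_length=400))
print(PromptStrategy.hierarchical_context(sample_memories))

# Jinja-based rendering with the custom filters registered above.
template = MemoryPromptTemplate()
print(template.render_memory_context(sample_memories, context_type="conversational"))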

Streaming and Real-time Integration

Real-time Memory Updates

import asyncio
import json
import logging
import time
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional
from datetime import datetime


class StreamingMemoryIntegration:
    """Real-time memory integration with streaming LLM responses"""

    def __init__(self, llm_client: BaseLLMClient, memory_manager):
        self.llm_client = llm_client
        self.memory_manager = memory_manager
        self.active_streams: Dict[str, Dict[str, Any]] = {}
        self.logger = logging.getLogger(__name__)

    async def streaming_chat_with_memory(self, user_input: str,
                                         conversation_id: str,
                                         model: str = "gpt-3.5-turbo",
                                         on_token: Optional[Callable[[str], None]] = None,
                                         on_memory_update: Optional[Callable[[Dict], None]] = None
                                         ) -> AsyncGenerator[str, None]:
        """Stream LLM response while updating memory in real-time"""
        stream_id = f"stream_{conversation_id}_{int(time.time())}"

        # Initialize stream context
        self.active_streams[stream_id] = {
            "user_input": user_input,
            "conversation_id": conversation_id,
            "response_parts": [],
            "start_time": datetime.now(),
            "metadata": {}
        }

        try:
            # Store user input immediately
            user_memory_id = await self.memory_manager.remember(
                content=f"User: {user_input}",
                memory_type="episodic",
                importance=0.7,
                metadata={
                    "conversation_id": conversation_id,
                    "role": "user",
                    "stream_id": stream_id
                }
            )

            if on_memory_update:
                on_memory_update({
                    "type": "user_memory_stored",
                    "memory_id": user_memory_id,
                    "content": user_input
                })

            # Get relevant memories for context
            relevant_memories = await self.memory_manager.recall(
                query=user_input,
                limit=10
            )

            # Build streaming prompt
            messages = await self._build_streaming_context(
                user_input, relevant_memories, conversation_id
            )

            # Start streaming LLM response
            response_parts = []
            async for token in self._stream_llm_response(messages, model):
                response_parts.append(token)
                self.active_streams[stream_id]["response_parts"] = response_parts

                if on_token:
                    on_token(token)

                # Yield token to caller
                yield token

                # Periodically update working memory with partial response
                if len(response_parts) % 50 == 0:  # Every 50 tokens
                    partial_response = "".join(response_parts)
                    await self._update_working_memory(
                        stream_id, partial_response, conversation_id
                    )

            # Store complete response
            complete_response = "".join(response_parts)
            assistant_memory_id = await self.memory_manager.remember(
                content=f"Assistant: {complete_response}",
                memory_type="episodic",
                importance=0.7,
                metadata={
                    "conversation_id": conversation_id,
                    "role": "assistant",
                    "stream_id": stream_id,
                    "response_length": len(complete_response)
                }
            )

            if on_memory_update:
                on_memory_update({
                    "type": "assistant_memory_stored",
                    "memory_id": assistant_memory_id,
                    "content": complete_response
                })

            # Extract knowledge asynchronously
            asyncio.create_task(
                self._extract_knowledge_async(
                    user_input, complete_response, conversation_id
                )
            )

        finally:
            # Cleanup stream context
            if stream_id in self.active_streams:
                del self.active_streams[stream_id]

    async def _build_streaming_context(self, user_input: str,
                                       relevant_memories: List[Dict[str, Any]],
                                       conversation_id: str) -> List[LLMMessage]:
        """Build the message list for the streaming request.

        Note: this helper is an assumed, minimal implementation; it folds recalled
        memories into a system message, mirroring MemoryEnhancedLLM._build_system_context.
        """
        context_lines = ["You are a helpful assistant with access to conversation memory."]
        if relevant_memories:
            context_lines.append("Relevant memories:")
            for memory in relevant_memories:
                context_lines.append(f"- {memory.get('content', '')}")

        return [
            LLMMessage(role="system", content="\n".join(context_lines)),
            LLMMessage(role="user", content=user_input),
        ]

    async def _stream_llm_response(self, messages: List[LLMMessage],
                                   model: str) -> AsyncGenerator[str, None]:
        """Stream tokens from LLM (implementation depends on provider)"""
        # This is a simplified example - actual implementation depends on LLM provider
        # For OpenAI, you'd use the streaming API
        try:
            # Convert messages for API call
            api_messages = [
                {"role": msg.role, "content": msg.content}
                for msg in messages
            ]

            # Simulate streaming (replace with actual streaming API call)
            async with self.llm_client.session.post(
                f"{self.llm_client.base_url}/chat/completions",
                json={
                    "model": model,
                    "messages": api_messages,
                    "stream": True
                }
            ) as response:
                async for line in response.content:
                    if line:
                        line_text = line.decode('utf-8').strip()
                        if line_text.startswith('data: '):
                            data_text = line_text[6:]
                            if data_text == '[DONE]':
                                break

                            try:
                                data = json.loads(data_text)
                                if 'choices' in data and len(data['choices']) > 0:
                                    delta = data['choices'][0].get('delta', {})
                                    if 'content' in delta:
                                        yield delta['content']
                            except json.JSONDecodeError:
                                continue

        except Exception as e:
            self.logger.error(f"Streaming failed: {e}")
            # Fallback to non-streaming
            response = await self.llm_client.chat_completion(messages, model)
            yield response.content

    async def _update_working_memory(self, stream_id: str,
                                     partial_response: str,
                                     conversation_id: str):
        """Update working memory with partial response"""
        working_memory_id = f"working_{stream_id}"

        await self.memory_manager.remember(
            content=f"Partial response: {partial_response}",
            memory_type="working",
            importance=0.5,
            metadata={
                "conversation_id": conversation_id,
                "stream_id": stream_id,
                "is_partial": True,
                "timestamp": datetime.now().isoformat()
            },
            memory_id=working_memory_id  # Overwrite previous partial
        )

    async def _extract_knowledge_async(self, user_input: str,
                                       response: str,
                                       conversation_id: str):
        """Extract knowledge asynchronously without blocking stream"""
        try:
            # Use a simpler, faster model for extraction
            extraction_prompt = f"""
            Extract key facts or insights from this exchange (max 100 words):

            User: {user_input}
            Assistant: {response}

            Focus on: facts, preferences, decisions, important information.
            """

            extraction_response = await self.llm_client.chat_completion(
                messages=[LLMMessage(role="user", content=extraction_prompt)],
                model="gpt-3.5-turbo",
                max_tokens=150
            )

            extracted = extraction_response.content.strip()

            if extracted and len(extracted) > 10:
                await self.memory_manager.remember(
                    content=extracted,
                    memory_type="semantic",
                    importance=0.8,
                    metadata={
                        "conversation_id": conversation_id,
                        "source": "async_extraction",
                        "extraction_timestamp": datetime.now().isoformat()
                    }
                )

        except Exception as e:
            self.logger.error(f"Async knowledge extraction failed: {e}")


class MemoryWebSocketHandler:
    """WebSocket handler for real-time memory updates"""

    def __init__(self, streaming_integration: StreamingMemoryIntegration):
        self.streaming_integration = streaming_integration
        self.active_connections: Dict[str, Any] = {}
        self.logger = logging.getLogger(__name__)

    async def handle_connection(self, websocket, user_id: str):
        """Handle WebSocket connection for real-time memory integration"""
        self.active_connections[user_id] = websocket

        try:
            async for message in websocket:
                data = json.loads(message)

                if data.get("type") == "chat_message":
                    await self._handle_chat_message(
                        websocket, user_id, data
                    )
                elif data.get("type") == "memory_query":
                    await self._handle_memory_query(
                        websocket, user_id, data
                    )

        except Exception as e:
            self.logger.error(f"WebSocket error for user {user_id}: {e}")
        finally:
            if user_id in self.active_connections:
                del self.active_connections[user_id]

    async def _handle_chat_message(self, websocket, user_id: str, data: Dict):
        """Handle streaming chat with memory"""
        user_input = data.get("message", "")
        conversation_id = data.get("conversation_id", f"conv_{user_id}")

        # Send acknowledgment
        await websocket.send(json.dumps({
            "type": "message_received",
            "conversation_id": conversation_id
        }))

        # Stream response with memory updates
        async for token in self.streaming_integration.streaming_chat_with_memory(
            user_input=user_input,
            conversation_id=conversation_id,
            on_token=lambda t: asyncio.create_task(
                websocket.send(json.dumps({
                    "type": "token",
                    "content": t,
                    "conversation_id": conversation_id
                }))
            ),
            on_memory_update=lambda m: asyncio.create_task(
                websocket.send(json.dumps({
                    "type": "memory_update",
                    **m,
                    "conversation_id": conversation_id
                }))
            )
        ):
            pass  # Tokens are sent via on_token callback

        # Send completion signal
        await websocket.send(json.dumps({
            "type": "response_complete",
            "conversation_id": conversation_id
        }))

    async def _handle_memory_query(self, websocket, user_id: str, data: Dict):
        """Handle memory queries via WebSocket"""
        query = data.get("query", "")
        memory_types = data.get("memory_types", ["episodic", "semantic"])
        limit = data.get("limit", 10)

        try:
            memories = await self.streaming_integration.memory_manager.recall(
                query=query,
                memory_types=memory_types,
                limit=limit
            )

            await websocket.send(json.dumps({
                "type": "memory_results",
                "query": query,
                "memories": memories,
                "count": len(memories)
            }))

        except Exception as e:
            await websocket.send(json.dumps({
                "type": "error",
                "message": f"Memory query failed: {str(e)}"
            }))

Together, these patterns cover the main integration points between Large Language Models and agent memory systems: a provider-agnostic client layer, memory-enhanced chat, adaptive memory-aware prompting, and streaming responses with real-time memory updates.