
Entity Resolution & Disambiguation

Entity resolution is the process of identifying when different references point to the same real-world entity, while disambiguation determines which specific entity is being referenced when multiple possibilities exist. In agent memory systems, these capabilities are essential for maintaining coherent knowledge representations and enabling sophisticated reasoning about people, places, concepts, and objects across conversations.

Understanding Entity Resolution

What Are Entities?

Person Entities

  • Individual people (users, colleagues, family members)
  • Roles and titles (manager, doctor, customer service rep)
  • Groups and organizations (teams, companies, communities)
  • Fictional characters in discussions

Object Entities

  • Physical items (documents, devices, vehicles)
  • Digital assets (files, applications, accounts)
  • Products and services (software tools, subscriptions)
  • Abstract concepts (projects, ideas, methodologies)

Location Entities

  • Geographic locations (cities, countries, addresses)
  • Virtual spaces (websites, platforms, repositories)
  • Organizational spaces (offices, departments, rooms)
  • Conceptual spaces (markets, domains, fields)

Temporal Entities

  • Events and meetings
  • Deadlines and milestones
  • Recurring schedules
  • Historical periods

The Resolution Challenge

Same Entity, Different References

"John" → "John Smith" → "the project manager" → "my boss" → "JS"

Different Entities, Similar References

"Apple" → Apple Inc. vs. the fruit vs. Apple Records "Python" → programming language vs. snake vs. Monty Python "Jordan" → Michael Jordan vs. country vs. river vs. person named Jordan

Context-Dependent Resolution

"He said the meeting is at 3" → Who is "he"? Which meeting? 3 AM or PM? "Can you send it to me?" → What is "it"? Who is requesting? "The API isn't working" → Which API? Which specific problem?

Core Resolution Techniques

Technique 1: Coreference Resolution

Pronoun Resolution

def resolve_pronouns(text, entity_context):
    pronouns = extract_pronouns(text)
    resolved = {}

    for pronoun in pronouns:
        # Find candidate entities from context
        candidates = find_candidate_entities(pronoun, entity_context)
        if not candidates:
            continue  # Leave the pronoun unresolved; max() would fail on no candidates

        # Score candidates based on various features
        scores = {}
        for candidate in candidates:
            score = 0
            score += distance_score(pronoun.position, candidate.last_mention)
            score += gender_agreement_score(pronoun, candidate)
            score += number_agreement_score(pronoun, candidate)
            score += salience_score(candidate, entity_context)
            scores[candidate] = score

        # Select highest scoring candidate
        best_match = max(scores.items(), key=lambda x: x[1])
        resolved[pronoun] = best_match[0]

    return resolved

Definite Description Resolution

def resolve_definite_descriptions(text, knowledge_base):
    descriptions = extract_definite_descriptions(text)  # "the project", "the API"
    resolved = {}

    for desc in descriptions:
        # Extract descriptive features
        features = extract_features(desc)  # noun, modifiers, context

        # Query knowledge base for matching entities
        candidates = knowledge_base.query_by_features(features)

        # Rank by specificity and context relevance
        scored_candidates = []
        for candidate in candidates:
            specificity = calculate_specificity_match(features, candidate)
            relevance = calculate_context_relevance(desc, candidate)
            score = combine_scores(specificity, relevance)
            scored_candidates.append((candidate, score))

        # Select best match above confidence threshold
        if scored_candidates:
            best_candidate, score = max(scored_candidates, key=lambda x: x[1])
            if score > CONFIDENCE_THRESHOLD:
                resolved[desc] = best_candidate

    return resolved

Technique 2: Named Entity Linking

Entity Mention Detection

import re

def detect_entity_mentions(text):
    mentions = []

    # Use named entity recognition
    ner_entities = ner_model.extract(text)
    mentions.extend(ner_entities)

    # Pattern-based extraction for domain-specific entities
    custom_patterns = [
        r'(?i)project\s+(\w+)',   # "Project Alpha"
        r'(?i)version\s+([\d.]+)',  # "version 2.1.3"
        r'(?i)ticket\s+#?(\d+)',  # "ticket #1234"
    ]

    for pattern in custom_patterns:
        # Keep the matched text rather than the raw re.Match objects
        mentions.extend(match.group(0) for match in re.finditer(pattern, text))

    # Noun phrase extraction for potential entities
    noun_phrases = extract_noun_phrases(text)
    mentions.extend(noun_phrases)

    return mentions

Entity Linking Pipeline

def link_entities(mentions, knowledge_base):
    linked_entities = []

    for mention in mentions:
        # Generate candidate entities
        candidates = generate_candidates(mention, knowledge_base)

        # Feature extraction for linking
        features = extract_linking_features(mention, candidates)

        # Ranking and selection
        ranked_candidates = rank_candidates(candidates, features)

        # Apply confidence threshold
        if ranked_candidates and ranked_candidates[0].score > LINKING_THRESHOLD:
            linked_entities.append({
                'mention': mention,
                'entity': ranked_candidates[0].entity,
                'confidence': ranked_candidates[0].score
            })

    return linked_entities

def extract_linking_features(mention, candidates):
    features = {}

    for candidate in candidates:
        features[candidate] = {
            'string_similarity': calculate_string_similarity(mention.text, candidate.name),
            'context_similarity': calculate_context_similarity(mention.context, candidate.description),
            'entity_popularity': candidate.popularity_score,
            'type_compatibility': check_type_compatibility(mention.type, candidate.type),
            'temporal_relevance': calculate_temporal_relevance(mention.timestamp, candidate.last_updated)
        }

    return features

Technique 3: Cross-Reference Resolution

Multi-Modal Entity Matching

def resolve_cross_references(entity_mentions, conversation_context):
    entity_groups = {}

    for mention in entity_mentions:
        # Find potential matches in existing groups
        matched_group = None
        best_score = 0

        for group_id, group_entities in entity_groups.items():
            score = calculate_group_match_score(mention, group_entities, conversation_context)
            if score > best_score and score > MATCH_THRESHOLD:
                matched_group = group_id
                best_score = score

        # Compare against None so a falsy group ID (0, "") still counts as a match
        if matched_group is not None:
            # Add to existing group
            entity_groups[matched_group].append(mention)
        else:
            # Create new group
            new_group_id = generate_group_id()
            entity_groups[new_group_id] = [mention]

    return entity_groups

def calculate_group_match_score(mention, group_entities, context):
    scores = []

    for entity in group_entities:
        # String similarity
        string_score = fuzzy_string_match(mention.text, entity.text)

        # Semantic similarity
        semantic_score = semantic_similarity(mention.embedding, entity.embedding)

        # Context compatibility
        context_score = context_compatibility(mention, entity, context)

        # Temporal proximity
        temporal_score = temporal_proximity(mention.timestamp, entity.timestamp)

        # Combine scores
        combined_score = weighted_average([
            (string_score, 0.3),
            (semantic_score, 0.3),
            (context_score, 0.25),
            (temporal_score, 0.15)
        ])
        scores.append(combined_score)

    return max(scores) if scores else 0
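The scoring function above calls `fuzzy_string_match` and `weighted_average` without defining them. One possible way to fill them in, using only the standard library, is sketched below. The choice of `difflib.SequenceMatcher` and the case-insensitive comparison are assumptions made for illustration; a production system might prefer an edit-distance or phonetic measure.

```python
from difflib import SequenceMatcher


def fuzzy_string_match(a: str, b: str) -> float:
    """Similarity in [0, 1]; 1.0 means the strings are equal ignoring case."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def weighted_average(scored) -> float:
    """Average (score, weight) pairs; weights need not sum to 1."""
    total_weight = sum(weight for _, weight in scored)
    if total_weight == 0:
        return 0.0  # No evidence at all: treat as no match
    return sum(score * weight for score, weight in scored) / total_weight


print(fuzzy_string_match("John Smith", "john smith"))
print(weighted_average([(1.0, 0.3), (0.0, 0.7)]))
```

Dividing by the total weight keeps the combined score in the same range as the inputs even if one signal (for example, a missing embedding) is dropped from the list.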

Advanced Resolution Strategies

Strategy 1: Probabilistic Entity Resolution

Bayesian Entity Matching

class BayesianEntityResolver:
    def __init__(self):
        self.prior_probabilities = {}
        self.feature_likelihoods = {}

    def calculate_match_probability(self, mention1, mention2, features):
        # P(match | features) ∝ P(features | match) * P(match)

        # Prior probability of any two mentions being the same entity
        prior = self.get_prior_probability(mention1, mention2)

        # Likelihood of observing features given they are the same entity
        likelihood = 1.0
        for feature, value in features.items():
            feature_likelihood = self.get_feature_likelihood(feature, value, match=True)
            likelihood *= feature_likelihood

        # Evidence (normalization factor)
        evidence = self.calculate_evidence(features)

        posterior = (likelihood * prior) / evidence
        return posterior

    def get_prior_probability(self, mention1, mention2):
        # Base rate depends on entity types and context
        if mention1.type == mention2.type:
            return self.prior_probabilities.get((mention1.type, mention2.type), 0.1)
        return 0.01  # Low prior for different types

    def update_probabilities(self, training_data):
        # Update priors and likelihoods based on labeled examples
        for example in training_data:
            self._update_prior(example.mention1, example.mention2, example.is_match)
            self._update_feature_likelihoods(example.features, example.is_match)
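The evidence term is the part of the class above that is easiest to get wrong, so a small worked example may help. The sketch below assumes conditionally independent features (a naive Bayes simplification) and computes the evidence as the sum of the match and non-match joint probabilities. The function name `match_posterior` and all of the numbers are made up for illustration.

```python
import math


def match_posterior(prior, likelihoods_if_match, likelihoods_if_nonmatch):
    """P(match | features) under a naive independence assumption."""
    joint_match = prior * math.prod(likelihoods_if_match)
    joint_nonmatch = (1.0 - prior) * math.prod(likelihoods_if_nonmatch)
    evidence = joint_match + joint_nonmatch  # P(features)
    return joint_match / evidence if evidence > 0 else 0.0


# Two observed features: "same surname" and "same employer" (illustrative values).
posterior = match_posterior(
    prior=0.1,
    likelihoods_if_match=[0.9, 0.8],      # P(feature | same entity)
    likelihoods_if_nonmatch=[0.05, 0.2],  # P(feature | different entities)
)
print(round(posterior, 4))  # 0.072 / (0.072 + 0.009) = 0.8889
```

Even with a low prior of 0.1, two features that are much more likely under a match than a non-match push the posterior close to 0.89.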

Uncertainty Handling

import numpy as np

def handle_resolution_uncertainty(candidates, confidence_scores):
    # When confidence is low, maintain multiple hypotheses
    if max(confidence_scores) < HIGH_CONFIDENCE_THRESHOLD:
        # Return every candidate above the minimum probability, with its score
        return {
            'resolution_type': 'ambiguous',
            'candidates': [
                {'entity': candidate, 'probability': score}
                for candidate, score in zip(candidates, confidence_scores)
                if score > MIN_PROBABILITY_THRESHOLD
            ],
            'requires_clarification': True
        }
    else:
        # High confidence resolution
        best_idx = np.argmax(confidence_scores)
        return {
            'resolution_type': 'resolved',
            'entity': candidates[best_idx],
            'confidence': confidence_scores[best_idx],
            'requires_clarification': False
        }

Strategy 2: Graph-Based Entity Resolution

Entity Relationship Graphs

class EntityRelationshipGraph:
    def __init__(self):
        self.entities = {}
        self.relationships = {}

    def add_entity(self, entity_id, entity_data):
        self.entities[entity_id] = entity_data

    def add_relationship(self, entity1_id, entity2_id, relationship_type, strength):
        if entity1_id not in self.relationships:
            self.relationships[entity1_id] = {}

        self.relationships[entity1_id][entity2_id] = {
            'type': relationship_type,
            'strength': strength
        }

    def resolve_entity_with_graph(self, mention, context):
        # Find candidate entities
        candidates = self.find_candidate_entities(mention)
        if not candidates:
            return None  # Nothing to rank

        # Use graph structure to improve resolution
        for candidate in candidates:
            # Calculate relationship-based score
            relationship_score = self.calculate_relationship_score(
                candidate, context.recent_entities
            )

            # Combine with other features
            candidate.total_score = combine_scores(
                candidate.similarity_score,
                relationship_score,
                candidate.frequency_score
            )

        # Return highest scoring candidate
        return max(candidates, key=lambda c: c.total_score)

    def calculate_relationship_score(self, candidate, context_entities):
        score = 0
        for context_entity in context_entities:
            if candidate.id in self.relationships.get(context_entity.id, {}):
                relationship = self.relationships[context_entity.id][candidate.id]
                score += relationship['strength'] * RELATIONSHIP_WEIGHT
        return score

Graph-Based Clustering

def cluster_entities_by_graph(entity_mentions, relationship_graph):
    # Create similarity graph
    similarity_graph = create_similarity_graph(entity_mentions)

    # Add relationship-based edges
    for mention1 in entity_mentions:
        for mention2 in entity_mentions:
            if mention1 != mention2:
                relationship_strength = relationship_graph.get_strength(mention1, mention2)
                if relationship_strength > RELATIONSHIP_THRESHOLD:
                    similarity_graph.add_edge(mention1, mention2, weight=relationship_strength)

    # Apply graph clustering algorithm
    clusters = apply_graph_clustering(similarity_graph)

    # Convert clusters to entity groups
    entity_groups = {}
    for cluster_id, cluster_nodes in clusters.items():
        entity_groups[cluster_id] = {
            'mentions': cluster_nodes,
            'canonical_entity': select_canonical_entity(cluster_nodes),
            'confidence': calculate_cluster_confidence(cluster_nodes, similarity_graph)
        }

    return entity_groups
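The `apply_graph_clustering` step is left open above. A minimal sketch of one option, assuming connected components over edges whose weight clears a threshold, is shown below using a small union-find structure. The function name `cluster_by_connected_components`, the edge weights, and the threshold are hypothetical; community-detection or correlation clustering would be reasonable alternatives when transitive merging is too aggressive.

```python
def cluster_by_connected_components(nodes, edges, threshold):
    """Group nodes linked (directly or transitively) by edges with weight >= threshold."""
    parent = {node: node for node in nodes}

    def find(node):
        # Walk up to the root, halving the path as we go.
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    for a, b, weight in edges:
        if weight >= threshold:
            parent[find(a)] = find(b)  # Union the two components

    clusters = {}
    for node in nodes:
        clusters.setdefault(find(node), []).append(node)
    return list(clusters.values())


mentions = ["John", "John Smith", "JS", "Apple"]
edges = [("John", "John Smith", 0.9), ("John Smith", "JS", 0.8), ("JS", "Apple", 0.1)]
clusters = cluster_by_connected_components(mentions, edges, threshold=0.5)
print(clusters)
```

Note that "John" and "JS" end up together only through "John Smith"; this transitivity is the main behavior to watch when tuning the threshold.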

Strategy 3: Contextual Disambiguation

Context-Aware Resolution

class ContextualEntityResolver:
    def __init__(self, default_model=None):
        self.context_models = {}
        # Fallback model used when no domain-specific model is registered
        self.default_model = default_model

    def resolve_with_context(self, mention, conversation_context):
        # Extract contextual features
        context_features = self.extract_context_features(mention, conversation_context)

        # Get domain-specific context model
        domain = self.detect_domain(conversation_context)
        context_model = self.context_models.get(domain, self.default_model)

        # Score candidates using contextual model
        candidates = self.get_candidate_entities(mention)
        scored_candidates = []

        for candidate in candidates:
            context_score = context_model.score_candidate(candidate, context_features)
            base_score = self.calculate_base_score(mention, candidate)
            total_score = combine_scores(base_score, context_score)
            scored_candidates.append((candidate, total_score))

        if not scored_candidates:
            return None  # No candidates to choose from

        return max(scored_candidates, key=lambda x: x[1])

    def extract_context_features(self, mention, conversation_context):
        features = {
            'surrounding_entities': self.get_surrounding_entities(mention, conversation_context),
            'topic_context': self.extract_topics(conversation_context),
            'temporal_context': self.extract_temporal_context(conversation_context),
            'user_context': self.extract_user_context(conversation_context),
            'task_context': self.extract_task_context(conversation_context)
        }
        return features

Multi-Turn Resolution

import math

def resolve_across_turns(mention, conversation_history):
    # Track entity mentions across conversation turns
    historical_mentions = extract_historical_mentions(conversation_history)

    # Build entity continuity graph
    continuity_graph = build_continuity_graph(historical_mentions)

    # Find potential antecedents for current mention
    antecedent_candidates = find_antecedent_candidates(
        mention, continuity_graph, conversation_history
    )

    # Score candidates based on recency, salience, and compatibility
    scored_antecedents = []
    for candidate in antecedent_candidates:
        score = calculate_antecedent_score(mention, candidate, conversation_history)
        scored_antecedents.append((candidate, score))

    # Return best antecedent if confidence is high enough
    if scored_antecedents:
        best_antecedent, score = max(scored_antecedents, key=lambda x: x[1])
        if score > ANTECEDENT_CONFIDENCE_THRESHOLD:
            return best_antecedent

    return None  # No confident resolution found

def calculate_antecedent_score(mention, candidate, conversation_history):
    # Distance-based decay
    distance = calculate_mention_distance(mention, candidate, conversation_history)
    recency_score = math.exp(-distance / RECENCY_DECAY_RATE)

    # Salience based on mention frequency and importance
    salience_score = calculate_mention_salience(candidate, conversation_history)

    # Type and gender agreement
    agreement_score = calculate_agreement_score(mention, candidate)

    # Semantic compatibility
    semantic_score = calculate_semantic_compatibility(mention, candidate)

    return weighted_average([
        (recency_score, 0.3),
        (salience_score, 0.25),
        (agreement_score, 0.25),
        (semantic_score, 0.2)
    ])
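To get a feel for the recency term, it can help to evaluate the exponential decay on its own. The sketch below assumes distance is measured in conversation turns and picks `RECENCY_DECAY_RATE = 3.0` purely for illustration; the source does not specify either.

```python
import math

RECENCY_DECAY_RATE = 3.0  # Assumed value; larger means older mentions stay relevant longer


def recency_score(distance_in_turns: float, decay_rate: float = RECENCY_DECAY_RATE) -> float:
    """Exponential decay: 1.0 for the current turn, approaching 0 for distant turns."""
    return math.exp(-distance_in_turns / decay_rate)


for turns_ago in (0, 1, 3, 10):
    print(turns_ago, round(recency_score(turns_ago), 3))
```

With this decay rate, a mention three turns back keeps about 37% of its recency weight (`exp(-1)`), so salience and agreement need to be strong for older antecedents to win.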

Disambiguation Strategies

Strategy 1: Interactive Clarification

Clarification Request Generation

def generate_clarification_request(ambiguous_mention, candidates):
    # Determine the type of ambiguity
    ambiguity_type = classify_ambiguity(ambiguous_mention, candidates)

    if ambiguity_type == 'person_ambiguity':
        return generate_person_clarification(ambiguous_mention, candidates)
    elif ambiguity_type == 'object_ambiguity':
        return generate_object_clarification(ambiguous_mention, candidates)
    elif ambiguity_type == 'temporal_ambiguity':
        return generate_temporal_clarification(ambiguous_mention, candidates)
    else:
        return generate_generic_clarification(ambiguous_mention, candidates)

def generate_person_clarification(mention, candidates):
    # Create distinguishing questions
    distinguishing_features = find_distinguishing_features(candidates)

    if 'role' in distinguishing_features:
        roles = [candidate.role for candidate in candidates]
        return f"Which {mention.text} do you mean? The {' or the '.join(roles)}?"
    elif 'location' in distinguishing_features:
        locations = [candidate.location for candidate in candidates]
        return f"Which {mention.text}? The one in {' or '.join(locations)}?"
    else:
        # Fallback to listing options
        options = [candidate.description for candidate in candidates]
        return f"I found multiple people named {mention.text}. Do you mean: {format_options(options)}?"

Adaptive Clarification Strategy

from datetime import datetime

class AdaptiveClarificationManager:
    def __init__(self):
        self.user_preferences = {}
        self.clarification_history = {}

    def should_request_clarification(self, mention, candidates, user_id):
        # Check user preferences for disambiguation tolerance
        tolerance = self.user_preferences.get(user_id, {}).get('ambiguity_tolerance', 0.5)

        # Calculate ambiguity score
        ambiguity_score = self.calculate_ambiguity_score(candidates)

        # Check recent clarification frequency
        recent_clarifications = self.get_recent_clarifications(user_id)
        clarification_fatigue = min(len(recent_clarifications) / 10, 0.8)

        # Raise the threshold when the user has been asked often recently
        adjusted_threshold = tolerance + clarification_fatigue

        return ambiguity_score > adjusted_threshold

    def learn_from_clarification_response(self, user_id, clarification, user_response):
        # Update user preferences based on response patterns
        if user_response.type == 'frustrated':
            self.increase_tolerance(user_id)
        elif user_response.type == 'appreciative':
            self.decrease_tolerance(user_id)

        # Store clarification for future reference
        # (setdefault avoids a KeyError on a user's first clarification)
        self.clarification_history.setdefault(user_id, []).append({
            'clarification': clarification,
            'response': user_response,
            'timestamp': datetime.now()
        })

Strategy 2: Context-Driven Disambiguation

Implicit Disambiguation

def disambiguate_implicitly(mention, candidates, conversation_context):
    # Use conversation context to filter candidates
    relevant_candidates = []

    for candidate in candidates:
        relevance_score = calculate_contextual_relevance(candidate, conversation_context)
        if relevance_score > RELEVANCE_THRESHOLD:
            relevant_candidates.append((candidate, relevance_score))

    # If context sufficiently narrows candidates, resolve implicitly
    if len(relevant_candidates) == 1:
        return relevant_candidates[0][0]
    elif len(relevant_candidates) > 1:
        # Select most contextually relevant
        return max(relevant_candidates, key=lambda x: x[1])[0]
    else:
        return None  # Cannot disambiguate from context

def calculate_contextual_relevance(candidate, context):
    relevance = 0

    # Topic alignment
    if context.current_topic:
        topic_relevance = calculate_topic_alignment(candidate, context.current_topic)
        relevance += topic_relevance * TOPIC_WEIGHT

    # Recent entity co-occurrence
    recent_entities = context.get_recent_entities()
    for entity in recent_entities:
        co_occurrence_score = calculate_co_occurrence(candidate, entity)
        relevance += co_occurrence_score * CO_OCCURRENCE_WEIGHT

    # Temporal relevance
    temporal_relevance = calculate_temporal_relevance(candidate, context.current_time)
    relevance += temporal_relevance * TEMPORAL_WEIGHT

    return relevance

Implementation Patterns

Pattern 1: Entity Registry

Centralized Entity Management

class EntityRegistry:
    def __init__(self):
        self.entities = {}
        self.aliases = {}
        self.relationships = {}
        self.resolution_cache = {}

    def register_entity(self, entity_id, entity_data):
        self.entities[entity_id] = entity_data

        # Index aliases for quick lookup
        for alias in entity_data.get('aliases', []):
            if alias not in self.aliases:
                self.aliases[alias] = []
            self.aliases[alias].append(entity_id)

    def resolve_mention(self, mention_text, context=None):
        # Check cache first
        cache_key = self.create_cache_key(mention_text, context)
        if cache_key in self.resolution_cache:
            return self.resolution_cache[cache_key]

        # Find candidate entities
        candidates = self.find_candidates(mention_text)

        # Apply resolution algorithm
        resolved_entity = self.apply_resolution_algorithm(
            mention_text, candidates, context
        )

        # Cache result
        self.resolution_cache[cache_key] = resolved_entity
        return resolved_entity

    def update_entity(self, entity_id, updates):
        if entity_id in self.entities:
            self.entities[entity_id].update(updates)
            # Invalidate related cache entries
            self.invalidate_cache_for_entity(entity_id)

    def merge_entities(self, entity_id1, entity_id2):
        # Combine entity data
        entity1 = self.entities[entity_id1]
        entity2 = self.entities[entity_id2]
        merged_entity = self.merge_entity_data(entity1, entity2)

        # Update registry
        self.entities[entity_id1] = merged_entity
        del self.entities[entity_id2]

        # Update aliases and relationships
        self.update_aliases_for_merge(entity_id1, entity_id2)
        self.update_relationships_for_merge(entity_id1, entity_id2)

        # Clear cache
        self.resolution_cache.clear()

Pattern 2: Resolution Pipeline

Multi-Stage Resolution Process

class EntityResolutionPipeline:
    def __init__(self):
        self.stages = [
            ExactMatchStage(),
            FuzzyMatchStage(),
            SemanticMatchStage(),
            ContextualResolutionStage(),
            DisambiguationStage()
        ]

    def resolve(self, mention, context):
        resolution_result = ResolutionResult(mention, context)

        for stage in self.stages:
            resolution_result = stage.process(resolution_result)

            # Early termination if high-confidence resolution found
            if resolution_result.confidence > HIGH_CONFIDENCE_THRESHOLD:
                break

            # Skip remaining stages if no candidates remain
            if not resolution_result.candidates:
                break

        return resolution_result

class ExactMatchStage(ResolutionStage):
    def process(self, resolution_result):
        exact_matches = []
        for candidate in resolution_result.candidates:
            if self.is_exact_match(resolution_result.mention, candidate):
                exact_matches.append(candidate)

        if exact_matches:
            resolution_result.candidates = exact_matches
            resolution_result.confidence *= EXACT_MATCH_BOOST

        return resolution_result

class SemanticMatchStage(ResolutionStage):
    def process(self, resolution_result):
        mention_embedding = self.get_embedding(resolution_result.mention.text)

        scored_candidates = []
        for candidate in resolution_result.candidates:
            candidate_embedding = self.get_embedding(candidate.canonical_name)
            similarity = cosine_similarity(mention_embedding, candidate_embedding)

            if similarity > SEMANTIC_THRESHOLD:
                candidate.semantic_score = similarity
                scored_candidates.append(candidate)

        resolution_result.candidates = scored_candidates
        return resolution_result

Pattern 3: Hierarchical Entity Models

Multi-Level Entity Representation

import re

class HierarchicalEntityModel:
    def __init__(self):
        self.entity_hierarchy = {}
        self.type_hierarchies = {
            'person': ['individual', 'role', 'group'],
            'location': ['specific', 'general', 'virtual'],
            'object': ['physical', 'digital', 'abstract']
        }

    def add_entity(self, entity_id, entity_type, parent_id=None):
        self.entity_hierarchy[entity_id] = {
            'type': entity_type,
            'parent': parent_id,
            'children': [],
            'attributes': {}
        }

        if parent_id and parent_id in self.entity_hierarchy:
            self.entity_hierarchy[parent_id]['children'].append(entity_id)

    def resolve_at_appropriate_level(self, mention, context):
        # Determine appropriate resolution granularity
        required_specificity = self.determine_required_specificity(mention, context)

        # Find candidates at different hierarchy levels
        candidates_by_level = {}
        for level in self.type_hierarchies[mention.type]:
            candidates_by_level[level] = self.get_candidates_at_level(mention, level)

        # Select level that best matches required specificity
        target_level = self.select_target_level(required_specificity, candidates_by_level)

        return self.resolve_at_level(mention, target_level, candidates_by_level[target_level])

    def determine_required_specificity(self, mention, context):
        specificity_indicators = {
            'high': ['exact', 'specific', 'particular', 'that specific'],
            'medium': ['the', 'this'],
            'low': ['a', 'any', 'some']
        }

        context_text = mention.context.lower()
        for level, indicators in specificity_indicators.items():
            # Match whole words only, so "a" does not fire inside "manager"
            if any(re.search(rf"\b{re.escape(indicator)}\b", context_text)
                   for indicator in indicators):
                return level

        return 'medium'  # Default specificity level

Evaluation and Metrics

Resolution Quality Metrics

Accuracy Metrics

def evaluate_resolution_accuracy(test_cases):
    correct_resolutions = 0
    total_resolutions = 0

    for test_case in test_cases:
        predicted_entity = resolve_entity(test_case.mention, test_case.context)
        ground_truth = test_case.ground_truth_entity

        if predicted_entity == ground_truth:
            correct_resolutions += 1
        total_resolutions += 1

    # Guard against an empty test set
    accuracy = correct_resolutions / total_resolutions if total_resolutions else 0.0
    return accuracy

def evaluate_disambiguation_quality(test_cases):
    metrics = {
        'precision': 0,
        'recall': 0,
        'f1': 0,
        'disambiguation_rate': 0
    }

    true_positives = 0
    false_positives = 0
    false_negatives = 0
    successful_disambiguations = 0

    for test_case in test_cases:
        prediction = disambiguate_entity(test_case.mention, test_case.candidates)
        ground_truth = test_case.correct_entity

        if prediction == ground_truth:
            true_positives += 1
            successful_disambiguations += 1
        elif prediction is not None:
            false_positives += 1  # Resolved, but to the wrong entity
        else:
            false_negatives += 1  # Declined to resolve

    # Each ratio is left at 0 when its denominator is 0
    predicted = true_positives + false_positives
    actual = true_positives + false_negatives
    if predicted:
        metrics['precision'] = true_positives / predicted
    if actual:
        metrics['recall'] = true_positives / actual
    if metrics['precision'] + metrics['recall'] > 0:
        metrics['f1'] = 2 * (metrics['precision'] * metrics['recall']) / (metrics['precision'] + metrics['recall'])
    if test_cases:
        metrics['disambiguation_rate'] = successful_disambiguations / len(test_cases)

    return metrics

Performance Metrics

Efficiency Tracking

import numpy as np

class ResolutionPerformanceTracker:
    def __init__(self):
        self.resolution_times = []
        self.cache_hit_rates = []
        self.memory_usage = []

    def track_resolution(self, mention, resolution_time, cache_hit, memory_used):
        self.resolution_times.append(resolution_time)
        self.cache_hit_rates.append(1 if cache_hit else 0)
        self.memory_usage.append(memory_used)

    def get_performance_summary(self):
        # Call only after at least one resolution has been tracked
        return {
            'avg_resolution_time': np.mean(self.resolution_times),
            'p95_resolution_time': np.percentile(self.resolution_times, 95),
            'cache_hit_rate': np.mean(self.cache_hit_rates),
            'avg_memory_usage': np.mean(self.memory_usage),
            'total_resolutions': len(self.resolution_times)
        }

Best Practices

Design Principles

  1. Incremental Resolution: Start with simple techniques and add complexity as needed
  2. Context Awareness: Always consider the conversational and domain context
  3. User Feedback Integration: Learn from user corrections and clarifications
  4. Graceful Degradation: Handle ambiguity and uncertainty gracefully
  5. Privacy Preservation: Respect user privacy in entity tracking and storage

Implementation Guidelines

  1. Caching Strategy: Implement intelligent caching for frequently resolved entities
  2. Confidence Thresholds: Establish appropriate confidence thresholds for different use cases
  3. Error Handling: Plan for resolution failures and provide fallback mechanisms
  4. Performance Monitoring: Track resolution accuracy and performance metrics
  5. User Control: Allow users to correct entity resolutions and manage entity data
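The guideline on confidence thresholds can be made concrete with a per-use-case table and a three-way decision: resolve automatically, ask the user, or leave the mention unresolved. The sketch below is one possible arrangement; the use-case names, the `Thresholds` dataclass, and the numeric cut-offs are all hypothetical and would need tuning against real data.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Thresholds:
    auto_resolve: float  # At or above this, accept the top candidate silently
    clarify: float       # At or above this (but below auto_resolve), ask the user


# Assumed values: riskier actions demand more confidence before acting silently.
THRESHOLDS = {
    "casual_chat": Thresholds(auto_resolve=0.6, clarify=0.3),
    "send_message": Thresholds(auto_resolve=0.9, clarify=0.5),
}


def decide(confidence: float, use_case: str) -> str:
    limits = THRESHOLDS[use_case]
    if confidence >= limits.auto_resolve:
        return "resolve"
    if confidence >= limits.clarify:
        return "ask_user"
    return "leave_unresolved"


print(decide(0.7, "casual_chat"))   # resolve
print(decide(0.7, "send_message"))  # ask_user
```

The same confidence leads to different behavior depending on what is at stake, which is also a simple way to provide the fallback path called for under error handling.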

Common Pitfalls

  1. Over-Resolution: Attempting to resolve every mention when some ambiguity is acceptable
  2. Context Ignoring: Failing to use conversation context for disambiguation
  3. Static Models: Not adapting resolution strategies based on user feedback
  4. Privacy Violations: Storing too much identifying information about entities
  5. Performance Neglect: Not optimizing resolution for real-time conversation requirements

Next Steps


Entity resolution and disambiguation are the foundation of sophisticated agent reasoning. Master these techniques to build agents that understand not just what users say, but who and what they’re talking about.