
Memory Compression Benchmarks

Memory compression is essential for efficient agent memory systems: it reduces storage costs and improves retrieval performance. This guide provides benchmarking methodologies for evaluating compression algorithms, semantic compression techniques, and the trade-offs they introduce in agent memory architectures.

Overview

Effective memory compression balances three key factors, each measured concretely in the sketch after this list:

  • Compression Ratio: How much space is saved
  • Performance Impact: Speed of compression/decompression
  • Information Preservation: Quality of compressed representations
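
As a minimal sketch of how these three factors translate into measurements, the snippet below profiles a single payload with zlib. The codec choice and the profile_compression helper are illustrative assumptions, not part of the benchmark suite that follows; for lossy or semantic compression you would swap the exact-equality check for a similarity score.

import time
import zlib

def profile_compression(payload: bytes) -> dict:
    """Capture the three benchmark dimensions for a single payload (illustrative sketch)."""
    start = time.time()
    compressed = zlib.compress(payload)      # compression ratio: bytes in vs. bytes out
    compress_time = time.time() - start      # performance impact: compression cost

    start = time.time()
    restored = zlib.decompress(compressed)   # performance impact: decompression cost
    decompress_time = time.time() - start

    return {
        'compression_ratio': len(payload) / len(compressed),
        'compress_time_s': compress_time,
        'decompress_time_s': decompress_time,
        # Lossless codecs preserve information exactly; for lossy/semantic
        # compression, replace this check with an embedding-similarity score.
        'information_preserved': restored == payload,
    }

print(profile_compression(b"agent memory " * 1000))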

Core Compression Metrics

Compression Ratio Analysis

import pickle
import gzip
import lzma
import bz2
import zlib
import time
import numpy as np
from typing import List, Dict, Any


class CompressionBenchmark:
    def __init__(self):
        self.algorithms = {
            'gzip': self._compress_gzip,
            'lzma': self._compress_lzma,
            'bz2': self._compress_bz2,
            'zlib': self._compress_zlib
        }

    def benchmark_compression_algorithms(self, data_samples: List[Any]) -> Dict:
        """
        Benchmark different compression algorithms on various data types

        Args:
            data_samples: List of different data structures to compress
        """
        results = {}

        for alg_name, compress_func in self.algorithms.items():
            print(f"Benchmarking {alg_name}...")
            alg_results = []

            for i, sample in enumerate(data_samples):
                # Serialize data first
                original_data = pickle.dumps(sample)
                original_size = len(original_data)

                # Measure compression
                start_time = time.time()
                compressed_data = compress_func(original_data)
                compress_time = time.time() - start_time

                # Measure decompression
                start_time = time.time()
                decompressed_data = self._decompress(compressed_data, alg_name)
                decompress_time = time.time() - start_time

                # Verify integrity
                integrity_ok = decompressed_data == original_data

                compressed_size = len(compressed_data)
                compression_ratio = original_size / compressed_size
                space_savings = 1 - (compressed_size / original_size)

                alg_results.append({
                    'sample_id': i,
                    'original_size': original_size,
                    'compressed_size': compressed_size,
                    'compression_ratio': compression_ratio,
                    'space_savings': space_savings,
                    'compress_time': compress_time,
                    'decompress_time': decompress_time,
                    'total_time': compress_time + decompress_time,
                    'integrity_ok': integrity_ok,
                    'throughput_mb_s': (original_size / (1024 * 1024)) / (compress_time + decompress_time)
                })

            # Aggregate results
            results[alg_name] = {
                'individual_results': alg_results,
                'avg_compression_ratio': np.mean([r['compression_ratio'] for r in alg_results]),
                'avg_space_savings': np.mean([r['space_savings'] for r in alg_results]),
                'avg_compress_time': np.mean([r['compress_time'] for r in alg_results]),
                'avg_decompress_time': np.mean([r['decompress_time'] for r in alg_results]),
                'avg_throughput': np.mean([r['throughput_mb_s'] for r in alg_results]),
                'integrity_success_rate': np.mean([r['integrity_ok'] for r in alg_results])
            }

        return results

    def _compress_gzip(self, data):
        return gzip.compress(data)

    def _compress_lzma(self, data):
        return lzma.compress(data)

    def _compress_bz2(self, data):
        return bz2.compress(data)

    def _compress_zlib(self, data):
        return zlib.compress(data)

    def _decompress(self, data, algorithm):
        if algorithm == 'gzip':
            return gzip.decompress(data)
        elif algorithm == 'lzma':
            return lzma.decompress(data)
        elif algorithm == 'bz2':
            return bz2.decompress(data)
        elif algorithm == 'zlib':
            return zlib.decompress(data)


# Example usage
def generate_test_data():
    """Generate diverse test data for compression benchmarking"""
    return [
        # Text documents
        "This is a sample document with repeated text patterns. " * 100,

        # JSON-like structured data
        {
            "users": [
                {"id": i, "name": f"user_{i}", "email": f"user_{i}@example.com"}
                for i in range(1000)
            ]
        },

        # Embedding vectors (common in agent memory)
        [np.random.rand(384).tolist() for _ in range(100)],

        # Mixed conversation history
        [
            {"role": "user", "content": "What is machine learning?" + " " * 50},
            {"role": "assistant", "content": "Machine learning is a subset of AI..." + " " * 100}
        ] * 50,

        # Large text corpus
        "Natural language processing " * 1000 + "Deep learning models " * 500,

        # Sparse data with many nulls
        [None if i % 3 == 0 else f"value_{i}" for i in range(10000)]
    ]


benchmark = CompressionBenchmark()
test_data = generate_test_data()
compression_results = benchmark.benchmark_compression_algorithms(test_data)

# Analyze results
for algorithm, results in compression_results.items():
    print(f"\n{algorithm.upper()} Results:")
    print(f"  Average Compression Ratio: {results['avg_compression_ratio']:.2f}x")
    print(f"  Average Space Savings: {results['avg_space_savings']:.1%}")
    print(f"  Average Throughput: {results['avg_throughput']:.2f} MB/s")

Semantic Compression Evaluation

class SemanticCompressionBenchmark:
    def __init__(self, embedding_model, summarization_model):
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model

    def benchmark_semantic_compression_techniques(self, documents: List[str]):
        """
        Benchmark semantic compression techniques like summarization and clustering

        Args:
            documents: List of text documents to compress semantically
        """
        techniques = {
            'extractive_summary': self._extractive_summarization,
            'abstractive_summary': self._abstractive_summarization,
            'embedding_clustering': self._embedding_clustering,
            'topic_modeling': self._topic_modeling,
            'hierarchical_compression': self._hierarchical_compression
        }

        results = {}
        for technique_name, technique_func in techniques.items():
            print(f"Benchmarking {technique_name}...")

            # Measure original content size
            original_content = " ".join(documents)
            original_size = len(original_content.encode('utf-8'))

            # Apply compression technique
            start_time = time.time()
            compressed_representation = technique_func(documents)
            compression_time = time.time() - start_time

            # Measure compressed size
            compressed_content = str(compressed_representation)
            compressed_size = len(compressed_content.encode('utf-8'))

            # Evaluate semantic preservation
            semantic_similarity = self._evaluate_semantic_preservation(
                original_content, compressed_content
            )

            # Calculate metrics
            compression_ratio = original_size / compressed_size
            space_savings = 1 - (compressed_size / original_size)

            results[technique_name] = {
                'original_size': original_size,
                'compressed_size': compressed_size,
                'compression_ratio': compression_ratio,
                'space_savings': space_savings,
                'compression_time': compression_time,
                'semantic_similarity': semantic_similarity,
                'quality_efficiency_score': semantic_similarity / compression_time,
                'compressed_representation': compressed_representation
            }

        return results

    def _extractive_summarization(self, documents):
        """Extract key sentences from documents"""
        from transformers import pipeline
        summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

        summaries = []
        for doc in documents:
            if len(doc) > 100:  # Only summarize substantial content
                try:
                    summary = summarizer(doc, max_length=50, min_length=10)[0]['summary_text']
                    summaries.append(summary)
                except Exception:
                    summaries.append(doc[:100])  # Fallback to truncation
            else:
                summaries.append(doc)
        return summaries

    def _abstractive_summarization(self, documents):
        """Generate abstractive summaries"""
        # Use pre-configured summarization model
        summaries = []
        for doc in documents:
            summary = self.summarization_model.summarize(doc, max_length=100)
            summaries.append(summary)
        return summaries

    def _embedding_clustering(self, documents):
        """Cluster documents and represent clusters with centroids + representatives"""
        from sklearn.cluster import KMeans

        # Get embeddings
        embeddings = [self.embedding_model.encode(doc) for doc in documents]
        embeddings_array = np.array(embeddings)

        # Cluster documents
        n_clusters = min(len(documents) // 5, 20)  # Adaptive cluster count
        kmeans = KMeans(n_clusters=n_clusters)
        cluster_labels = kmeans.fit_predict(embeddings_array)

        # Create compressed representation
        cluster_representation = {}
        for i in range(n_clusters):
            cluster_docs = [documents[j] for j in range(len(documents)) if cluster_labels[j] == i]

            # Find most representative document in cluster
            cluster_center = kmeans.cluster_centers_[i]
            cluster_embeddings = [embeddings[j] for j in range(len(documents)) if cluster_labels[j] == i]

            if cluster_embeddings:
                similarities = [
                    np.dot(cluster_center, emb) / (np.linalg.norm(cluster_center) * np.linalg.norm(emb))
                    for emb in cluster_embeddings
                ]
                representative_idx = np.argmax(similarities)
                representative_doc = cluster_docs[representative_idx]

                cluster_representation[f"cluster_{i}"] = {
                    'representative': representative_doc,
                    'count': len(cluster_docs),
                    'centroid': cluster_center.tolist()
                }

        return cluster_representation

    def _topic_modeling(self, documents):
        """Use topic modeling for compression"""
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.decomposition import LatentDirichletAllocation

        # Vectorize documents
        vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
        doc_term_matrix = vectorizer.fit_transform(documents)

        # Apply LDA
        n_topics = min(len(documents) // 3, 10)
        lda = LatentDirichletAllocation(n_components=n_topics)
        doc_topic_matrix = lda.fit_transform(doc_term_matrix)

        # Create compressed representation
        feature_names = vectorizer.get_feature_names_out()
        topic_representation = {}
        for topic_idx, topic in enumerate(lda.components_):
            top_words_idx = topic.argsort()[-10:][::-1]  # Top 10 words
            top_words = [feature_names[i] for i in top_words_idx]

            # Find documents most associated with this topic
            topic_docs = np.argsort(doc_topic_matrix[:, topic_idx])[-3:][::-1]

            topic_representation[f"topic_{topic_idx}"] = {
                'keywords': top_words,
                'weight': topic[top_words_idx].tolist(),
                'representative_docs': [documents[i][:200] for i in topic_docs]
            }

        return topic_representation

    def _hierarchical_compression(self, documents):
        """Multi-level compression: cluster -> summarize -> compress"""
        # First level: cluster similar documents
        clusters = self._embedding_clustering(documents)

        # Second level: summarize each cluster
        summarized_clusters = {}
        for cluster_id, cluster_data in clusters.items():
            representative = cluster_data['representative']
            summary = self._abstractive_summarization([representative])[0]
            summarized_clusters[cluster_id] = {
                'summary': summary,
                'count': cluster_data['count'],
                'centroid': cluster_data['centroid'][:10]  # Truncate centroid
            }

        return summarized_clusters

    def _evaluate_semantic_preservation(self, original_text, compressed_text):
        """Evaluate how well semantic meaning is preserved"""
        original_embedding = self.embedding_model.encode(original_text)
        compressed_embedding = self.embedding_model.encode(compressed_text)

        # Calculate cosine similarity
        similarity = np.dot(original_embedding, compressed_embedding) / (
            np.linalg.norm(original_embedding) * np.linalg.norm(compressed_embedding)
        )
        return float(similarity)


# Example usage
from sentence_transformers import SentenceTransformer

# Initialize models (replace with your actual models)
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

class DummySummarizer:
    def summarize(self, text, max_length=100):
        return text[:max_length] + "..." if len(text) > max_length else text

summarization_model = DummySummarizer()

# Generate test documents
test_documents = [
    "Machine learning is a powerful tool for data analysis and prediction...",
    "Deep neural networks have revolutionized computer vision and natural language processing...",
    "Artificial intelligence systems require careful consideration of ethical implications...",
    # Add more diverse documents
] * 10

semantic_benchmark = SemanticCompressionBenchmark(embedding_model, summarization_model)
semantic_results = semantic_benchmark.benchmark_semantic_compression_techniques(test_documents)

for technique, results in semantic_results.items():
    print(f"\n{technique.upper()}:")
    print(f"  Compression Ratio: {results['compression_ratio']:.2f}x")
    print(f"  Space Savings: {results['space_savings']:.1%}")
    print(f"  Semantic Similarity: {results['semantic_similarity']:.3f}")
    print(f"  Quality/Efficiency Score: {results['quality_efficiency_score']:.3f}")

Vector Compression Strategies

Embedding Dimensionality Reduction

class VectorCompressionBenchmark:
    def __init__(self):
        self.reduction_techniques = {
            'pca': self._pca_reduction,
            'random_projection': self._random_projection,
            'autoencoder': self._autoencoder_compression,
            'quantization': self._vector_quantization,
            'pruning': self._vector_pruning
        }

    def benchmark_vector_compression(self, embeddings: np.ndarray, target_dimensions: List[int]):
        """
        Benchmark vector compression techniques across different target dimensions

        Args:
            embeddings: Array of shape (n_vectors, dimensions)
            target_dimensions: List of target dimensions to compress to
        """
        original_dim = embeddings.shape[1]
        results = {}

        for technique_name, technique_func in self.reduction_techniques.items():
            print(f"Benchmarking {technique_name}...")
            technique_results = {}

            for target_dim in target_dimensions:
                if target_dim >= original_dim:
                    continue

                # Apply compression
                start_time = time.time()
                compressed_embeddings, compression_model = technique_func(embeddings, target_dim)
                compression_time = time.time() - start_time

                # Evaluate quality metrics
                quality_metrics = self._evaluate_compression_quality(
                    embeddings, compressed_embeddings, technique_name
                )

                # Calculate compression metrics
                compression_ratio = original_dim / target_dim
                memory_savings = 1 - (target_dim / original_dim)

                technique_results[target_dim] = {
                    'compression_ratio': compression_ratio,
                    'memory_savings': memory_savings,
                    'compression_time': compression_time,
                    'quality_metrics': quality_metrics,
                    'compressed_shape': compressed_embeddings.shape
                }

            results[technique_name] = technique_results

        return results

    def _pca_reduction(self, embeddings, target_dim):
        """Principal Component Analysis reduction"""
        from sklearn.decomposition import PCA
        pca = PCA(n_components=target_dim)
        compressed = pca.fit_transform(embeddings)
        return compressed, pca

    def _random_projection(self, embeddings, target_dim):
        """Random projection reduction"""
        from sklearn.random_projection import GaussianRandomProjection
        rp = GaussianRandomProjection(n_components=target_dim)
        compressed = rp.fit_transform(embeddings)
        return compressed, rp

    def _autoencoder_compression(self, embeddings, target_dim):
        """Autoencoder-based compression"""
        from tensorflow import keras

        input_dim = embeddings.shape[1]

        # Simple autoencoder architecture
        encoder_input = keras.Input(shape=(input_dim,))
        encoded = keras.layers.Dense(target_dim * 2, activation='relu')(encoder_input)
        encoded = keras.layers.Dense(target_dim, activation='relu')(encoded)
        decoder = keras.layers.Dense(target_dim * 2, activation='relu')(encoded)
        decoder_output = keras.layers.Dense(input_dim, activation='linear')(decoder)

        autoencoder = keras.Model(encoder_input, decoder_output)
        encoder = keras.Model(encoder_input, encoded)

        # Train autoencoder
        autoencoder.compile(optimizer='adam', loss='mse')
        autoencoder.fit(embeddings, embeddings, epochs=50, batch_size=32, verbose=0)

        # Compress embeddings
        compressed = encoder.predict(embeddings)
        return compressed, encoder

    def _vector_quantization(self, embeddings, target_dim):
        """Vector quantization using codebook"""
        from sklearn.cluster import MiniBatchKMeans

        # Create codebook with target_dim centroids
        kmeans = MiniBatchKMeans(n_clusters=target_dim)
        cluster_assignments = kmeans.fit_predict(embeddings)

        # Represent each vector by its cluster assignment and distance to centroid
        centroids = kmeans.cluster_centers_

        # Create compressed representation: (cluster_id, offset_vector)
        compressed = []
        for i, assignment in enumerate(cluster_assignments):
            centroid = centroids[assignment]
            offset = embeddings[i] - centroid

            # Keep only top-k components of offset (further compression)
            k = min(10, len(offset))
            top_k_indices = np.argsort(np.abs(offset))[-k:]
            sparse_offset = np.zeros_like(offset)
            sparse_offset[top_k_indices] = offset[top_k_indices]

            compressed.append([assignment] + sparse_offset.tolist())

        compressed = np.array(compressed)
        return compressed, (kmeans, centroids)

    def _vector_pruning(self, embeddings, target_dim):
        """Remove least important dimensions"""
        # Calculate variance across all embeddings for each dimension
        variances = np.var(embeddings, axis=0)

        # Keep dimensions with highest variance
        important_dims = np.argsort(variances)[-target_dim:]
        compressed = embeddings[:, important_dims]
        return compressed, important_dims

    def _evaluate_compression_quality(self, original, compressed, technique):
        """Evaluate quality of compressed embeddings"""
        # For techniques that change dimensionality, we need to handle differently
        if technique in ['quantization']:
            # For quantization, evaluate neighborhood preservation instead
            return self._evaluate_quantization_quality(original, compressed)

        # Calculate similarity preservation
        similarity_preservation = self._calculate_similarity_preservation(original, compressed)

        # Reconstruction error can only be computed when dimensionality matches;
        # otherwise the vectors live in different spaces and we report None.
        reconstruction_error = None
        if original.shape[1] == compressed.shape[1]:
            reconstruction_error = np.mean(np.linalg.norm(original[:100] - compressed[:100], axis=1))

        return {
            'similarity_preservation': similarity_preservation,
            'reconstruction_error': reconstruction_error,
            'variance_retained': np.var(compressed) / np.var(original)
        }

    def _calculate_similarity_preservation(self, original, compressed):
        """Calculate how well pairwise similarities are preserved"""
        # Sample subset for efficiency
        n_samples = min(100, original.shape[0])
        sample_indices = np.random.choice(original.shape[0], n_samples, replace=False)

        orig_sample = original[sample_indices]
        comp_sample = compressed[sample_indices]

        # Calculate pairwise cosine similarities
        orig_similarities = np.zeros((n_samples, n_samples))
        comp_similarities = np.zeros((n_samples, n_samples))

        for i in range(n_samples):
            for j in range(i + 1, n_samples):
                orig_sim = np.dot(orig_sample[i], orig_sample[j]) / (
                    np.linalg.norm(orig_sample[i]) * np.linalg.norm(orig_sample[j])
                )
                comp_sim = np.dot(comp_sample[i], comp_sample[j]) / (
                    np.linalg.norm(comp_sample[i]) * np.linalg.norm(comp_sample[j])
                )
                orig_similarities[i, j] = orig_sim
                comp_similarities[i, j] = comp_sim

        # Calculate correlation between similarity matrices
        orig_flat = orig_similarities[np.triu_indices(n_samples, 1)]
        comp_flat = comp_similarities[np.triu_indices(n_samples, 1)]
        correlation = np.corrcoef(orig_flat, comp_flat)[0, 1]

        return correlation if not np.isnan(correlation) else 0.0

    def _evaluate_quantization_quality(self, original, compressed):
        """Special evaluation for quantized vectors"""
        # For quantization, measure how well cluster assignments preserve neighborhoods
        # Sample evaluation
        n_samples = min(100, original.shape[0])
        sample_indices = np.random.choice(original.shape[0], n_samples, replace=False)

        # Check if nearest neighbors are preserved after quantization
        preserved_neighbors = 0
        total_comparisons = 0

        for i in sample_indices:
            # Find nearest neighbors in original space
            distances = np.linalg.norm(original - original[i], axis=1)
            orig_neighbors = np.argsort(distances)[1:6]  # Top 5 neighbors

            # Find nearest neighbors in quantized space (by cluster assignment)
            cluster_i = int(compressed[i][0])
            same_cluster = [j for j in range(len(compressed)) if int(compressed[j][0]) == cluster_i]

            # Count how many original neighbors are in the same cluster
            preserved = len(set(orig_neighbors) & set(same_cluster))
            preserved_neighbors += preserved
            total_comparisons += len(orig_neighbors)

        neighbor_preservation = preserved_neighbors / total_comparisons if total_comparisons > 0 else 0

        return {
            'similarity_preservation': neighbor_preservation,
            'reconstruction_error': None,  # Not applicable for quantization
            'variance_retained': None      # Not applicable for quantization
        }


# Example usage
# Generate sample embeddings
n_vectors = 1000
original_dim = 384
embeddings = np.random.rand(n_vectors, original_dim)

vector_benchmark = VectorCompressionBenchmark()
target_dims = [64, 128, 192, 256]
compression_results = vector_benchmark.benchmark_vector_compression(embeddings, target_dims)

# Analyze results
for technique, dims_results in compression_results.items():
    print(f"\n{technique.upper()} Results:")
    for dim, metrics in dims_results.items():
        print(f"  {dim}D: Compression {metrics['compression_ratio']:.1f}x, "
              f"Quality {metrics['quality_metrics']['similarity_preservation']:.3f}, "
              f"Time {metrics['compression_time']:.3f}s")

Production Memory Compression

Dynamic Compression Strategy

class DynamicCompressionManager:
    def __init__(self, storage_backend):
        self.storage = storage_backend
        # Signals considered when scoring a document for compression; they are
        # combined into a single score in apply_dynamic_compression below.
        self.compression_policies = ['age_based', 'access_frequency', 'content_type', 'storage_pressure']
        self.compression_stats = {
            'total_compressed': 0,
            'space_saved': 0,
            'compression_operations': 0
        }

    def apply_dynamic_compression(self, documents: List[Dict]):
        """Apply compression based on multiple policies"""
        compression_decisions = []

        for doc in documents:
            # Analyze document for compression decision
            age_days = (time.time() - doc.get('created_at', time.time())) / 86400
            access_count = doc.get('access_count', 0)
            last_access_days = (time.time() - doc.get('last_accessed', time.time())) / 86400
            content_size = len(str(doc.get('content', '')))

            # Score document for compression priority
            compression_score = 0

            # Age factor
            if age_days > 30:
                compression_score += 3
            elif age_days > 7:
                compression_score += 2
            elif age_days > 1:
                compression_score += 1

            # Access frequency factor
            if access_count == 0:
                compression_score += 2
            elif access_count < 5:
                compression_score += 1

            # Recency factor
            if last_access_days > 7:
                compression_score += 2
            elif last_access_days > 1:
                compression_score += 1

            # Size factor
            if content_size > 10000:
                compression_score += 2
            elif content_size > 1000:
                compression_score += 1

            # Determine compression strategy
            strategy = self._select_compression_strategy(doc, compression_score)

            compression_decisions.append({
                'document_id': doc['id'],
                'strategy': strategy,
                'score': compression_score,
                'estimated_savings': self._estimate_compression_savings(doc, strategy)
            })

        # Execute compression decisions
        for decision in compression_decisions:
            if decision['strategy'] != 'none':
                self._execute_compression(decision)

        return compression_decisions

    def _select_compression_strategy(self, doc, score):
        """Select appropriate compression strategy based on score and content"""
        content_type = doc.get('type', 'text')

        if score < 3:
            return 'none'
        elif score < 6:
            if content_type == 'embedding':
                return 'vector_quantization'
            elif content_type == 'conversation':
                return 'semantic_summary'
            else:
                return 'gzip_compression'
        else:  # score >= 6
            if content_type == 'embedding':
                return 'pca_reduction'
            elif content_type == 'conversation':
                return 'hierarchical_compression'
            else:
                return 'aggressive_compression'

    def _estimate_compression_savings(self, doc, strategy):
        """Estimate space savings for compression strategy"""
        content_size = len(str(doc.get('content', '')))
        savings_estimates = {
            'none': 0,
            'gzip_compression': 0.6,         # 60% savings typical
            'aggressive_compression': 0.8,   # 80% savings
            'vector_quantization': 0.5,      # 50% savings
            'pca_reduction': 0.7,            # 70% savings
            'semantic_summary': 0.85,        # 85% savings
            'hierarchical_compression': 0.9  # 90% savings
        }
        return content_size * savings_estimates.get(strategy, 0)

    def _execute_compression(self, decision):
        """Execute compression decision"""
        doc_id = decision['document_id']
        strategy = decision['strategy']

        try:
            # Retrieve document
            doc = self.storage.get_document(doc_id)
            original_size = len(str(doc))

            # Apply compression strategy
            if strategy == 'gzip_compression':
                compressed_doc = self._apply_gzip_compression(doc)
            elif strategy == 'aggressive_compression':
                compressed_doc = self._apply_aggressive_compression(doc)
            elif strategy == 'vector_quantization':
                compressed_doc = self._apply_vector_quantization(doc)
            elif strategy == 'pca_reduction':
                compressed_doc = self._apply_pca_reduction(doc)
            elif strategy == 'semantic_summary':
                compressed_doc = self._apply_semantic_summary(doc)
            elif strategy == 'hierarchical_compression':
                compressed_doc = self._apply_hierarchical_compression(doc)
            else:
                return  # Unknown strategy; leave the document untouched

            # Update document in storage
            compressed_doc['compression_metadata'] = {
                'strategy': strategy,
                'original_size': original_size,
                'compressed_at': time.time(),
                'compression_ratio': original_size / len(str(compressed_doc))
            }
            self.storage.update_document(doc_id, compressed_doc)

            # Update stats
            self.compression_stats['total_compressed'] += 1
            self.compression_stats['space_saved'] += (original_size - len(str(compressed_doc)))
            self.compression_stats['compression_operations'] += 1

        except Exception as e:
            print(f"Compression failed for document {doc_id}: {e}")

    def _apply_gzip_compression(self, doc):
        """Apply gzip compression to document content"""
        import base64
        content = str(doc.get('content', ''))
        compressed_content = gzip.compress(content.encode('utf-8'))
        encoded_content = base64.b64encode(compressed_content).decode('utf-8')

        compressed_doc = doc.copy()
        compressed_doc['content'] = encoded_content
        compressed_doc['content_encoding'] = 'gzip_b64'
        return compressed_doc

    def _apply_aggressive_compression(self, doc):
        """Apply multiple compression techniques"""
        # First apply semantic compression, then gzip
        semantically_compressed = self._apply_semantic_summary(doc)
        return self._apply_gzip_compression(semantically_compressed)

    def _apply_vector_quantization(self, doc):
        """Apply vector quantization to embeddings"""
        if 'embedding' not in doc:
            return doc

        embedding = np.array(doc['embedding'])
        # Simple quantization - reduce precision
        quantized = np.round(embedding * 127).astype(np.int8)

        compressed_doc = doc.copy()
        compressed_doc['embedding'] = quantized.tolist()
        compressed_doc['embedding_encoding'] = 'int8_quantized'
        return compressed_doc

    def _apply_pca_reduction(self, doc):
        """Apply PCA reduction to embeddings"""
        if 'embedding' not in doc:
            return doc

        # NOTE: PCA must be fitted on a corpus of embeddings, not a single vector;
        # in production, fit the model once offline and reuse it here.
        embedding = np.array(doc['embedding']).reshape(1, -1)
        # Reduce to 75% of original dimensions
        target_dim = int(embedding.shape[1] * 0.75)

        from sklearn.decomposition import PCA
        pca = PCA(n_components=target_dim)
        reduced_embedding = pca.fit_transform(embedding)[0]

        compressed_doc = doc.copy()
        compressed_doc['embedding'] = reduced_embedding.tolist()
        compressed_doc['pca_components'] = pca.components_.tolist()
        compressed_doc['embedding_encoding'] = 'pca_reduced'
        return compressed_doc

    def _apply_semantic_summary(self, doc):
        """Apply semantic summarization"""
        content = doc.get('content', '')

        if len(content) > 200:
            # Simple extractive summarization
            sentences = content.split('. ')

            # Keep first and last sentences, plus middle ones if short enough
            if len(sentences) > 3:
                summary_sentences = [sentences[0], sentences[-1]]

                # Add middle sentences until we reach ~50% of original
                target_length = len(content) * 0.5
                current_length = len('. '.join(summary_sentences))

                for sentence in sentences[1:-1]:
                    if current_length + len(sentence) < target_length:
                        summary_sentences.insert(-1, sentence)
                        current_length += len(sentence)
                    else:
                        break

                summarized_content = '. '.join(summary_sentences)
            else:
                summarized_content = content
        else:
            summarized_content = content

        compressed_doc = doc.copy()
        compressed_doc['content'] = summarized_content
        compressed_doc['content_encoding'] = 'semantic_summary'
        return compressed_doc

    def _apply_hierarchical_compression(self, doc):
        """Apply multiple levels of compression"""
        # 1. Semantic compression
        semantically_compressed = self._apply_semantic_summary(doc)

        # 2. Vector compression if applicable
        if 'embedding' in semantically_compressed:
            vector_compressed = self._apply_vector_quantization(semantically_compressed)
        else:
            vector_compressed = semantically_compressed

        # 3. Final gzip compression
        final_compressed = self._apply_gzip_compression(vector_compressed)
        final_compressed['content_encoding'] = 'hierarchical'
        return final_compressed

    def get_compression_statistics(self):
        """Get current compression statistics"""
        return {
            'total_documents_compressed': self.compression_stats['total_compressed'],
            'total_space_saved_bytes': self.compression_stats['space_saved'],
            'total_compression_operations': self.compression_stats['compression_operations'],
            'average_space_savings': (
                self.compression_stats['space_saved'] /
                max(self.compression_stats['total_compressed'], 1)
            )
        }

    def optimize_compression_schedule(self, performance_targets):
        """Optimize compression schedule based on performance targets"""
        targets = {
            'max_compression_time_per_doc': 1.0,  # seconds
            'min_space_savings_threshold': 0.3,   # 30%
            'max_quality_degradation': 0.1        # 10%
        }
        targets.update(performance_targets)

        # Analyze current compression performance
        recent_operations = self._get_recent_compression_operations()
        recommendations = []

        # Check if compression is taking too long
        avg_compression_time = np.mean([op['time'] for op in recent_operations])
        if avg_compression_time > targets['max_compression_time_per_doc']:
            recommendations.append({
                'issue': 'Compression taking too long',
                'current_avg_time': avg_compression_time,
                'target': targets['max_compression_time_per_doc'],
                'recommendation': 'Use faster compression algorithms or reduce compression levels'
            })

        # Check space savings
        avg_space_savings = np.mean([op['space_savings'] for op in recent_operations])
        if avg_space_savings < targets['min_space_savings_threshold']:
            recommendations.append({
                'issue': 'Insufficient space savings',
                'current_savings': avg_space_savings,
                'target': targets['min_space_savings_threshold'],
                'recommendation': 'Apply more aggressive compression strategies'
            })

        return recommendations

    def _get_recent_compression_operations(self):
        """Get recent compression operation metrics"""
        # This would integrate with your monitoring system
        # Here's a placeholder implementation
        return [
            {'time': 0.5, 'space_savings': 0.6, 'quality_loss': 0.05},
            {'time': 0.8, 'space_savings': 0.4, 'quality_loss': 0.02},
            {'time': 1.2, 'space_savings': 0.7, 'quality_loss': 0.08},
        ]


# Example usage
dynamic_compressor = DynamicCompressionManager(your_storage_backend)  # your storage backend instance

# Sample documents for compression
sample_documents = [
    {
        'id': 'doc1',
        'content': 'Long document content...' * 100,
        'type': 'text',
        'created_at': time.time() - 86400 * 5,  # 5 days old
        'access_count': 2,
        'last_accessed': time.time() - 86400 * 3
    },
    {
        'id': 'doc2',
        'content': 'Another document...',
        'type': 'conversation',
        'embedding': np.random.rand(384).tolist(),
        'created_at': time.time() - 86400 * 30,  # 30 days old
        'access_count': 0,
        'last_accessed': time.time() - 86400 * 30
    }
]

# Apply dynamic compression
decisions = dynamic_compressor.apply_dynamic_compression(sample_documents)

# Get statistics
stats = dynamic_compressor.get_compression_statistics()
print(f"Compression Statistics: {stats}")

# Optimize schedule
recommendations = dynamic_compressor.optimize_compression_schedule({
    'max_compression_time_per_doc': 0.5
})
for rec in recommendations:
    print(f"Recommendation: {rec['recommendation']}")

Best Practices

Compression Strategy Selection

  1. Content Type Awareness: Choose compression based on data type (text, vectors, structured data), as in the policy-table sketch after this list
  2. Access Pattern Analysis: Compress less frequently accessed data more aggressively
  3. Quality Requirements: Balance compression ratio with acceptable quality degradation
  4. Performance Constraints: Consider compression/decompression time in user-facing operations
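
One way to make these considerations explicit is a small policy table keyed by content type and access tier. The sketch below is illustrative only: the tier boundary, strategy labels, and choose_strategy helper are assumptions, not names used by the benchmark classes above.

# Illustrative policy table: (content type, access tier) -> compression strategy.
COMPRESSION_POLICY = {
    ('text', 'hot'):          'none',               # user-facing; avoid decompression latency
    ('text', 'cold'):         'gzip',
    ('embedding', 'hot'):     'int8_quantization',  # cheap to dequantize at query time
    ('embedding', 'cold'):    'pca_reduction',
    ('conversation', 'hot'):  'none',
    ('conversation', 'cold'): 'semantic_summary',   # accepts some quality loss for old chats
}

def choose_strategy(content_type: str, days_since_access: float) -> str:
    """Pick a strategy from the table; fall back to plain gzip for unknown types."""
    tier = 'hot' if days_since_access <= 7 else 'cold'
    return COMPRESSION_POLICY.get((content_type, tier), 'gzip')

print(choose_strategy('embedding', days_since_access=30))  # -> 'pca_reduction'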

Monitoring and Optimization

  1. Compression Ratio Tracking: Monitor space savings across different content types
  2. Quality Metrics: Continuously evaluate semantic preservation after compression
  3. Performance Impact: Measure compression/decompression latency impact on queries
  4. Cost-Benefit Analysis: Calculate storage cost savings vs. computational overhead (a back-of-the-envelope sketch follows this list)
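
For item 4, a rough cost model is often enough to decide whether compression pays off. The sketch below uses placeholder prices for storage and compute; substitute your provider's actual rates.

def compression_cost_benefit(bytes_saved: float,
                             cpu_seconds: float,
                             storage_cost_per_gb_month: float = 0.023,
                             cpu_cost_per_hour: float = 0.05) -> dict:
    """Rough monthly cost-benefit estimate for a batch of compression operations.
    The default prices are placeholder assumptions, not quoted rates."""
    storage_savings = (bytes_saved / 1024 ** 3) * storage_cost_per_gb_month
    compute_cost = (cpu_seconds / 3600) * cpu_cost_per_hour
    return {
        'monthly_storage_savings_usd': storage_savings,
        'one_time_compute_cost_usd': compute_cost,
        'breakeven_months': compute_cost / storage_savings if storage_savings else float('inf'),
    }

# Example: 50 GiB saved for 30 CPU-minutes of compression work
print(compression_cost_benefit(bytes_saved=50 * 1024 ** 3, cpu_seconds=1800))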

Production Implementation Guidelines

  1. Gradual Rollout: Implement compression incrementally with rollback capabilities, as in the rollout sketch after this list
  2. Backup Strategy: Maintain uncompressed backups during initial deployment
  3. Monitoring: Set up alerts for compression failures and quality degradation
  4. Testing: Thoroughly test compression/decompression pipelines before production
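
A minimal sketch combining items 1, 2, and 3: compress only a configurable fraction of writes, keep an uncompressed backup, and verify the round trip before trusting the compressed copy. The storage object with put()/get() methods is a hypothetical backend API, not part of the classes above.

import gzip
import random

class GradualCompressionRollout:
    """Sketch of an incremental rollout with backups and round-trip verification."""

    def __init__(self, storage, rollout_fraction: float = 0.05):
        self.storage = storage
        self.rollout_fraction = rollout_fraction  # raise gradually, e.g. 0.05 -> 1.0

    def write(self, doc_id: str, content: str):
        if random.random() >= self.rollout_fraction:
            self.storage.put(doc_id, content)       # control group: store uncompressed
            return

        self.storage.put(f"{doc_id}.bak", content)  # backup before compressing
        compressed = gzip.compress(content.encode("utf-8"))
        self.storage.put(doc_id, compressed)

        # Verify the round trip before relying on the compressed copy; alert on mismatch.
        if gzip.decompress(self.storage.get(doc_id)).decode("utf-8") != content:
            self.storage.put(doc_id, content)       # roll back to the uncompressed copy
            print(f"ALERT: compression round trip failed for {doc_id}")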

This comprehensive memory compression benchmarking guide provides the tools and methodologies to optimize storage efficiency while maintaining system performance and data quality in agent memory systems.