Memory Compression Benchmarks
Memory compression is essential for efficient agent memory systems: it reduces storage costs and keeps retrieval manageable as memory stores grow. This guide provides benchmarking methodologies for evaluating compression algorithms, techniques, and trade-offs in agent memory architectures.
Overview
Effective memory compression balances three key factors, sketched briefly after this list:
- Compression Ratio: How much space is saved
- Performance Impact: Speed of compression/decompression
- Information Preservation: Quality of compressed representations
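The first two factors reduce to simple ratios over byte counts, while information preservation needs a task-specific measure (for example, the embedding-similarity check used later in this guide). A minimal sketch, assuming sizes are measured on pickled bytes with gzip as the codec; the sample object is illustrative:

```python
import gzip
import pickle

def compression_metrics(obj) -> dict:
    """Compute compression ratio and space savings for one object."""
    raw = pickle.dumps(obj)        # serialized size before compression
    packed = gzip.compress(raw)    # size after byte-level compression
    return {
        "compression_ratio": len(raw) / len(packed),  # e.g. 4.0 means 4x smaller
        "space_savings": 1 - len(packed) / len(raw),  # fraction of bytes saved
    }

print(compression_metrics({"note": "agent memory " * 200}))
```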
Core Compression Metrics
Compression Ratio Analysis
import pickle
import gzip
import lzma
import bz2
import zlib
import time
import numpy as np
from typing import List, Dict, Any
class CompressionBenchmark:
def __init__(self):
self.algorithms = {
'gzip': self._compress_gzip,
'lzma': self._compress_lzma,
'bz2': self._compress_bz2,
'zlib': self._compress_zlib
}
def benchmark_compression_algorithms(self, data_samples: List[Any]) -> Dict:
"""
Benchmark different compression algorithms on various data types
Args:
data_samples: List of different data structures to compress
"""
results = {}
for alg_name, compress_func in self.algorithms.items():
print(f"Benchmarking {alg_name}...")
alg_results = []
for i, sample in enumerate(data_samples):
# Serialize data first
original_data = pickle.dumps(sample)
original_size = len(original_data)
# Measure compression
start_time = time.time()
compressed_data = compress_func(original_data)
compress_time = time.time() - start_time
# Measure decompression
start_time = time.time()
decompressed_data = self._decompress(compressed_data, alg_name)
decompress_time = time.time() - start_time
# Verify integrity
integrity_ok = decompressed_data == original_data
compressed_size = len(compressed_data)
compression_ratio = original_size / compressed_size
space_savings = 1 - (compressed_size / original_size)
alg_results.append({
'sample_id': i,
'original_size': original_size,
'compressed_size': compressed_size,
'compression_ratio': compression_ratio,
'space_savings': space_savings,
'compress_time': compress_time,
'decompress_time': decompress_time,
'total_time': compress_time + decompress_time,
'integrity_ok': integrity_ok,
'throughput_mb_s': (original_size / (1024*1024)) / max(compress_time + decompress_time, 1e-9)
})
# Aggregate results
results[alg_name] = {
'individual_results': alg_results,
'avg_compression_ratio': np.mean([r['compression_ratio'] for r in alg_results]),
'avg_space_savings': np.mean([r['space_savings'] for r in alg_results]),
'avg_compress_time': np.mean([r['compress_time'] for r in alg_results]),
'avg_decompress_time': np.mean([r['decompress_time'] for r in alg_results]),
'avg_throughput': np.mean([r['throughput_mb_s'] for r in alg_results]),
'integrity_success_rate': np.mean([r['integrity_ok'] for r in alg_results])
}
return results
def _compress_gzip(self, data):
return gzip.compress(data)
def _compress_lzma(self, data):
return lzma.compress(data)
def _compress_bz2(self, data):
return bz2.compress(data)
def _compress_zlib(self, data):
return zlib.compress(data)
def _decompress(self, data, algorithm):
if algorithm == 'gzip':
return gzip.decompress(data)
elif algorithm == 'lzma':
return lzma.decompress(data)
elif algorithm == 'bz2':
return bz2.decompress(data)
elif algorithm == 'zlib':
return zlib.decompress(data)
# Example usage
def generate_test_data():
"""Generate diverse test data for compression benchmarking"""
return [
# Text documents
"This is a sample document with repeated text patterns. " * 100,
# JSON-like structured data
{
"users": [
{"id": i, "name": f"user_{i}", "email": f"user_{i}@example.com"}
for i in range(1000)
]
},
# Embedding vectors (common in agent memory)
[np.random.rand(384).tolist() for _ in range(100)],
# Mixed conversation history
[
{"role": "user", "content": "What is machine learning?" + " " * 50},
{"role": "assistant", "content": "Machine learning is a subset of AI..." + " " * 100}
] * 50,
# Large text corpus
"Natural language processing " * 1000 + "Deep learning models " * 500,
# Sparse data with many nulls
[None if i % 3 == 0 else f"value_{i}" for i in range(10000)]
]
benchmark = CompressionBenchmark()
test_data = generate_test_data()
compression_results = benchmark.benchmark_compression_algorithms(test_data)
# Analyze results
for algorithm, results in compression_results.items():
print(f"\n{algorithm.upper()} Results:")
print(f" Average Compression Ratio: {results['avg_compression_ratio']:.2f}x")
print(f" Average Space Savings: {results['avg_space_savings']:.1%}")
print(f" Average Throughput: {results['avg_throughput']:.2f} MB/s")Semantic Compression Evaluation
class SemanticCompressionBenchmark:
def __init__(self, embedding_model, summarization_model):
self.embedding_model = embedding_model
self.summarization_model = summarization_model
def benchmark_semantic_compression_techniques(self, documents: List[str]):
"""
Benchmark semantic compression techniques like summarization and clustering
Args:
documents: List of text documents to compress semantically
"""
techniques = {
'extractive_summary': self._extractive_summarization,
'abstractive_summary': self._abstractive_summarization,
'embedding_clustering': self._embedding_clustering,
'topic_modeling': self._topic_modeling,
'hierarchical_compression': self._hierarchical_compression
}
results = {}
for technique_name, technique_func in techniques.items():
print(f"Benchmarking {technique_name}...")
# Measure original content size
original_content = " ".join(documents)
original_size = len(original_content.encode('utf-8'))
# Apply compression technique
start_time = time.time()
compressed_representation = technique_func(documents)
compression_time = time.time() - start_time
# Measure compressed size
compressed_content = str(compressed_representation)
compressed_size = len(compressed_content.encode('utf-8'))
# Evaluate semantic preservation
semantic_similarity = self._evaluate_semantic_preservation(
original_content, compressed_content
)
# Calculate metrics
compression_ratio = original_size / compressed_size
space_savings = 1 - (compressed_size / original_size)
results[technique_name] = {
'original_size': original_size,
'compressed_size': compressed_size,
'compression_ratio': compression_ratio,
'space_savings': space_savings,
'compression_time': compression_time,
'semantic_similarity': semantic_similarity,
'quality_efficiency_score': semantic_similarity / compression_time,
'compressed_representation': compressed_representation
}
return results
def _extractive_summarization(self, documents):
"""Extract key sentences from documents"""
from transformers import pipeline
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
summaries = []
for doc in documents:
if len(doc) > 100: # Only summarize substantial content
try:
summary = summarizer(doc, max_length=50, min_length=10)[0]['summary_text']
summaries.append(summary)
except Exception:
summaries.append(doc[:100]) # Fallback to truncation
else:
summaries.append(doc)
return summaries
def _abstractive_summarization(self, documents):
"""Generate abstractive summaries"""
# Use pre-configured summarization model
summaries = []
for doc in documents:
summary = self.summarization_model.summarize(doc, max_length=100)
summaries.append(summary)
return summaries
def _embedding_clustering(self, documents):
"""Cluster documents and represent clusters with centroids + representatives"""
from sklearn.cluster import KMeans
# Get embeddings
embeddings = [self.embedding_model.encode(doc) for doc in documents]
embeddings_array = np.array(embeddings)
# Cluster documents
n_clusters = max(1, min(len(documents) // 5, 20)) # Adaptive cluster count, at least 1
kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
cluster_labels = kmeans.fit_predict(embeddings_array)
# Create compressed representation
cluster_representation = {}
for i in range(n_clusters):
cluster_docs = [documents[j] for j in range(len(documents)) if cluster_labels[j] == i]
# Find most representative document in cluster
cluster_center = kmeans.cluster_centers_[i]
cluster_embeddings = [embeddings[j] for j in range(len(documents)) if cluster_labels[j] == i]
if cluster_embeddings:
similarities = [
np.dot(cluster_center, emb) / (np.linalg.norm(cluster_center) * np.linalg.norm(emb))
for emb in cluster_embeddings
]
representative_idx = np.argmax(similarities)
representative_doc = cluster_docs[representative_idx]
cluster_representation[f"cluster_{i}"] = {
'representative': representative_doc,
'count': len(cluster_docs),
'centroid': cluster_center.tolist()
}
return cluster_representation
def _topic_modeling(self, documents):
"""Use topic modeling for compression"""
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
# Vectorize documents
vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
doc_term_matrix = vectorizer.fit_transform(documents)
# Apply LDA
n_topics = max(1, min(len(documents) // 3, 10))
lda = LatentDirichletAllocation(n_components=n_topics, random_state=42)
doc_topic_matrix = lda.fit_transform(doc_term_matrix)
# Create compressed representation
feature_names = vectorizer.get_feature_names_out()
topic_representation = {}
for topic_idx, topic in enumerate(lda.components_):
top_words_idx = topic.argsort()[-10:][::-1] # Top 10 words
top_words = [feature_names[i] for i in top_words_idx]
# Find documents most associated with this topic
topic_docs = np.argsort(doc_topic_matrix[:, topic_idx])[-3:][::-1]
topic_representation[f"topic_{topic_idx}"] = {
'keywords': top_words,
'weight': topic[top_words_idx].tolist(),
'representative_docs': [documents[i][:200] for i in topic_docs]
}
return topic_representation
def _hierarchical_compression(self, documents):
"""Multi-level compression: cluster -> summarize -> compress"""
# First level: cluster similar documents
clusters = self._embedding_clustering(documents)
# Second level: summarize each cluster
summarized_clusters = {}
for cluster_id, cluster_data in clusters.items():
representative = cluster_data['representative']
summary = self._abstractive_summarization([representative])[0]
summarized_clusters[cluster_id] = {
'summary': summary,
'count': cluster_data['count'],
'centroid': cluster_data['centroid'][:10] # Truncate centroid
}
return summarized_clusters
def _evaluate_semantic_preservation(self, original_text, compressed_text):
"""Evaluate how well semantic meaning is preserved"""
original_embedding = self.embedding_model.encode(original_text)
compressed_embedding = self.embedding_model.encode(compressed_text)
# Calculate cosine similarity
similarity = np.dot(original_embedding, compressed_embedding) / (
np.linalg.norm(original_embedding) * np.linalg.norm(compressed_embedding)
)
return float(similarity)
# Example usage
from sentence_transformers import SentenceTransformer
# Initialize models (replace with your actual models)
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
class DummySummarizer:
def summarize(self, text, max_length=100):
return text[:max_length] + "..." if len(text) > max_length else text
summarization_model = DummySummarizer()
# Generate test documents
test_documents = [
"Machine learning is a powerful tool for data analysis and prediction...",
"Deep neural networks have revolutionized computer vision and natural language processing...",
"Artificial intelligence systems require careful consideration of ethical implications...",
# Add more diverse documents
] * 10
semantic_benchmark = SemanticCompressionBenchmark(embedding_model, summarization_model)
semantic_results = semantic_benchmark.benchmark_semantic_compression_techniques(test_documents)
for technique, results in semantic_results.items():
print(f"\n{technique.upper()}:")
print(f" Compression Ratio: {results['compression_ratio']:.2f}x")
print(f" Space Savings: {results['space_savings']:.1%}")
print(f" Semantic Similarity: {results['semantic_similarity']:.3f}")
print(f" Quality/Efficiency Score: {results['quality_efficiency_score']:.3f}")Vector Compression Strategies
Embedding Dimensionality Reduction
class VectorCompressionBenchmark:
def __init__(self):
self.reduction_techniques = {
'pca': self._pca_reduction,
'random_projection': self._random_projection,
'autoencoder': self._autoencoder_compression,
'quantization': self._vector_quantization,
'pruning': self._vector_pruning
}
def benchmark_vector_compression(self, embeddings: np.ndarray, target_dimensions: List[int]):
"""
Benchmark vector compression techniques across different target dimensions
Args:
embeddings: Array of shape (n_vectors, dimensions)
target_dimensions: List of target dimensions to compress to
"""
original_dim = embeddings.shape[1]
results = {}
for technique_name, technique_func in self.reduction_techniques.items():
print(f"Benchmarking {technique_name}...")
technique_results = {}
for target_dim in target_dimensions:
if target_dim >= original_dim:
continue
# Apply compression
start_time = time.time()
compressed_embeddings, compression_model = technique_func(embeddings, target_dim)
compression_time = time.time() - start_time
# Evaluate quality metrics
quality_metrics = self._evaluate_compression_quality(
embeddings, compressed_embeddings, technique_name
)
# Calculate compression metrics
compression_ratio = original_dim / target_dim
memory_savings = 1 - (target_dim / original_dim)
technique_results[target_dim] = {
'compression_ratio': compression_ratio,
'memory_savings': memory_savings,
'compression_time': compression_time,
'quality_metrics': quality_metrics,
'compressed_shape': compressed_embeddings.shape
}
results[technique_name] = technique_results
return results
def _pca_reduction(self, embeddings, target_dim):
"""Principal Component Analysis reduction"""
from sklearn.decomposition import PCA
pca = PCA(n_components=target_dim)
compressed = pca.fit_transform(embeddings)
return compressed, pca
def _random_projection(self, embeddings, target_dim):
"""Random projection reduction"""
from sklearn.random_projection import GaussianRandomProjection
rp = GaussianRandomProjection(n_components=target_dim)
compressed = rp.fit_transform(embeddings)
return compressed, rp
def _autoencoder_compression(self, embeddings, target_dim):
"""Autoencoder-based compression"""
import tensorflow as tf
from tensorflow import keras
input_dim = embeddings.shape[1]
# Simple autoencoder architecture
encoder_input = keras.Input(shape=(input_dim,))
encoded = keras.layers.Dense(target_dim * 2, activation='relu')(encoder_input)
encoded = keras.layers.Dense(target_dim, activation='relu')(encoded)
decoder = keras.layers.Dense(target_dim * 2, activation='relu')(encoded)
decoder_output = keras.layers.Dense(input_dim, activation='linear')(decoder)
autoencoder = keras.Model(encoder_input, decoder_output)
encoder = keras.Model(encoder_input, encoded)
# Train autoencoder
autoencoder.compile(optimizer='adam', loss='mse')
autoencoder.fit(embeddings, embeddings, epochs=50, batch_size=32, verbose=0)
# Compress embeddings
compressed = encoder.predict(embeddings)
return compressed, encoder
def _vector_quantization(self, embeddings, target_dim):
"""Vector quantization using codebook"""
from sklearn.cluster import MiniBatchKMeans
# Create codebook with target_dim centroids
kmeans = MiniBatchKMeans(n_clusters=target_dim)
cluster_assignments = kmeans.fit_predict(embeddings)
# Represent each vector by its cluster assignment and distance to centroid
centroids = kmeans.cluster_centers_
# Create compressed representation: (cluster_id, offset_vector)
compressed = []
for i, assignment in enumerate(cluster_assignments):
centroid = centroids[assignment]
offset = embeddings[i] - centroid
# Keep only top-k components of offset (further compression)
k = min(10, len(offset))
top_k_indices = np.argsort(np.abs(offset))[-k:]
sparse_offset = np.zeros_like(offset)
sparse_offset[top_k_indices] = offset[top_k_indices]
compressed.append([assignment] + sparse_offset.tolist())
compressed = np.array(compressed)
return compressed, (kmeans, centroids)
def _vector_pruning(self, embeddings, target_dim):
"""Remove least important dimensions"""
# Calculate variance across all embeddings for each dimension
variances = np.var(embeddings, axis=0)
# Keep dimensions with highest variance
important_dims = np.argsort(variances)[-target_dim:]
compressed = embeddings[:, important_dims]
return compressed, important_dims
def _evaluate_compression_quality(self, original, compressed, technique):
"""Evaluate quality of compressed embeddings"""
# For techniques that change dimensionality, we need to handle differently
if technique in ['quantization']:
# For quantization, reconstruct original dimensionality for comparison
return self._evaluate_quantization_quality(original, compressed)
# Calculate similarity preservation
similarity_preservation = self._calculate_similarity_preservation(original, compressed)
# Reconstruction error only makes sense when dimensionality is unchanged;
# for reduced vectors, rely on similarity preservation instead
reconstruction_error = None
if original.shape[1] == compressed.shape[1]:
reconstruction_error = np.mean(np.linalg.norm(original[:100] - compressed[:100], axis=1))
return {
'similarity_preservation': similarity_preservation,
'reconstruction_error': reconstruction_error,
'variance_retained': np.var(compressed) / np.var(original)
}
def _calculate_similarity_preservation(self, original, compressed):
"""Calculate how well pairwise similarities are preserved"""
# Sample subset for efficiency
n_samples = min(100, original.shape[0])
sample_indices = np.random.choice(original.shape[0], n_samples, replace=False)
orig_sample = original[sample_indices]
comp_sample = compressed[sample_indices]
# Calculate pairwise cosine similarities
orig_similarities = np.zeros((n_samples, n_samples))
comp_similarities = np.zeros((n_samples, n_samples))
for i in range(n_samples):
for j in range(i+1, n_samples):
orig_sim = np.dot(orig_sample[i], orig_sample[j]) / (
np.linalg.norm(orig_sample[i]) * np.linalg.norm(orig_sample[j])
)
comp_sim = np.dot(comp_sample[i], comp_sample[j]) / (
np.linalg.norm(comp_sample[i]) * np.linalg.norm(comp_sample[j])
)
orig_similarities[i, j] = orig_sim
comp_similarities[i, j] = comp_sim
# Calculate correlation between similarity matrices
orig_flat = orig_similarities[np.triu_indices(n_samples, 1)]
comp_flat = comp_similarities[np.triu_indices(n_samples, 1)]
correlation = np.corrcoef(orig_flat, comp_flat)[0, 1]
return correlation if not np.isnan(correlation) else 0.0
def _evaluate_quantization_quality(self, original, compressed):
"""Special evaluation for quantized vectors"""
# For quantization, measure how well cluster assignments preserve neighborhoods
# Sample evaluation
n_samples = min(100, original.shape[0])
sample_indices = np.random.choice(original.shape[0], n_samples, replace=False)
# Check if nearest neighbors are preserved after quantization
preserved_neighbors = 0
total_comparisons = 0
for i in sample_indices:
# Find nearest neighbors in original space
distances = np.linalg.norm(original - original[i], axis=1)
orig_neighbors = np.argsort(distances)[1:6] # Top 5 neighbors
# Find nearest neighbors in quantized space (by cluster assignment)
cluster_i = int(compressed[i][0])
same_cluster = [j for j in range(len(compressed)) if int(compressed[j][0]) == cluster_i]
# Count how many original neighbors are in the same cluster
preserved = len(set(orig_neighbors) & set(same_cluster))
preserved_neighbors += preserved
total_comparisons += len(orig_neighbors)
neighbor_preservation = preserved_neighbors / total_comparisons if total_comparisons > 0 else 0
return {
'similarity_preservation': neighbor_preservation,
'reconstruction_error': None, # Not applicable for quantization
'variance_retained': None # Not applicable for quantization
}
# Example usage
# Generate sample embeddings
n_vectors = 1000
original_dim = 384
embeddings = np.random.rand(n_vectors, original_dim)
vector_benchmark = VectorCompressionBenchmark()
target_dims = [64, 128, 192, 256]
compression_results = vector_benchmark.benchmark_vector_compression(embeddings, target_dims)
# Analyze results
for technique, dims_results in compression_results.items():
print(f"\n{technique.upper()} Results:")
for dim, metrics in dims_results.items():
print(f" {dim}D: Compression {metrics['compression_ratio']:.1f}x, "
f"Quality {metrics['quality_metrics']['similarity_preservation']:.3f}, "
f"Time {metrics['compression_time']:.3f}s")Production Memory Compression
Dynamic Compression Strategy
class DynamicCompressionManager:
def __init__(self, storage_backend):
self.storage = storage_backend
# Policy dimensions considered when scoring documents for compression;
# the scoring logic in apply_dynamic_compression folds all of these in.
self.compression_policies = [
'age_based',
'access_frequency',
'content_type',
'storage_pressure'
]
self.compression_stats = {
'total_compressed': 0,
'space_saved': 0,
'compression_operations': 0
}
def apply_dynamic_compression(self, documents: List[Dict]):
"""Apply compression based on multiple policies"""
compression_decisions = []
for doc in documents:
# Analyze document for compression decision
age_days = (time.time() - doc.get('created_at', time.time())) / 86400
access_count = doc.get('access_count', 0)
last_access_days = (time.time() - doc.get('last_accessed', time.time())) / 86400
content_type = doc.get('type', 'text')
content_size = len(str(doc.get('content', '')))
# Score document for compression priority
compression_score = 0
# Age factor
if age_days > 30:
compression_score += 3
elif age_days > 7:
compression_score += 2
elif age_days > 1:
compression_score += 1
# Access frequency factor
if access_count == 0:
compression_score += 2
elif access_count < 5:
compression_score += 1
# Recency factor
if last_access_days > 7:
compression_score += 2
elif last_access_days > 1:
compression_score += 1
# Size factor
if content_size > 10000:
compression_score += 2
elif content_size > 1000:
compression_score += 1
# Determine compression strategy
strategy = self._select_compression_strategy(doc, compression_score)
compression_decisions.append({
'document_id': doc['id'],
'strategy': strategy,
'score': compression_score,
'estimated_savings': self._estimate_compression_savings(doc, strategy)
})
# Execute compression decisions
for decision in compression_decisions:
if decision['strategy'] != 'none':
self._execute_compression(decision)
return compression_decisions
def _select_compression_strategy(self, doc, score):
"""Select appropriate compression strategy based on score and content"""
content_type = doc.get('type', 'text')
content_size = len(str(doc.get('content', '')))
if score < 3:
return 'none'
elif score < 6:
if content_type == 'embedding':
return 'vector_quantization'
elif content_type == 'conversation':
return 'semantic_summary'
else:
return 'gzip_compression'
else: # score >= 6
if content_type == 'embedding':
return 'pca_reduction'
elif content_type == 'conversation':
return 'hierarchical_compression'
else:
return 'aggressive_compression'
def _estimate_compression_savings(self, doc, strategy):
"""Estimate space savings for compression strategy"""
content_size = len(str(doc.get('content', '')))
savings_estimates = {
'none': 0,
'gzip_compression': 0.6, # 60% savings typical
'aggressive_compression': 0.8, # 80% savings
'vector_quantization': 0.5, # 50% savings
'pca_reduction': 0.7, # 70% savings
'semantic_summary': 0.85, # 85% savings
'hierarchical_compression': 0.9 # 90% savings
}
estimated_savings = content_size * savings_estimates.get(strategy, 0)
return estimated_savings
def _execute_compression(self, decision):
"""Execute compression decision"""
doc_id = decision['document_id']
strategy = decision['strategy']
try:
# Retrieve document
doc = self.storage.get_document(doc_id)
original_size = len(str(doc))
# Apply compression strategy
if strategy == 'gzip_compression':
compressed_doc = self._apply_gzip_compression(doc)
elif strategy == 'aggressive_compression':
compressed_doc = self._apply_aggressive_compression(doc)
elif strategy == 'vector_quantization':
compressed_doc = self._apply_vector_quantization(doc)
elif strategy == 'pca_reduction':
compressed_doc = self._apply_pca_reduction(doc)
elif strategy == 'semantic_summary':
compressed_doc = self._apply_semantic_summary(doc)
elif strategy == 'hierarchical_compression':
compressed_doc = self._apply_hierarchical_compression(doc)
# Update document in storage
compressed_doc['compression_metadata'] = {
'strategy': strategy,
'original_size': original_size,
'compressed_at': time.time(),
'compression_ratio': original_size / len(str(compressed_doc))
}
self.storage.update_document(doc_id, compressed_doc)
# Update stats
self.compression_stats['total_compressed'] += 1
self.compression_stats['space_saved'] += (
original_size - len(str(compressed_doc))
)
self.compression_stats['compression_operations'] += 1
except Exception as e:
print(f"Compression failed for document {doc_id}: {e}")
def _apply_gzip_compression(self, doc):
"""Apply gzip compression to document content"""
import base64
content = str(doc.get('content', ''))
compressed_content = gzip.compress(content.encode('utf-8'))
encoded_content = base64.b64encode(compressed_content).decode('utf-8')
compressed_doc = doc.copy()
compressed_doc['content'] = encoded_content
compressed_doc['content_encoding'] = 'gzip_b64'
return compressed_doc
def _apply_aggressive_compression(self, doc):
"""Apply multiple compression techniques"""
# First apply semantic compression, then gzip
semantically_compressed = self._apply_semantic_summary(doc)
return self._apply_gzip_compression(semantically_compressed)
def _apply_vector_quantization(self, doc):
"""Apply vector quantization to embeddings"""
if 'embedding' not in doc:
return doc
embedding = np.array(doc['embedding'])
# Simple quantization - reduce precision
quantized = np.clip(np.round(embedding * 127), -128, 127).astype(np.int8)
compressed_doc = doc.copy()
compressed_doc['embedding'] = quantized.tolist()
compressed_doc['embedding_encoding'] = 'int8_quantized'
return compressed_doc
def _apply_pca_reduction(self, doc):
"""Apply PCA reduction to an embedding.
Note: PCA components must be learned from a corpus of embeddings; fitting
on a single vector is invalid (scikit-learn rejects n_components larger
than n_samples). Fit a PCA model once on a sample of stored embeddings,
attach it as self.pca_model, and reuse it here; storing the components per
document would outweigh the savings. Without a fitted model, fall back to
simple truncation.
"""
if 'embedding' not in doc:
return doc
embedding = np.array(doc['embedding'])
# Reduce to 75% of original dimensions
target_dim = int(len(embedding) * 0.75)
pca_model = getattr(self, 'pca_model', None)
if pca_model is not None:
reduced_embedding = pca_model.transform(embedding.reshape(1, -1))[0]
else:
reduced_embedding = embedding[:target_dim] # fallback: truncation
compressed_doc = doc.copy()
compressed_doc['embedding'] = reduced_embedding.tolist()
compressed_doc['embedding_encoding'] = 'pca_reduced'
return compressed_doc
def _apply_semantic_summary(self, doc):
"""Apply semantic summarization"""
content = doc.get('content', '')
if len(content) > 200:
# Simple extractive summarization
sentences = content.split('. ')
# Keep first and last sentences, plus middle ones if short enough
if len(sentences) > 3:
summary_sentences = [sentences[0], sentences[-1]]
# Add middle sentences until we reach ~50% of original
target_length = len(content) * 0.5
current_length = len('. '.join(summary_sentences))
for sentence in sentences[1:-1]:
if current_length + len(sentence) < target_length:
summary_sentences.insert(-1, sentence)
current_length += len(sentence)
else:
break
summarized_content = '. '.join(summary_sentences)
else:
summarized_content = content
else:
summarized_content = content
compressed_doc = doc.copy()
compressed_doc['content'] = summarized_content
compressed_doc['content_encoding'] = 'semantic_summary'
return compressed_doc
def _apply_hierarchical_compression(self, doc):
"""Apply multiple levels of compression"""
# 1. Semantic compression
semantically_compressed = self._apply_semantic_summary(doc)
# 2. Vector compression if applicable
if 'embedding' in semantically_compressed:
vector_compressed = self._apply_vector_quantization(semantically_compressed)
else:
vector_compressed = semantically_compressed
# 3. Final gzip compression
final_compressed = self._apply_gzip_compression(vector_compressed)
final_compressed['content_encoding'] = 'hierarchical'
return final_compressed
def get_compression_statistics(self):
"""Get current compression statistics"""
return {
'total_documents_compressed': self.compression_stats['total_compressed'],
'total_space_saved_bytes': self.compression_stats['space_saved'],
'total_compression_operations': self.compression_stats['compression_operations'],
'average_space_saved_per_doc_bytes': (
self.compression_stats['space_saved'] / max(self.compression_stats['total_compressed'], 1)
)
}
def optimize_compression_schedule(self, performance_targets):
"""Optimize compression schedule based on performance targets"""
targets = {
'max_compression_time_per_doc': 1.0, # seconds
'min_space_savings_threshold': 0.3, # 30%
'max_quality_degradation': 0.1 # 10%
}
targets.update(performance_targets)
# Analyze current compression performance
recent_operations = self._get_recent_compression_operations()
recommendations = []
# Check if compression is taking too long
avg_compression_time = np.mean([op['time'] for op in recent_operations])
if avg_compression_time > targets['max_compression_time_per_doc']:
recommendations.append({
'issue': 'Compression taking too long',
'current_avg_time': avg_compression_time,
'target': targets['max_compression_time_per_doc'],
'recommendation': 'Use faster compression algorithms or reduce compression levels'
})
# Check space savings
avg_space_savings = np.mean([op['space_savings'] for op in recent_operations])
if avg_space_savings < targets['min_space_savings_threshold']:
recommendations.append({
'issue': 'Insufficient space savings',
'current_savings': avg_space_savings,
'target': targets['min_space_savings_threshold'],
'recommendation': 'Apply more aggressive compression strategies'
})
return recommendations
def _get_recent_compression_operations(self):
"""Get recent compression operation metrics"""
# This would integrate with your monitoring system
# Here's a placeholder implementation
return [
{'time': 0.5, 'space_savings': 0.6, 'quality_loss': 0.05},
{'time': 0.8, 'space_savings': 0.4, 'quality_loss': 0.02},
{'time': 1.2, 'space_savings': 0.7, 'quality_loss': 0.08},
]
# Example usage (your_storage_backend is a placeholder for your document store;
# it must expose get_document(doc_id) and update_document(doc_id, doc))
dynamic_compressor = DynamicCompressionManager(your_storage_backend)
# Sample documents for compression
sample_documents = [
{
'id': 'doc1',
'content': 'Long document content...' * 100,
'type': 'text',
'created_at': time.time() - 86400 * 5, # 5 days old
'access_count': 2,
'last_accessed': time.time() - 86400 * 3
},
{
'id': 'doc2',
'content': 'Another document...',
'type': 'conversation',
'embedding': np.random.rand(384).tolist(),
'created_at': time.time() - 86400 * 30, # 30 days old
'access_count': 0,
'last_accessed': time.time() - 86400 * 30
}
]
# Apply dynamic compression
decisions = dynamic_compressor.apply_dynamic_compression(sample_documents)
# Get statistics
stats = dynamic_compressor.get_compression_statistics()
print(f"Compression Statistics: {stats}")
# Optimize schedule
recommendations = dynamic_compressor.optimize_compression_schedule({
'max_compression_time_per_doc': 0.5
})
for rec in recommendations:
print(f"Recommendation: {rec['recommendation']}")Best Practices
Compression Strategy Selection
- Content Type Awareness: Choose compression based on data type (text, vectors, structured data); a small dispatch sketch follows this list
- Access Pattern Analysis: Compress less frequently accessed data more aggressively
- Quality Requirements: Balance compression ratio with acceptable quality degradation
- Performance Constraints: Consider compression/decompression time in user-facing operations
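As a sketch of the content-type point, strategy selection can be a small dispatch on the document's type field. The type names and codec choices below are illustrative assumptions, not a prescribed mapping:

```python
import gzip
import json
import numpy as np

def compress_by_type(item: dict) -> bytes:
    """Pick a compression path based on the item's content type."""
    kind = item.get("type", "text")
    if kind == "embedding":
        # Vectors: quantize to int8 before byte-level compression.
        vec = np.asarray(item["content"], dtype=np.float32)
        payload = np.clip(np.round(vec * 127), -128, 127).astype(np.int8).tobytes()
    elif kind == "conversation":
        # Structured turns: compact JSON serialization, then compress.
        payload = json.dumps(item["content"], separators=(",", ":")).encode("utf-8")
    else:
        # Plain text and anything else: UTF-8 bytes, then compress.
        payload = str(item["content"]).encode("utf-8")
    return gzip.compress(payload)

doc = {"type": "conversation", "content": [{"role": "user", "content": "hi"}] * 50}
print(len(compress_by_type(doc)), "compressed bytes")
```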
Monitoring and Optimization
- Compression Ratio Tracking: Monitor space savings across different content types (a minimal tracker sketch follows this list)
- Quality Metrics: Continuously evaluate semantic preservation after compression
- Performance Impact: Measure compression/decompression latency impact on queries
- Cost-Benefit Analysis: Calculate storage cost savings vs. computational overhead
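A lightweight way to put the tracking points into practice is a per-operation metrics log with alert thresholds. This is a minimal sketch; the threshold values and record fields are illustrative assumptions:

```python
from statistics import mean

class CompressionMonitor:
    """Track per-operation compression metrics and flag regressions."""

    def __init__(self, min_ratio: float = 1.5, max_latency_s: float = 1.0):
        self.min_ratio = min_ratio          # alert if average ratio drops below this
        self.max_latency_s = max_latency_s  # alert if average latency exceeds this
        self.records = []

    def record(self, original_size: int, compressed_size: int, seconds: float) -> None:
        self.records.append({
            "ratio": original_size / max(compressed_size, 1),
            "latency_s": seconds,
        })

    def alerts(self, window: int = 100) -> list:
        recent = self.records[-window:]
        if not recent:
            return []
        issues = []
        if mean(r["ratio"] for r in recent) < self.min_ratio:
            issues.append("compression ratio below target")
        if mean(r["latency_s"] for r in recent) > self.max_latency_s:
            issues.append("compression latency above target")
        return issues

monitor = CompressionMonitor()
monitor.record(original_size=10_000, compressed_size=8_000, seconds=0.2)
print(monitor.alerts())
```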
Production Implementation Guidelines
- Gradual Rollout: Implement compression incrementally with rollback capabilities
- Backup Strategy: Maintain uncompressed backups during initial deployment
- Monitoring: Set up alerts for compression failures and quality degradation
- Testing: Thoroughly test compression/decompression pipelines before production; a round-trip test sketch follows below
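For the testing guideline, a round-trip property check is a cheap way to catch integrity regressions before rollout. A minimal pytest-style sketch, with gzip plus pickle standing in for your actual compression pipeline:

```python
import gzip
import pickle

def compress_doc(doc: dict) -> bytes:
    return gzip.compress(pickle.dumps(doc))

def decompress_doc(blob: bytes) -> dict:
    return pickle.loads(gzip.decompress(blob))

def test_round_trip_preserves_documents():
    samples = [
        {"id": "a", "content": "short note"},
        {"id": "b", "content": "repeated text " * 500, "type": "text"},
        {"id": "c", "content": [0.1] * 384, "type": "embedding"},
    ]
    for doc in samples:
        # Decompressing the compressed document must return it unchanged.
        assert decompress_doc(compress_doc(doc)) == doc

if __name__ == "__main__":
    test_round_trip_preserves_documents()
    print("round-trip OK")
```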
This comprehensive memory compression benchmarking guide provides the tools and methodologies to optimize storage efficiency while maintaining system performance and data quality in agent memory systems.