
Serialization for Agent Memory

Serialization has a direct impact on the performance, storage footprint, and interoperability of agent memory systems. This guide covers practical serialization strategies, format comparisons, and production-ready implementations.

Serialization Formats Comparison

JSON - Universal Compatibility

JSON offers universal compatibility, at the cost of larger payloads and slower encoding for large datasets.

import json
import gzip
from typing import Dict, Any, List, Optional
from datetime import datetime
from dataclasses import dataclass, asdict

@dataclass
class MemoryEntry:
    id: str
    content: str
    memory_type: str
    importance: float
    timestamp: datetime
    embedding: Optional[List[float]] = None
    metadata: Optional[Dict[str, Any]] = None
    tags: Optional[List[str]] = None

class JSONMemorySerializer:
    """JSON serialization with optimizations for memory data"""

    @staticmethod
    def serialize_memory(memory: MemoryEntry, compress: bool = True) -> bytes:
        """Serialize memory entry to JSON with optional compression"""
        # Convert to dict with proper datetime handling
        memory_dict = asdict(memory)
        memory_dict['timestamp'] = memory.timestamp.isoformat()

        # Convert to JSON (compact format)
        json_str = json.dumps(memory_dict, separators=(',', ':'))

        if compress:
            return gzip.compress(json_str.encode('utf-8'))
        else:
            return json_str.encode('utf-8')

    @staticmethod
    def deserialize_memory(data: bytes, compressed: bool = True) -> MemoryEntry:
        """Deserialize memory entry from JSON"""
        if compressed:
            json_str = gzip.decompress(data).decode('utf-8')
        else:
            json_str = data.decode('utf-8')

        memory_dict = json.loads(json_str)

        # Parse datetime
        memory_dict['timestamp'] = datetime.fromisoformat(memory_dict['timestamp'])

        return MemoryEntry(**memory_dict)

    @staticmethod
    def serialize_batch(memories: List[MemoryEntry], compress: bool = True) -> bytes:
        """Serialize multiple memories efficiently"""
        # Convert all memories to dicts
        memory_dicts = []
        for memory in memories:
            memory_dict = asdict(memory)
            memory_dict['timestamp'] = memory.timestamp.isoformat()
            memory_dicts.append(memory_dict)

        # Batch JSON encoding
        json_str = json.dumps(memory_dicts, separators=(',', ':'))

        if compress:
            return gzip.compress(json_str.encode('utf-8'))
        else:
            return json_str.encode('utf-8')

    @staticmethod
    def deserialize_batch(data: bytes, compressed: bool = True) -> List[MemoryEntry]:
        """Deserialize multiple memories efficiently"""
        if compressed:
            json_str = gzip.decompress(data).decode('utf-8')
        else:
            json_str = data.decode('utf-8')

        memory_dicts = json.loads(json_str)

        memories = []
        for memory_dict in memory_dicts:
            memory_dict['timestamp'] = datetime.fromisoformat(memory_dict['timestamp'])
            memories.append(MemoryEntry(**memory_dict))

        return memories

# Example usage with streaming for large datasets
class StreamingJSONSerializer:
    """Stream-based JSON serialization for large memory collections"""

    def __init__(self, output_file: str, compress: bool = True):
        self.output_file = output_file
        self.compress = compress
        self.file_handle = None

    def __enter__(self):
        if self.compress:
            self.file_handle = gzip.open(self.output_file, 'wt', encoding='utf-8')
        else:
            self.file_handle = open(self.output_file, 'w', encoding='utf-8')
        self.file_handle.write('[')  # Start JSON array
        self.first_item = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file_handle.write(']')  # Close JSON array
        self.file_handle.close()

    def write_memory(self, memory: MemoryEntry):
        """Write a single memory to the stream"""
        if not self.first_item:
            self.file_handle.write(',')

        memory_dict = asdict(memory)
        memory_dict['timestamp'] = memory.timestamp.isoformat()
        json.dump(memory_dict, self.file_handle, separators=(',', ':'))
        self.first_item = False

# Performance optimization with ujson
try:
    import ujson as fast_json
    JSON_ENCODER = fast_json
except ImportError:
    JSON_ENCODER = json

class OptimizedJSONSerializer:
    """High-performance JSON serialization using ujson when available"""

    @staticmethod
    def serialize_memory(memory: MemoryEntry) -> bytes:
        memory_dict = asdict(memory)
        memory_dict['timestamp'] = memory.timestamp.isoformat()
        return JSON_ENCODER.dumps(memory_dict).encode('utf-8')

    @staticmethod
    def deserialize_memory(data: bytes) -> MemoryEntry:
        memory_dict = JSON_ENCODER.loads(data.decode('utf-8'))
        memory_dict['timestamp'] = datetime.fromisoformat(memory_dict['timestamp'])
        return MemoryEntry(**memory_dict)
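A quick round-trip check helps confirm that datetime handling and compression behave as expected. This is a minimal sketch; it assumes the MemoryEntry and JSONMemorySerializer classes above are in scope, and the entry values are hypothetical:

# Hypothetical usage: round-trip a single entry through compressed JSON
entry = MemoryEntry(
    id="mem-001",
    content="User prefers concise answers.",
    memory_type="semantic",
    importance=0.8,
    timestamp=datetime.now(),
    embedding=[0.1, 0.2, 0.3],
    metadata={"source": "chat"},
    tags=["preference"],
)

blob = JSONMemorySerializer.serialize_memory(entry, compress=True)
restored = JSONMemorySerializer.deserialize_memory(blob, compressed=True)
assert restored.id == entry.id and restored.timestamp == entry.timestamp
print(f"compressed size: {len(blob)} bytes")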

Protocol Buffers - High Performance

Protocol Buffers offer excellent performance and schema evolution capabilities.

# First, define the protobuf schema (memory.proto)
"""
syntax = "proto3";

package agent_memory;

message MemoryEntry {
    string id = 1;
    string content = 2;
    string memory_type = 3;
    float importance = 4;
    int64 timestamp = 5;  // Unix timestamp
    repeated float embedding = 6;
    map<string, string> metadata = 7;
    repeated string tags = 8;
}

message MemoryBatch {
    repeated MemoryEntry memories = 1;
    string batch_id = 2;
    int64 created_at = 3;
}

message MemoryIndex {
    string memory_id = 1;
    string memory_type = 2;
    float importance = 3;
    int64 timestamp = 4;
    repeated string tags = 5;
}
"""

# Generate Python classes with: protoc --python_out=. memory.proto
import time
from datetime import datetime
from typing import List, Dict, Any, Optional

import memory_pb2
from google.protobuf.message import DecodeError

class ProtobufMemorySerializer:
    """Protocol Buffers serialization for high performance"""

    @staticmethod
    def serialize_memory(memory: MemoryEntry) -> bytes:
        """Serialize memory entry to protobuf"""
        pb_memory = memory_pb2.MemoryEntry()
        pb_memory.id = memory.id
        pb_memory.content = memory.content
        pb_memory.memory_type = memory.memory_type
        pb_memory.importance = memory.importance
        pb_memory.timestamp = int(memory.timestamp.timestamp())

        if memory.embedding:
            pb_memory.embedding[:] = memory.embedding

        if memory.metadata:
            for key, value in memory.metadata.items():
                pb_memory.metadata[key] = str(value)

        if memory.tags:
            pb_memory.tags[:] = memory.tags

        return pb_memory.SerializeToString()

    @staticmethod
    def deserialize_memory(data: bytes) -> MemoryEntry:
        """Deserialize memory entry from protobuf"""
        pb_memory = memory_pb2.MemoryEntry()
        try:
            pb_memory.ParseFromString(data)
        except DecodeError as e:
            raise ValueError(f"Failed to parse protobuf data: {e}")

        # Convert metadata back to appropriate Python types
        metadata = {}
        for key, value in pb_memory.metadata.items():
            try:
                if value.isdigit():
                    metadata[key] = int(value)
                elif value.replace('.', '').isdigit():
                    metadata[key] = float(value)
                elif value.lower() in ('true', 'false'):
                    metadata[key] = value.lower() == 'true'
                else:
                    metadata[key] = value
            except (ValueError, AttributeError):
                metadata[key] = value

        return MemoryEntry(
            id=pb_memory.id,
            content=pb_memory.content,
            memory_type=pb_memory.memory_type,
            importance=pb_memory.importance,
            timestamp=datetime.fromtimestamp(pb_memory.timestamp),
            embedding=list(pb_memory.embedding) if pb_memory.embedding else None,
            metadata=metadata if metadata else None,
            tags=list(pb_memory.tags) if pb_memory.tags else None
        )

    @staticmethod
    def serialize_batch(memories: List[MemoryEntry], batch_id: Optional[str] = None) -> bytes:
        """Serialize multiple memories in a batch"""
        pb_batch = memory_pb2.MemoryBatch()
        pb_batch.batch_id = batch_id or f"batch_{int(time.time())}"
        pb_batch.created_at = int(time.time())

        for memory in memories:
            pb_memory = pb_batch.memories.add()
            pb_memory.id = memory.id
            pb_memory.content = memory.content
            pb_memory.memory_type = memory.memory_type
            pb_memory.importance = memory.importance
            pb_memory.timestamp = int(memory.timestamp.timestamp())

            if memory.embedding:
                pb_memory.embedding[:] = memory.embedding
            if memory.metadata:
                for key, value in memory.metadata.items():
                    pb_memory.metadata[key] = str(value)
            if memory.tags:
                pb_memory.tags[:] = memory.tags

        return pb_batch.SerializeToString()

    @staticmethod
    def deserialize_batch(data: bytes) -> tuple[List[MemoryEntry], str]:
        """Deserialize batch of memories"""
        pb_batch = memory_pb2.MemoryBatch()
        pb_batch.ParseFromString(data)

        memories = []
        for pb_memory in pb_batch.memories:
            # Convert metadata
            metadata = {}
            for key, value in pb_memory.metadata.items():
                try:
                    if value.isdigit():
                        metadata[key] = int(value)
                    elif value.replace('.', '').isdigit():
                        metadata[key] = float(value)
                    else:
                        metadata[key] = value
                except (ValueError, AttributeError):
                    metadata[key] = value

            memory = MemoryEntry(
                id=pb_memory.id,
                content=pb_memory.content,
                memory_type=pb_memory.memory_type,
                importance=pb_memory.importance,
                timestamp=datetime.fromtimestamp(pb_memory.timestamp),
                embedding=list(pb_memory.embedding) if pb_memory.embedding else None,
                metadata=metadata if metadata else None,
                tags=list(pb_memory.tags) if pb_memory.tags else None
            )
            memories.append(memory)

        return memories, pb_batch.batch_id

# Schema evolution example
class VersionedProtobufSerializer:
    """Handle schema evolution in protobuf serialization"""

    VERSION_HEADER = b'PROTOMEM'
    CURRENT_VERSION = 1

    @staticmethod
    def serialize_with_version(memory: MemoryEntry) -> bytes:
        """Serialize with version header for future compatibility"""
        # Serialize the memory
        pb_data = ProtobufMemorySerializer.serialize_memory(memory)

        # Add version header
        version_header = VersionedProtobufSerializer.VERSION_HEADER
        version_bytes = VersionedProtobufSerializer.CURRENT_VERSION.to_bytes(4, 'big')

        return version_header + version_bytes + pb_data

    @staticmethod
    def deserialize_with_version(data: bytes) -> MemoryEntry:
        """Deserialize with version checking"""
        header_size = len(VersionedProtobufSerializer.VERSION_HEADER)
        version_size = 4

        if len(data) < header_size + version_size:
            raise ValueError("Data too short to contain version header")

        # Check header
        header = data[:header_size]
        if header != VersionedProtobufSerializer.VERSION_HEADER:
            raise ValueError("Invalid version header")

        # Extract version
        version_bytes = data[header_size:header_size + version_size]
        version = int.from_bytes(version_bytes, 'big')

        # Extract protobuf data
        pb_data = data[header_size + version_size:]

        # Handle different versions
        if version == 1:
            return ProtobufMemorySerializer.deserialize_memory(pb_data)
        else:
            raise ValueError(f"Unsupported version: {version}")
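Assuming memory.proto has been compiled with protoc so that memory_pb2 is importable, a batch round-trip and the versioned framing look like this (illustrative values only):

# Illustrative usage; requires the generated memory_pb2 module
entries = [
    MemoryEntry(id=f"mem-{i}", content="observation", memory_type="episodic",
                importance=0.5, timestamp=datetime.now())
    for i in range(3)
]

blob = ProtobufMemorySerializer.serialize_batch(entries, batch_id="demo-batch")
restored, batch_id = ProtobufMemorySerializer.deserialize_batch(blob)
print(batch_id, len(restored))  # demo-batch 3

# Versioned framing: 8-byte magic + 4-byte version prefix ahead of the protobuf payload
framed = VersionedProtobufSerializer.serialize_with_version(entries[0])
single = VersionedProtobufSerializer.deserialize_with_version(framed)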

MessagePack - Balanced Approach

MessagePack provides a good balance between performance and ease of use.

import msgpack
import lz4.frame
from dataclasses import asdict
from typing import List, Dict, Any, Optional
from datetime import datetime

class MessagePackMemorySerializer:
    """MessagePack serialization with LZ4 compression"""

    @staticmethod
    def _prepare_for_serialization(obj: Any) -> Any:
        """Prepare object for MessagePack serialization"""
        if isinstance(obj, datetime):
            return {'__datetime__': obj.isoformat()}
        elif isinstance(obj, MemoryEntry):
            data = asdict(obj)
            data['timestamp'] = MessagePackMemorySerializer._prepare_for_serialization(data['timestamp'])
            return data
        elif isinstance(obj, dict):
            return {k: MessagePackMemorySerializer._prepare_for_serialization(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [MessagePackMemorySerializer._prepare_for_serialization(item) for item in obj]
        else:
            return obj

    @staticmethod
    def _restore_from_serialization(obj: Any) -> Any:
        """Restore object from MessagePack deserialization"""
        if isinstance(obj, dict):
            if '__datetime__' in obj:
                return datetime.fromisoformat(obj['__datetime__'])
            else:
                return {k: MessagePackMemorySerializer._restore_from_serialization(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [MessagePackMemorySerializer._restore_from_serialization(item) for item in obj]
        else:
            return obj

    @staticmethod
    def serialize_memory(memory: MemoryEntry, compress: bool = True) -> bytes:
        """Serialize memory entry using MessagePack"""
        # Prepare data
        prepared_data = MessagePackMemorySerializer._prepare_for_serialization(memory)

        # Pack with MessagePack
        packed_data = msgpack.packb(prepared_data, use_bin_type=True)

        # Optional compression
        if compress:
            return lz4.frame.compress(packed_data)
        else:
            return packed_data

    @staticmethod
    def deserialize_memory(data: bytes, compressed: bool = True) -> MemoryEntry:
        """Deserialize memory entry from MessagePack"""
        # Decompress if needed
        if compressed:
            packed_data = lz4.frame.decompress(data)
        else:
            packed_data = data

        # Unpack
        unpacked_data = msgpack.unpackb(packed_data, raw=False, strict_map_key=False)

        # Restore objects
        restored_data = MessagePackMemorySerializer._restore_from_serialization(unpacked_data)

        return MemoryEntry(**restored_data)

    @staticmethod
    def serialize_batch(memories: List[MemoryEntry], compress: bool = True) -> bytes:
        """Serialize batch of memories"""
        prepared_memories = [
            MessagePackMemorySerializer._prepare_for_serialization(memory)
            for memory in memories
        ]

        packed_data = msgpack.packb(prepared_memories, use_bin_type=True)

        if compress:
            return lz4.frame.compress(packed_data)
        else:
            return packed_data

    @staticmethod
    def deserialize_batch(data: bytes, compressed: bool = True) -> List[MemoryEntry]:
        """Deserialize batch of memories"""
        if compressed:
            packed_data = lz4.frame.decompress(data)
        else:
            packed_data = data

        unpacked_data = msgpack.unpackb(packed_data, raw=False, strict_map_key=False)

        memories = []
        for memory_data in unpacked_data:
            restored_data = MessagePackMemorySerializer._restore_from_serialization(memory_data)
            memories.append(MemoryEntry(**restored_data))

        return memories

# Streaming MessagePack for large datasets
class StreamingMessagePackSerializer:
    """Stream-based MessagePack serialization"""

    def __init__(self, output_file: str, compress: bool = True):
        self.output_file = output_file
        self.compress = compress
        self.packer = msgpack.Packer(use_bin_type=True)
        self.file_handle = None

    def __enter__(self):
        if self.compress:
            self.file_handle = lz4.frame.open(self.output_file, mode='wb')
        else:
            self.file_handle = open(self.output_file, 'wb')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file_handle.close()

    def write_memory(self, memory: MemoryEntry):
        """Write a single memory to the stream"""
        prepared_data = MessagePackMemorySerializer._prepare_for_serialization(memory)
        packed_data = self.packer.pack(prepared_data)
        self.file_handle.write(packed_data)

def read_memories(input_file: str, compressed: bool = True):
    """Generator to read memories from stream"""
    unpacker = msgpack.Unpacker(raw=False, strict_map_key=False)

    if compressed:
        file_handle = lz4.frame.open(input_file, mode='rb')
    else:
        file_handle = open(input_file, 'rb')

    try:
        while True:
            chunk = file_handle.read(1024)
            if not chunk:
                break
            unpacker.feed(chunk)
            for unpacked_data in unpacker:
                restored_data = MessagePackMemorySerializer._restore_from_serialization(unpacked_data)
                yield MemoryEntry(**restored_data)
    finally:
        file_handle.close()
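A sketch of the streaming writer and the read_memories generator working together, so a large collection never has to sit in memory all at once (the file path and counts are arbitrary; assumes the classes above are in scope):

# Write entries one at a time, then stream them back incrementally
with StreamingMessagePackSerializer("memories.msgpack.lz4", compress=True) as writer:
    for i in range(1000):
        writer.write_memory(MemoryEntry(
            id=f"mem-{i}", content="observation", memory_type="episodic",
            importance=0.3, timestamp=datetime.now()))

count = sum(1 for _ in read_memories("memories.msgpack.lz4", compressed=True))
print(count)  # 1000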

Vector Embedding Serialization

Efficient Embedding Storage

import struct
import zlib
from typing import List, Dict, Any, Tuple

import numpy as np

class EmbeddingSerializer:
    """Specialized serialization for vector embeddings"""

    @staticmethod
    def serialize_embedding_float32(embedding: List[float]) -> bytes:
        """Serialize embedding as float32 array (4 bytes per dimension)"""
        np_array = np.array(embedding, dtype=np.float32)
        return np_array.tobytes()

    @staticmethod
    def deserialize_embedding_float32(data: bytes, dimension: int) -> List[float]:
        """Deserialize float32 embedding"""
        np_array = np.frombuffer(data, dtype=np.float32)
        if len(np_array) != dimension:
            raise ValueError(f"Expected {dimension} dimensions, got {len(np_array)}")
        return np_array.tolist()

    @staticmethod
    def serialize_embedding_float16(embedding: List[float]) -> bytes:
        """Serialize embedding as float16 array (2 bytes per dimension)"""
        np_array = np.array(embedding, dtype=np.float16)
        return np_array.tobytes()

    @staticmethod
    def deserialize_embedding_float16(data: bytes, dimension: int) -> List[float]:
        """Deserialize float16 embedding (lower precision, smaller size)"""
        np_array = np.frombuffer(data, dtype=np.float16)
        if len(np_array) != dimension:
            raise ValueError(f"Expected {dimension} dimensions, got {len(np_array)}")
        return np_array.astype(np.float32).tolist()

    @staticmethod
    def serialize_embedding_compressed(embedding: List[float], dtype: str = 'float32',
                                       compression_level: int = 6) -> bytes:
        """Serialize embedding with compression"""
        if dtype == 'float32':
            np_array = np.array(embedding, dtype=np.float32)
        elif dtype == 'float16':
            np_array = np.array(embedding, dtype=np.float16)
        else:
            raise ValueError("dtype must be 'float32' or 'float16'")

        raw_bytes = np_array.tobytes()
        compressed_bytes = zlib.compress(raw_bytes, compression_level)

        # Add metadata header: dtype(1 byte) + original_length(4 bytes)
        # '<' forces standard sizes with no padding, so the header is exactly 5 bytes
        dtype_byte = 0 if dtype == 'float32' else 1
        header = struct.pack('<BI', dtype_byte, len(raw_bytes))

        return header + compressed_bytes

    @staticmethod
    def deserialize_embedding_compressed(data: bytes) -> List[float]:
        """Deserialize compressed embedding"""
        if len(data) < 5:
            raise ValueError("Data too short for compressed embedding")

        # Extract header
        dtype_byte, original_length = struct.unpack('<BI', data[:5])
        dtype = 'float32' if dtype_byte == 0 else 'float16'

        # Decompress
        compressed_data = data[5:]
        raw_bytes = zlib.decompress(compressed_data)

        if len(raw_bytes) != original_length:
            raise ValueError("Decompressed data length mismatch")

        # Convert back to array
        if dtype == 'float32':
            np_array = np.frombuffer(raw_bytes, dtype=np.float32)
        else:
            np_array = np.frombuffer(raw_bytes, dtype=np.float16).astype(np.float32)

        return np_array.tolist()

class BatchEmbeddingSerializer:
    """Efficient batch serialization for multiple embeddings"""

    @staticmethod
    def serialize_embedding_batch(embeddings: List[List[float]], dtype: str = 'float32') -> bytes:
        """Serialize multiple embeddings efficiently"""
        if not embeddings:
            return b''

        # Check dimension consistency
        dimension = len(embeddings[0])
        if not all(len(emb) == dimension for emb in embeddings):
            raise ValueError("All embeddings must have the same dimension")

        # Convert to numpy array for efficient serialization
        if dtype == 'float32':
            np_array = np.array(embeddings, dtype=np.float32)
        elif dtype == 'float16':
            np_array = np.array(embeddings, dtype=np.float16)
        else:
            raise ValueError("dtype must be 'float32' or 'float16'")

        # Create header: dtype(1) + count(4) + dimension(4), packed without padding (9 bytes)
        dtype_byte = 0 if dtype == 'float32' else 1
        header = struct.pack('<BII', dtype_byte, len(embeddings), dimension)

        return header + np_array.tobytes()

    @staticmethod
    def deserialize_embedding_batch(data: bytes) -> List[List[float]]:
        """Deserialize multiple embeddings"""
        if len(data) < 9:
            raise ValueError("Data too short for embedding batch")

        # Extract header
        dtype_byte, count, dimension = struct.unpack('<BII', data[:9])
        dtype = np.float32 if dtype_byte == 0 else np.float16

        # Extract embedding data
        embedding_data = data[9:]
        np_array = np.frombuffer(embedding_data, dtype=dtype)

        # Reshape to (count, dimension)
        expected_size = count * dimension
        if len(np_array) != expected_size:
            raise ValueError(f"Expected {expected_size} values, got {len(np_array)}")

        reshaped = np_array.reshape(count, dimension)

        # Convert to float32 if needed and return as list
        if dtype == np.float16:
            reshaped = reshaped.astype(np.float32)

        return reshaped.tolist()

# Quantization for even smaller embeddings
class QuantizedEmbeddingSerializer:
    """Quantized embedding serialization for extreme size reduction"""

    @staticmethod
    def serialize_embedding_quantized(embedding: List[float], bits: int = 8) -> bytes:
        """Quantize embedding to specified bit depth"""
        if bits not in [4, 8]:
            raise ValueError("Only 4-bit and 8-bit quantization supported")

        np_array = np.array(embedding, dtype=np.float32)

        # Find min/max for quantization
        min_val = np_array.min()
        max_val = np_array.max()

        # Quantize
        if bits == 8:
            scale = 255.0 / (max_val - min_val) if max_val != min_val else 1.0
            quantized = np.clip((np_array - min_val) * scale, 0, 255).astype(np.uint8)
            quantized_bytes = quantized.tobytes()
        else:  # 4-bit
            scale = 15.0 / (max_val - min_val) if max_val != min_val else 1.0
            quantized = np.clip((np_array - min_val) * scale, 0, 15).astype(np.uint8)

            # Pack two 4-bit values into one byte
            packed = []
            for i in range(0, len(quantized), 2):
                if i + 1 < len(quantized):
                    packed_byte = (quantized[i] << 4) | quantized[i + 1]
                else:
                    packed_byte = quantized[i] << 4
                packed.append(packed_byte)
            quantized_bytes = bytes(packed)

        # Create header: bits(1) + dimension(4) + min_val(4) + max_val(4), unpadded (13 bytes)
        header = struct.pack('<Biff', bits, len(embedding), min_val, max_val)

        return header + quantized_bytes

    @staticmethod
    def deserialize_embedding_quantized(data: bytes) -> List[float]:
        """Deserialize quantized embedding"""
        if len(data) < 13:
            raise ValueError("Data too short for quantized embedding")

        # Extract header
        bits, dimension, min_val, max_val = struct.unpack('<Biff', data[:13])
        quantized_data = data[13:]

        if bits == 8:
            quantized = np.frombuffer(quantized_data, dtype=np.uint8)
            scale = (max_val - min_val) / 255.0 if max_val != min_val else 0.0
            restored = quantized.astype(np.float32) * scale + min_val
        else:  # 4-bit
            # Unpack 4-bit values
            packed_bytes = np.frombuffer(quantized_data, dtype=np.uint8)
            quantized = []
            for packed_byte in packed_bytes:
                high = (packed_byte >> 4) & 0xF
                low = packed_byte & 0xF
                quantized.extend([high, low])

            # Trim to original dimension
            quantized = np.array(quantized[:dimension], dtype=np.float32)
            scale = (max_val - min_val) / 15.0 if max_val != min_val else 0.0
            restored = quantized * scale + min_val

        return restored.tolist()
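The trade-off between these variants is easiest to see by comparing serialized sizes for one embedding. A rough illustration under the serializers defined above (compressed sizes vary with the data, and quantization is lossy, so check reconstruction error before adopting it):

import random

emb = [random.uniform(-1, 1) for _ in range(384)]

f32 = EmbeddingSerializer.serialize_embedding_float32(emb)                     # 1536 bytes
f16 = EmbeddingSerializer.serialize_embedding_float16(emb)                     # 768 bytes
q8 = QuantizedEmbeddingSerializer.serialize_embedding_quantized(emb, bits=8)   # 13-byte header + 384 bytes
q4 = QuantizedEmbeddingSerializer.serialize_embedding_quantized(emb, bits=4)   # 13-byte header + 192 bytes
print(len(f32), len(f16), len(q8), len(q4))

# Quantization error check: compare the restored values against the originals
restored = QuantizedEmbeddingSerializer.deserialize_embedding_quantized(q8)
max_err = max(abs(a - b) for a, b in zip(emb, restored))
print(f"max reconstruction error (8-bit): {max_err:.4f}")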

Performance Benchmarking

Serialization Benchmark Suite

import time
import random
import string
from typing import List, Callable, Dict, Any
from datetime import datetime, timedelta

import psutil

class SerializationBenchmark:
    """Comprehensive benchmarking for serialization methods"""

    def __init__(self):
        # Compression flags are passed explicitly so that the plain and compressed
        # variants actually measure different code paths
        self.serializers = {
            'json': (
                lambda m: JSONMemorySerializer.serialize_memory(m, compress=False),
                lambda d: JSONMemorySerializer.deserialize_memory(d, compressed=False)
            ),
            'json_compressed': (
                lambda m: JSONMemorySerializer.serialize_memory(m, compress=True),
                lambda d: JSONMemorySerializer.deserialize_memory(d, compressed=True)
            ),
            'protobuf': (
                ProtobufMemorySerializer.serialize_memory,
                ProtobufMemorySerializer.deserialize_memory
            ),
            'messagepack': (
                lambda m: MessagePackMemorySerializer.serialize_memory(m, compress=False),
                lambda d: MessagePackMemorySerializer.deserialize_memory(d, compressed=False)
            ),
            'messagepack_compressed': (
                lambda m: MessagePackMemorySerializer.serialize_memory(m, compress=True),
                lambda d: MessagePackMemorySerializer.deserialize_memory(d, compressed=True)
            )
        }

    def generate_test_memory(self, embedding_size: int = 384, content_length: int = 500) -> MemoryEntry:
        """Generate a test memory entry"""
        return MemoryEntry(
            id=''.join(random.choices(string.ascii_letters + string.digits, k=16)),
            content=''.join(random.choices(string.ascii_letters + ' ', k=content_length)),
            memory_type=random.choice(['episodic', 'semantic', 'working', 'procedural']),
            importance=random.random(),
            timestamp=datetime.now() - timedelta(seconds=random.randint(0, 86400)),
            embedding=[random.random() for _ in range(embedding_size)],
            metadata={
                'session_id': ''.join(random.choices(string.ascii_letters, k=8)),
                'user_id': random.randint(1, 10000),
                'source': random.choice(['chat', 'document', 'search']),
                'confidence': random.random()
            },
            tags=[random.choice(['important', 'work', 'personal', 'research'])
                  for _ in range(random.randint(1, 4))]
        )

    def benchmark_single_memory(self, memory: MemoryEntry, iterations: int = 1000) -> Dict[str, Dict[str, float]]:
        """Benchmark serialization/deserialization of a single memory"""
        results = {}

        for name, (serialize_func, deserialize_func) in self.serializers.items():
            # Warm up
            for _ in range(10):
                data = serialize_func(memory)
                deserialize_func(data)

            # Measure serialization
            start_time = time.perf_counter()
            start_rss = psutil.Process().memory_info().rss
            serialized_data = None
            for _ in range(iterations):
                serialized_data = serialize_func(memory)
            serialize_time = (time.perf_counter() - start_time) / iterations
            serialize_rss_delta = psutil.Process().memory_info().rss - start_rss

            # Measure deserialization
            start_time = time.perf_counter()
            start_rss = psutil.Process().memory_info().rss
            for _ in range(iterations):
                deserialize_func(serialized_data)
            deserialize_time = (time.perf_counter() - start_time) / iterations
            deserialize_rss_delta = psutil.Process().memory_info().rss - start_rss

            results[name] = {
                'serialize_time_ms': serialize_time * 1000,
                'deserialize_time_ms': deserialize_time * 1000,
                'serialized_size_bytes': len(serialized_data),
                'serialize_memory_mb': serialize_rss_delta / (1024 * 1024),
                'deserialize_memory_mb': deserialize_rss_delta / (1024 * 1024)
            }

        return results

    def benchmark_batch_operations(self, batch_size: int = 100, iterations: int = 100) -> Dict[str, Dict[str, float]]:
        """Benchmark batch serialization operations"""
        # Generate test batch
        memories = [self.generate_test_memory() for _ in range(batch_size)]

        batch_serializers = {
            'json': (
                lambda m: JSONMemorySerializer.serialize_batch(m, compress=False),
                lambda d: JSONMemorySerializer.deserialize_batch(d, compressed=False)
            ),
            'json_compressed': (
                lambda m: JSONMemorySerializer.serialize_batch(m, compress=True),
                lambda d: JSONMemorySerializer.deserialize_batch(d, compressed=True)
            ),
            'protobuf': (
                ProtobufMemorySerializer.serialize_batch,
                lambda d: ProtobufMemorySerializer.deserialize_batch(d)[0]
            ),
            'messagepack': (
                lambda m: MessagePackMemorySerializer.serialize_batch(m, compress=False),
                lambda d: MessagePackMemorySerializer.deserialize_batch(d, compressed=False)
            ),
            'messagepack_compressed': (
                lambda m: MessagePackMemorySerializer.serialize_batch(m, compress=True),
                lambda d: MessagePackMemorySerializer.deserialize_batch(d, compressed=True)
            )
        }

        results = {}

        for name, (serialize_func, deserialize_func) in batch_serializers.items():
            try:
                # Warm up
                for _ in range(5):
                    data = serialize_func(memories)
                    deserialize_func(data)

                # Measure serialization
                start_time = time.perf_counter()
                serialized_data = None
                for _ in range(iterations):
                    serialized_data = serialize_func(memories)
                serialize_time = (time.perf_counter() - start_time) / iterations

                # Measure deserialization
                start_time = time.perf_counter()
                for _ in range(iterations):
                    deserialize_func(serialized_data)
                deserialize_time = (time.perf_counter() - start_time) / iterations

                results[name] = {
                    'batch_serialize_time_ms': serialize_time * 1000,
                    'batch_deserialize_time_ms': deserialize_time * 1000,
                    'batch_size_bytes': len(serialized_data),
                    'bytes_per_memory': len(serialized_data) / batch_size,
                    'memories_per_second_serialize': batch_size / serialize_time,
                    'memories_per_second_deserialize': batch_size / deserialize_time
                }
            except Exception as e:
                results[name] = {'error': str(e)}

        return results

    def run_comprehensive_benchmark(self) -> Dict[str, Any]:
        """Run complete benchmark suite"""
        print("Starting comprehensive serialization benchmark...")

        # Test different memory configurations
        test_configs = [
            {'embedding_size': 384, 'content_length': 100, 'name': 'small_memory'},
            {'embedding_size': 384, 'content_length': 1000, 'name': 'medium_memory'},
            {'embedding_size': 1536, 'content_length': 2000, 'name': 'large_memory'}
        ]

        benchmark_results = {}

        for config in test_configs:
            print(f"Testing {config['name']}...")

            test_memory = self.generate_test_memory(
                embedding_size=config['embedding_size'],
                content_length=config['content_length']
            )

            # Single memory benchmark
            single_results = self.benchmark_single_memory(test_memory)

            # Batch benchmark
            batch_results = self.benchmark_batch_operations(batch_size=100)

            benchmark_results[config['name']] = {
                'config': config,
                'single_memory': single_results,
                'batch_operations': batch_results
            }

        return benchmark_results

    def print_results(self, results: Dict[str, Any]):
        """Print benchmark results in a readable format"""
        for test_name, test_results in results.items():
            print(f"\n=== {test_name.upper()} ===")
            print(f"Config: {test_results['config']}")

            print("\nSingle Memory Performance:")
            print(f"{'Format':<20} {'Serialize (ms)':<15} {'Deserialize (ms)':<18} "
                  f"{'Size (bytes)':<12} {'Compression %':<15}")
            print("-" * 85)

            # Calculate baseline size for compression ratio (uncompressed JSON)
            json_size = test_results['single_memory']['json']['serialized_size_bytes']

            for format_name, metrics in test_results['single_memory'].items():
                if 'error' not in metrics:
                    compression_ratio = (1 - metrics['serialized_size_bytes'] / json_size) * 100
                    print(f"{format_name:<20} {metrics['serialize_time_ms']:<15.3f} "
                          f"{metrics['deserialize_time_ms']:<18.3f} "
                          f"{metrics['serialized_size_bytes']:<12} {compression_ratio:<15.1f}%")

            print("\nBatch Operations Performance (100 memories):")
            print(f"{'Format':<20} {'Batch Ser. (ms)':<18} {'Batch Deser. (ms)':<20} "
                  f"{'Mem/sec Ser.':<15} {'Mem/sec Deser.':<15}")
            print("-" * 95)

            for format_name, metrics in test_results['batch_operations'].items():
                if 'error' not in metrics:
                    print(f"{format_name:<20} {metrics['batch_serialize_time_ms']:<18.3f} "
                          f"{metrics['batch_deserialize_time_ms']:<20.3f} "
                          f"{metrics['memories_per_second_serialize']:<15.0f} "
                          f"{metrics['memories_per_second_deserialize']:<15.0f}")

# Example usage
if __name__ == "__main__":
    benchmark = SerializationBenchmark()
    results = benchmark.run_comprehensive_benchmark()
    benchmark.print_results(results)

Production Patterns

Adaptive Serialization Strategy

import logging
from enum import Enum
from typing import Optional

class SerializationFormat(Enum):
    JSON = "json"
    JSON_COMPRESSED = "json_compressed"
    PROTOBUF = "protobuf"
    MESSAGEPACK = "messagepack"
    MESSAGEPACK_COMPRESSED = "messagepack_compressed"

class SerializationContext:
    """Context information for choosing serialization strategy"""

    def __init__(self, data_size_bytes: int = 0, frequency_hz: float = 0.0,
                 latency_sensitive: bool = False, storage_sensitive: bool = False,
                 network_transfer: bool = False):
        self.data_size_bytes = data_size_bytes
        self.frequency_hz = frequency_hz
        self.latency_sensitive = latency_sensitive
        self.storage_sensitive = storage_sensitive
        self.network_transfer = network_transfer

class AdaptiveSerializer:
    """Automatically choose the best serialization format based on context"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

        # Performance characteristics from benchmarking
        self.format_characteristics = {
            SerializationFormat.JSON: {
                'speed_score': 3,  # 1-5 scale
                'size_score': 1,
                'compatibility_score': 5,
                'cpu_overhead': 'medium'
            },
            SerializationFormat.JSON_COMPRESSED: {
                'speed_score': 2,
                'size_score': 4,
                'compatibility_score': 5,
                'cpu_overhead': 'high'
            },
            SerializationFormat.PROTOBUF: {
                'speed_score': 5,
                'size_score': 4,
                'compatibility_score': 3,
                'cpu_overhead': 'low'
            },
            SerializationFormat.MESSAGEPACK: {
                'speed_score': 4,
                'size_score': 3,
                'compatibility_score': 4,
                'cpu_overhead': 'medium'
            },
            SerializationFormat.MESSAGEPACK_COMPRESSED: {
                'speed_score': 3,
                'size_score': 5,
                'compatibility_score': 4,
                'cpu_overhead': 'high'
            }
        }

    def choose_format(self, context: SerializationContext) -> SerializationFormat:
        """Choose optimal serialization format based on context"""
        scores = {}

        for format_enum, characteristics in self.format_characteristics.items():
            score = 0

            # Latency sensitivity - prefer faster formats
            if context.latency_sensitive:
                score += characteristics['speed_score'] * 3

            # Storage sensitivity - prefer smaller formats
            if context.storage_sensitive:
                score += characteristics['size_score'] * 3

            # Network transfer - balance speed and size
            if context.network_transfer:
                score += characteristics['speed_score'] * 2
                score += characteristics['size_score'] * 2

            # High frequency operations - prefer speed
            if context.frequency_hz > 100:
                score += characteristics['speed_score'] * 2

            # Large data size - prefer compression
            if context.data_size_bytes > 10000:
                score += characteristics['size_score'] * 2

            scores[format_enum] = score

        # Choose format with highest score
        best_format = max(scores.items(), key=lambda x: x[1])[0]

        self.logger.info(f"Chose {best_format.value} serialization (score: {scores[best_format]})")
        return best_format

    def serialize(self, memory: MemoryEntry, context: SerializationContext) -> tuple[bytes, SerializationFormat]:
        """Serialize memory with optimal format"""
        format_chosen = self.choose_format(context)

        if format_chosen == SerializationFormat.JSON:
            data = JSONMemorySerializer.serialize_memory(memory, compress=False)
        elif format_chosen == SerializationFormat.JSON_COMPRESSED:
            data = JSONMemorySerializer.serialize_memory(memory, compress=True)
        elif format_chosen == SerializationFormat.PROTOBUF:
            data = ProtobufMemorySerializer.serialize_memory(memory)
        elif format_chosen == SerializationFormat.MESSAGEPACK:
            data = MessagePackMemorySerializer.serialize_memory(memory, compress=False)
        elif format_chosen == SerializationFormat.MESSAGEPACK_COMPRESSED:
            data = MessagePackMemorySerializer.serialize_memory(memory, compress=True)
        else:
            raise ValueError(f"Unsupported format: {format_chosen}")

        return data, format_chosen

    def deserialize(self, data: bytes, format_used: SerializationFormat) -> MemoryEntry:
        """Deserialize memory using the specified format"""
        if format_used == SerializationFormat.JSON:
            return JSONMemorySerializer.deserialize_memory(data, compressed=False)
        elif format_used == SerializationFormat.JSON_COMPRESSED:
            return JSONMemorySerializer.deserialize_memory(data, compressed=True)
        elif format_used == SerializationFormat.PROTOBUF:
            return ProtobufMemorySerializer.deserialize_memory(data)
        elif format_used == SerializationFormat.MESSAGEPACK:
            return MessagePackMemorySerializer.deserialize_memory(data, compressed=False)
        elif format_used == SerializationFormat.MESSAGEPACK_COMPRESSED:
            return MessagePackMemorySerializer.deserialize_memory(data, compressed=True)
        else:
            raise ValueError(f"Unsupported format: {format_used}")

# Production memory store with adaptive serialization
class AdaptiveMemoryStore:
    """Memory store that adapts serialization based on usage patterns"""

    def __init__(self, base_store):
        self.base_store = base_store
        self.adaptive_serializer = AdaptiveSerializer()
        self.usage_stats = {}
        self.logger = logging.getLogger(__name__)

    def store_memory(self, memory: MemoryEntry, **kwargs) -> str:
        """Store memory with adaptive serialization"""
        # Analyze context
        context = self._analyze_storage_context(memory)

        # Serialize with optimal format
        serialized_data, format_used = self.adaptive_serializer.serialize(memory, context)

        # Store with format metadata
        storage_metadata = {
            'serialization_format': format_used.value,
            'original_size': len(memory.content),
            'serialized_size': len(serialized_data),
            **kwargs.get('metadata', {})
        }

        # Use base store (which should handle raw bytes and metadata)
        return self.base_store.store_raw(
            memory_id=memory.id,
            data=serialized_data,
            metadata=storage_metadata
        )

    def retrieve_memory(self, memory_id: str) -> Optional[MemoryEntry]:
        """Retrieve and deserialize memory"""
        raw_data, metadata = self.base_store.retrieve_raw(memory_id)

        if not raw_data:
            return None

        # Get serialization format from metadata
        format_str = metadata.get('serialization_format', 'json')
        try:
            format_enum = SerializationFormat(format_str)
        except ValueError:
            self.logger.warning(f"Unknown serialization format: {format_str}, falling back to JSON")
            format_enum = SerializationFormat.JSON

        # Deserialize
        try:
            return self.adaptive_serializer.deserialize(raw_data, format_enum)
        except Exception as e:
            self.logger.error(f"Failed to deserialize memory {memory_id}: {e}")
            return None

    def _analyze_storage_context(self, memory: MemoryEntry) -> SerializationContext:
        """Analyze memory characteristics to determine optimal serialization"""
        # Estimate serialized size (rough approximation)
        estimated_size = len(memory.content) + (len(memory.embedding) * 4 if memory.embedding else 0) + 500

        # Check if this is a high-frequency memory type
        memory_type_stats = self.usage_stats.get(memory.memory_type, {'count': 0, 'avg_frequency': 0})
        frequency = memory_type_stats.get('avg_frequency', 0)

        return SerializationContext(
            data_size_bytes=estimated_size,
            frequency_hz=frequency,
            latency_sensitive=memory.memory_type == 'working',
            storage_sensitive=estimated_size > 5000,
            network_transfer=True  # Assume network storage
        )

This guide covered practical implementations of the major serialization formats, a benchmarking suite for comparing them, and an adaptive strategy for production agent memory systems. The examples include error handling, compression, and logging hooks, but you should benchmark them against your own workload and harden the format choices before deploying.