Serialization for Agent Memory
Serialization is a critical concern for agent memory systems: the format you choose affects read/write latency, storage footprint, and interoperability. This guide covers practical serialization strategies, format comparisons, and production-oriented implementations.
Serialization Formats Comparison
JSON - Universal Compatibility
JSON provides excellent compatibility but with performance trade-offs for large datasets.
import json
import gzip
from typing import Dict, Any, List, Optional
from datetime import datetime
from dataclasses import dataclass, asdict
import base64
@dataclass
class MemoryEntry:
id: str
content: str
memory_type: str
importance: float
timestamp: datetime
embedding: Optional[List[float]] = None
metadata: Optional[Dict[str, Any]] = None
tags: Optional[List[str]] = None
class JSONMemorySerializer:
"""JSON serialization with optimizations for memory data"""
@staticmethod
def serialize_memory(memory: MemoryEntry, compress: bool = True) -> bytes:
"""Serialize memory entry to JSON with optional compression"""
# Convert to dict with proper datetime handling
memory_dict = asdict(memory)
memory_dict['timestamp'] = memory.timestamp.isoformat()
# Convert to JSON
json_str = json.dumps(memory_dict, separators=(',', ':')) # Compact format
if compress:
return gzip.compress(json_str.encode('utf-8'))
else:
return json_str.encode('utf-8')
@staticmethod
def deserialize_memory(data: bytes, compressed: bool = True) -> MemoryEntry:
"""Deserialize memory entry from JSON"""
if compressed:
json_str = gzip.decompress(data).decode('utf-8')
else:
json_str = data.decode('utf-8')
memory_dict = json.loads(json_str)
# Parse datetime
memory_dict['timestamp'] = datetime.fromisoformat(memory_dict['timestamp'])
return MemoryEntry(**memory_dict)
@staticmethod
def serialize_batch(memories: List[MemoryEntry], compress: bool = True) -> bytes:
"""Serialize multiple memories efficiently"""
# Convert all memories to dicts
memory_dicts = []
for memory in memories:
memory_dict = asdict(memory)
memory_dict['timestamp'] = memory.timestamp.isoformat()
memory_dicts.append(memory_dict)
# Batch JSON encoding
json_str = json.dumps(memory_dicts, separators=(',', ':'))
if compress:
return gzip.compress(json_str.encode('utf-8'))
else:
return json_str.encode('utf-8')
@staticmethod
def deserialize_batch(data: bytes, compressed: bool = True) -> List[MemoryEntry]:
"""Deserialize multiple memories efficiently"""
if compressed:
json_str = gzip.decompress(data).decode('utf-8')
else:
json_str = data.decode('utf-8')
memory_dicts = json.loads(json_str)
memories = []
for memory_dict in memory_dicts:
memory_dict['timestamp'] = datetime.fromisoformat(memory_dict['timestamp'])
memories.append(MemoryEntry(**memory_dict))
return memories
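A quick round-trip sketch using the serializer above; the sample values are purely illustrative:
# Hypothetical round trip with the JSON serializer defined above
memory = MemoryEntry(
    id="mem_001",
    content="User prefers concise answers",
    memory_type="semantic",
    importance=0.8,
    timestamp=datetime.now(),
    embedding=[0.1, 0.2, 0.3],
    metadata={"source": "chat"},
    tags=["preference"],
)
blob = JSONMemorySerializer.serialize_memory(memory, compress=True)
restored = JSONMemorySerializer.deserialize_memory(blob, compressed=True)
assert restored.id == memory.id and restored.timestamp == memory.timestamp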
# Streaming serialization for large datasets that don't fit comfortably in memory
class StreamingJSONSerializer:
"""Stream-based JSON serialization for large memory collections"""
def __init__(self, output_file: str, compress: bool = True):
self.output_file = output_file
self.compress = compress
self.file_handle = None
self.compressor = None
def __enter__(self):
if self.compress:
self.file_handle = gzip.open(self.output_file, 'wt', encoding='utf-8')
else:
self.file_handle = open(self.output_file, 'w', encoding='utf-8')
self.file_handle.write('[') # Start JSON array
self.first_item = True
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.file_handle.write(']') # Close JSON array
self.file_handle.close()
def write_memory(self, memory: MemoryEntry):
"""Write a single memory to the stream"""
if not self.first_item:
self.file_handle.write(',')
memory_dict = asdict(memory)
memory_dict['timestamp'] = memory.timestamp.isoformat()
json.dump(memory_dict, self.file_handle, separators=(',', ':'))
self.first_item = False
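A usage sketch for the streaming writer; the file name and generated entries are hypothetical:
# Hypothetical usage: stream a large collection to disk without building one big list
entries = (
    MemoryEntry(
        id=f"mem_{i}",
        content=f"observation {i}",
        memory_type="episodic",
        importance=0.5,
        timestamp=datetime.now(),
    )
    for i in range(10_000)
)
with StreamingJSONSerializer("memories.json.gz", compress=True) as stream:
    for entry in entries:
        stream.write_memory(entry)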
# Performance optimization with ujson
try:
import ujson as fast_json
JSON_ENCODER = fast_json
except ImportError:
JSON_ENCODER = json
class OptimizedJSONSerializer:
"""High-performance JSON serialization using ujson"""
@staticmethod
def serialize_memory(memory: MemoryEntry) -> bytes:
memory_dict = asdict(memory)
memory_dict['timestamp'] = memory.timestamp.isoformat()
return JSON_ENCODER.dumps(memory_dict).encode('utf-8')
@staticmethod
def deserialize_memory(data: bytes) -> MemoryEntry:
memory_dict = JSON_ENCODER.loads(data.decode('utf-8'))
memory_dict['timestamp'] = datetime.fromisoformat(memory_dict['timestamp'])
return MemoryEntry(**memory_dict)
Protocol Buffers - High Performance
Protocol Buffers offer excellent performance and schema evolution capabilities.
# First, define the protobuf schema (memory.proto)
"""
syntax = "proto3";
package agent_memory;
message MemoryEntry {
string id = 1;
string content = 2;
string memory_type = 3;
float importance = 4;
int64 timestamp = 5; // Unix timestamp
repeated float embedding = 6;
map<string, string> metadata = 7;
repeated string tags = 8;
}
message MemoryBatch {
repeated MemoryEntry memories = 1;
string batch_id = 2;
int64 created_at = 3;
}
message MemoryIndex {
string memory_id = 1;
string memory_type = 2;
float importance = 3;
int64 timestamp = 4;
repeated string tags = 5;
}
"""
# Generate Python classes with: protoc --python_out=. memory.proto
import memory_pb2
from google.protobuf.message import DecodeError
import time
from typing import List, Dict, Any
class ProtobufMemorySerializer:
"""Protocol Buffers serialization for high performance"""
@staticmethod
def serialize_memory(memory: MemoryEntry) -> bytes:
"""Serialize memory entry to protobuf"""
pb_memory = memory_pb2.MemoryEntry()
pb_memory.id = memory.id
pb_memory.content = memory.content
pb_memory.memory_type = memory.memory_type
pb_memory.importance = memory.importance
pb_memory.timestamp = int(memory.timestamp.timestamp())
if memory.embedding:
pb_memory.embedding[:] = memory.embedding
if memory.metadata:
for key, value in memory.metadata.items():
pb_memory.metadata[key] = str(value)
if memory.tags:
pb_memory.tags[:] = memory.tags
return pb_memory.SerializeToString()
@staticmethod
def deserialize_memory(data: bytes) -> MemoryEntry:
"""Deserialize memory entry from protobuf"""
pb_memory = memory_pb2.MemoryEntry()
try:
pb_memory.ParseFromString(data)
except DecodeError as e:
raise ValueError(f"Failed to parse protobuf data: {e}")
# Convert metadata back to proper types
metadata = {}
for key, value in pb_memory.metadata.items():
# Try to convert back to appropriate Python types
try:
if value.isdigit():
metadata[key] = int(value)
elif value.replace('.', '').isdigit():
metadata[key] = float(value)
elif value.lower() in ('true', 'false'):
metadata[key] = value.lower() == 'true'
else:
metadata[key] = value
except Exception:
metadata[key] = value
return MemoryEntry(
id=pb_memory.id,
content=pb_memory.content,
memory_type=pb_memory.memory_type,
importance=pb_memory.importance,
timestamp=datetime.fromtimestamp(pb_memory.timestamp),
embedding=list(pb_memory.embedding) if pb_memory.embedding else None,
metadata=metadata if metadata else None,
tags=list(pb_memory.tags) if pb_memory.tags else None
)
@staticmethod
def serialize_batch(memories: List[MemoryEntry], batch_id: str = None) -> bytes:
"""Serialize multiple memories in a batch"""
pb_batch = memory_pb2.MemoryBatch()
pb_batch.batch_id = batch_id or f"batch_{int(time.time())}"
pb_batch.created_at = int(time.time())
for memory in memories:
pb_memory = pb_batch.memories.add()
pb_memory.id = memory.id
pb_memory.content = memory.content
pb_memory.memory_type = memory.memory_type
pb_memory.importance = memory.importance
pb_memory.timestamp = int(memory.timestamp.timestamp())
if memory.embedding:
pb_memory.embedding[:] = memory.embedding
if memory.metadata:
for key, value in memory.metadata.items():
pb_memory.metadata[key] = str(value)
if memory.tags:
pb_memory.tags[:] = memory.tags
return pb_batch.SerializeToString()
@staticmethod
def deserialize_batch(data: bytes) -> tuple[List[MemoryEntry], str]:
"""Deserialize batch of memories"""
pb_batch = memory_pb2.MemoryBatch()
pb_batch.ParseFromString(data)
memories = []
for pb_memory in pb_batch.memories:
# Convert metadata
metadata = {}
for key, value in pb_memory.metadata.items():
try:
if value.isdigit():
metadata[key] = int(value)
elif value.replace('.', '').isdigit():
metadata[key] = float(value)
else:
metadata[key] = value
except Exception:
metadata[key] = value
memory = MemoryEntry(
id=pb_memory.id,
content=pb_memory.content,
memory_type=pb_memory.memory_type,
importance=pb_memory.importance,
timestamp=datetime.fromtimestamp(pb_memory.timestamp),
embedding=list(pb_memory.embedding) if pb_memory.embedding else None,
metadata=metadata if metadata else None,
tags=list(pb_memory.tags) if pb_memory.tags else None
)
memories.append(memory)
return memories, pb_batch.batch_id
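Assuming memory.proto has been compiled with protoc as noted above, a round trip looks like this (values are illustrative; note that the int64 timestamp drops sub-second precision):
# Hypothetical round trip; requires the generated memory_pb2 module
memory = MemoryEntry(
    id="mem_pb_001",
    content="Quarterly report summarized",
    memory_type="episodic",
    importance=0.6,
    timestamp=datetime.now(),
    embedding=[0.4, 0.5],
    metadata={"user_id": 42},
    tags=["work"],
)
blob = ProtobufMemorySerializer.serialize_memory(memory)
restored = ProtobufMemorySerializer.deserialize_memory(blob)
assert restored.id == memory.id and restored.metadata["user_id"] == 42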
# Schema evolution example
class VersionedProtobufSerializer:
"""Handle schema evolution in protobuf serialization"""
VERSION_HEADER = b'PROTOMEM'
CURRENT_VERSION = 1
@staticmethod
def serialize_with_version(memory: MemoryEntry) -> bytes:
"""Serialize with version header for future compatibility"""
# Serialize the memory
pb_data = ProtobufMemorySerializer.serialize_memory(memory)
# Add version header
version_header = VersionedProtobufSerializer.VERSION_HEADER
version_bytes = VersionedProtobufSerializer.CURRENT_VERSION.to_bytes(4, 'big')
return version_header + version_bytes + pb_data
@staticmethod
def deserialize_with_version(data: bytes) -> MemoryEntry:
"""Deserialize with version checking"""
header_size = len(VersionedProtobufSerializer.VERSION_HEADER)
version_size = 4
if len(data) < header_size + version_size:
raise ValueError("Data too short to contain version header")
# Check header
header = data[:header_size]
if header != VersionedProtobufSerializer.VERSION_HEADER:
raise ValueError("Invalid version header")
# Extract version
version_bytes = data[header_size:header_size + version_size]
version = int.from_bytes(version_bytes, 'big')
# Extract protobuf data
pb_data = data[header_size + version_size:]
# Handle different versions
if version == 1:
return ProtobufMemorySerializer.deserialize_memory(pb_data)
else:
raise ValueError(f"Unsupported version: {version}")MessagePack - Balanced Approach
MessagePack provides a good balance between performance and ease of use.
import msgpack
import lz4.frame
from typing import List, Dict, Any, Optional
from datetime import datetime
class MessagePackMemorySerializer:
"""MessagePack serialization with LZ4 compression"""
@staticmethod
def _prepare_for_serialization(obj: Any) -> Any:
"""Prepare object for MessagePack serialization"""
if isinstance(obj, datetime):
return {'__datetime__': obj.isoformat()}
elif isinstance(obj, MemoryEntry):
data = asdict(obj)
data['timestamp'] = MessagePackMemorySerializer._prepare_for_serialization(data['timestamp'])
return data
elif isinstance(obj, dict):
return {k: MessagePackMemorySerializer._prepare_for_serialization(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [MessagePackMemorySerializer._prepare_for_serialization(item) for item in obj]
else:
return obj
@staticmethod
def _restore_from_serialization(obj: Any) -> Any:
"""Restore object from MessagePack deserialization"""
if isinstance(obj, dict):
if '__datetime__' in obj:
return datetime.fromisoformat(obj['__datetime__'])
else:
return {k: MessagePackMemorySerializer._restore_from_serialization(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [MessagePackMemorySerializer._restore_from_serialization(item) for item in obj]
else:
return obj
@staticmethod
def serialize_memory(memory: MemoryEntry, compress: bool = True) -> bytes:
"""Serialize memory entry using MessagePack"""
# Prepare data
prepared_data = MessagePackMemorySerializer._prepare_for_serialization(memory)
# Pack with MessagePack
packed_data = msgpack.packb(prepared_data, use_bin_type=True)
# Optional compression
if compress:
return lz4.frame.compress(packed_data)
else:
return packed_data
@staticmethod
def deserialize_memory(data: bytes, compressed: bool = True) -> MemoryEntry:
"""Deserialize memory entry from MessagePack"""
# Decompress if needed
if compressed:
packed_data = lz4.frame.decompress(data)
else:
packed_data = data
# Unpack
unpacked_data = msgpack.unpackb(packed_data, raw=False, strict_map_key=False)
# Restore objects
restored_data = MessagePackMemorySerializer._restore_from_serialization(unpacked_data)
return MemoryEntry(**restored_data)
@staticmethod
def serialize_batch(memories: List[MemoryEntry], compress: bool = True) -> bytes:
"""Serialize batch of memories"""
prepared_memories = [
MessagePackMemorySerializer._prepare_for_serialization(memory)
for memory in memories
]
packed_data = msgpack.packb(prepared_memories, use_bin_type=True)
if compress:
return lz4.frame.compress(packed_data)
else:
return packed_data
@staticmethod
def deserialize_batch(data: bytes, compressed: bool = True) -> List[MemoryEntry]:
"""Deserialize batch of memories"""
if compressed:
packed_data = lz4.frame.decompress(data)
else:
packed_data = data
unpacked_data = msgpack.unpackb(packed_data, raw=False, strict_map_key=False)
memories = []
for memory_data in unpacked_data:
restored_data = MessagePackMemorySerializer._restore_from_serialization(memory_data)
memories.append(MemoryEntry(**restored_data))
return memories
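A batch round-trip sketch for the MessagePack serializer (sizes and values are illustrative):
# Hypothetical batch round trip with LZ4 compression
batch = [
    MemoryEntry(
        id=f"mem_{i}",
        content="short note",
        memory_type="working",
        importance=0.3,
        timestamp=datetime.now(),
    )
    for i in range(3)
]
blob = MessagePackMemorySerializer.serialize_batch(batch, compress=True)
restored = MessagePackMemorySerializer.deserialize_batch(blob, compressed=True)
assert len(restored) == 3 and restored[0].timestamp == batch[0].timestamp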
# Streaming MessagePack for large datasets
class StreamingMessagePackSerializer:
"""Stream-based MessagePack serialization"""
def __init__(self, output_file: str, compress: bool = True):
self.output_file = output_file
self.compress = compress
self.packer = msgpack.Packer(use_bin_type=True)
self.file_handle = None
def __enter__(self):
if self.compress:
import lz4.frame
self.file_handle = lz4.frame.open(self.output_file, mode='wb')
else:
self.file_handle = open(self.output_file, 'wb')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.file_handle.close()
def write_memory(self, memory: MemoryEntry):
"""Write a single memory to the stream"""
prepared_data = MessagePackMemorySerializer._prepare_for_serialization(memory)
packed_data = self.packer.pack(prepared_data)
self.file_handle.write(packed_data)
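Putting the streaming writer together with the read_memories generator defined just below (the file name is hypothetical):
# Hypothetical streaming write/read cycle; read_memories is defined below
with StreamingMessagePackSerializer("memories.msgpack.lz4", compress=True) as stream:
    for i in range(1000):
        stream.write_memory(MemoryEntry(
            id=f"mem_{i}",
            content="streamed entry",
            memory_type="episodic",
            importance=0.5,
            timestamp=datetime.now(),
        ))
for entry in read_memories("memories.msgpack.lz4", compressed=True):
    pass  # process each MemoryEntry as it is decoded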
def read_memories(input_file: str, compressed: bool = True):
"""Generator to read memories from stream"""
unpacker = msgpack.Unpacker(raw=False, strict_map_key=False)
if compressed:
import lz4.frame
file_handle = lz4.frame.open(input_file, mode='rb')
else:
file_handle = open(input_file, 'rb')
try:
while True:
chunk = file_handle.read(1024)
if not chunk:
break
unpacker.feed(chunk)
for unpacked_data in unpacker:
restored_data = MessagePackMemorySerializer._restore_from_serialization(unpacked_data)
yield MemoryEntry(**restored_data)
finally:
file_handle.close()
Vector Embedding Serialization
Efficient Embedding Storage
import numpy as np
import struct
from typing import List, Dict, Any, Tuple
import zlib
class EmbeddingSerializer:
"""Specialized serialization for vector embeddings"""
@staticmethod
def serialize_embedding_float32(embedding: List[float]) -> bytes:
"""Serialize embedding as float32 array (4 bytes per dimension)"""
np_array = np.array(embedding, dtype=np.float32)
return np_array.tobytes()
@staticmethod
def deserialize_embedding_float32(data: bytes, dimension: int) -> List[float]:
"""Deserialize float32 embedding"""
np_array = np.frombuffer(data, dtype=np.float32)
if len(np_array) != dimension:
raise ValueError(f"Expected {dimension} dimensions, got {len(np_array)}")
return np_array.tolist()
@staticmethod
def serialize_embedding_float16(embedding: List[float]) -> bytes:
"""Serialize embedding as float16 array (2 bytes per dimension)"""
np_array = np.array(embedding, dtype=np.float16)
return np_array.tobytes()
@staticmethod
def deserialize_embedding_float16(data: bytes, dimension: int) -> List[float]:
"""Deserialize float16 embedding (lower precision, smaller size)"""
np_array = np.frombuffer(data, dtype=np.float16)
if len(np_array) != dimension:
raise ValueError(f"Expected {dimension} dimensions, got {len(np_array)}")
return np_array.astype(np.float32).tolist()
@staticmethod
def serialize_embedding_compressed(embedding: List[float],
dtype: str = 'float32',
compression_level: int = 6) -> bytes:
"""Serialize embedding with compression"""
if dtype == 'float32':
np_array = np.array(embedding, dtype=np.float32)
elif dtype == 'float16':
np_array = np.array(embedding, dtype=np.float16)
else:
raise ValueError("dtype must be 'float32' or 'float16'")
raw_bytes = np_array.tobytes()
compressed_bytes = zlib.compress(raw_bytes, compression_level)
# Add metadata header: dtype(1 byte) + original_length(4 bytes)
dtype_byte = 0 if dtype == 'float32' else 1
header = struct.pack('<BI', dtype_byte, len(raw_bytes))  # '<' forces standard sizes (5 bytes, no native padding)
return header + compressed_bytes
@staticmethod
def deserialize_embedding_compressed(data: bytes) -> List[float]:
"""Deserialize compressed embedding"""
if len(data) < 5:
raise ValueError("Data too short for compressed embedding")
# Extract header
dtype_byte, original_length = struct.unpack('<BI', data[:5])
dtype = 'float32' if dtype_byte == 0 else 'float16'
# Decompress
compressed_data = data[5:]
raw_bytes = zlib.decompress(compressed_data)
if len(raw_bytes) != original_length:
raise ValueError("Decompressed data length mismatch")
# Convert back to array
if dtype == 'float32':
np_array = np.frombuffer(raw_bytes, dtype=np.float32)
else:
np_array = np.frombuffer(raw_bytes, dtype=np.float16).astype(np.float32)
return np_array.tolist()
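A quick sketch comparing the storage cost of the three encodings above; the 384-dimension size is just an example:
# Hypothetical size comparison for a 384-dimensional embedding
import random
embedding = [random.uniform(-1.0, 1.0) for _ in range(384)]
f32 = EmbeddingSerializer.serialize_embedding_float32(embedding)  # 384 * 4 = 1536 bytes
f16 = EmbeddingSerializer.serialize_embedding_float16(embedding)  # 384 * 2 = 768 bytes
packed = EmbeddingSerializer.serialize_embedding_compressed(embedding, dtype='float16')
print(len(f32), len(f16), len(packed))  # compressed size depends on the data
restored = EmbeddingSerializer.deserialize_embedding_float16(f16, dimension=384)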
class BatchEmbeddingSerializer:
"""Efficient batch serialization for multiple embeddings"""
@staticmethod
def serialize_embedding_batch(embeddings: List[List[float]],
dtype: str = 'float32') -> bytes:
"""Serialize multiple embeddings efficiently"""
if not embeddings:
return b''
# Check dimension consistency
dimension = len(embeddings[0])
if not all(len(emb) == dimension for emb in embeddings):
raise ValueError("All embeddings must have the same dimension")
# Convert to numpy array for efficient serialization
if dtype == 'float32':
np_array = np.array(embeddings, dtype=np.float32)
elif dtype == 'float16':
np_array = np.array(embeddings, dtype=np.float16)
else:
raise ValueError("dtype must be 'float32' or 'float16'")
# Create header: dtype(1) + count(4) + dimension(4)
dtype_byte = 0 if dtype == 'float32' else 1
header = struct.pack('<BII', dtype_byte, len(embeddings), dimension)  # standard sizes (9 bytes, no padding)
return header + np_array.tobytes()
@staticmethod
def deserialize_embedding_batch(data: bytes) -> List[List[float]]:
"""Deserialize multiple embeddings"""
if len(data) < 9:
raise ValueError("Data too short for embedding batch")
# Extract header
dtype_byte, count, dimension = struct.unpack('<BII', data[:9])
dtype = np.float32 if dtype_byte == 0 else np.float16
# Extract embedding data
embedding_data = data[9:]
np_array = np.frombuffer(embedding_data, dtype=dtype)
# Reshape to (count, dimension)
expected_size = count * dimension
if len(np_array) != expected_size:
raise ValueError(f"Expected {expected_size} values, got {len(np_array)}")
reshaped = np_array.reshape(count, dimension)
# Convert to float32 if needed and return as list
if dtype == np.float16:
reshaped = reshaped.astype(np.float32)
return reshaped.tolist()
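Usage sketch for the batch serializer; counts and dimensions are illustrative:
# Hypothetical batch of 100 embeddings stored as float16 in a single buffer
import random
embeddings = [[random.random() for _ in range(384)] for _ in range(100)]
blob = BatchEmbeddingSerializer.serialize_embedding_batch(embeddings, dtype='float16')
restored = BatchEmbeddingSerializer.deserialize_embedding_batch(blob)
assert len(restored) == 100 and len(restored[0]) == 384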
# Quantization for even smaller embeddings
class QuantizedEmbeddingSerializer:
"""Quantized embedding serialization for extreme size reduction"""
@staticmethod
def serialize_embedding_quantized(embedding: List[float],
bits: int = 8) -> bytes:
"""Quantize embedding to specified bit depth"""
if bits not in [4, 8]:
raise ValueError("Only 4-bit and 8-bit quantization supported")
np_array = np.array(embedding, dtype=np.float32)
# Find min/max for quantization
min_val = np_array.min()
max_val = np_array.max()
# Quantize
if bits == 8:
scale = 255.0 / (max_val - min_val) if max_val != min_val else 1.0
quantized = np.clip((np_array - min_val) * scale, 0, 255).astype(np.uint8)
quantized_bytes = quantized.tobytes()
else: # 4-bit
scale = 15.0 / (max_val - min_val) if max_val != min_val else 1.0
quantized = np.clip((np_array - min_val) * scale, 0, 15).astype(np.uint8)
# Pack two 4-bit values into one byte
packed = []
for i in range(0, len(quantized), 2):
if i + 1 < len(quantized):
packed_byte = (quantized[i] << 4) | quantized[i + 1]
else:
packed_byte = quantized[i] << 4
packed.append(packed_byte)
quantized_bytes = bytes(packed)
# Create header: bits(1) + dimension(4) + min_val(4) + max_val(4)
header = struct.pack('<Biff', bits, len(embedding), min_val, max_val)  # standard sizes (13 bytes, no padding)
return header + quantized_bytes
@staticmethod
def deserialize_embedding_quantized(data: bytes) -> List[float]:
"""Deserialize quantized embedding"""
if len(data) < 13:
raise ValueError("Data too short for quantized embedding")
# Extract header
bits, dimension, min_val, max_val = struct.unpack('<Biff', data[:13])
quantized_data = data[13:]
if bits == 8:
quantized = np.frombuffer(quantized_data, dtype=np.uint8)
scale = (max_val - min_val) / 255.0 if max_val != min_val else 0.0
restored = quantized.astype(np.float32) * scale + min_val
else: # 4-bit
# Unpack 4-bit values
packed_bytes = np.frombuffer(quantized_data, dtype=np.uint8)
quantized = []
for packed_byte in packed_bytes:
high = (packed_byte >> 4) & 0xF
low = packed_byte & 0xF
quantized.extend([high, low])
# Trim to original dimension
quantized = np.array(quantized[:dimension], dtype=np.float32)
scale = (max_val - min_val) / 15.0 if max_val != min_val else 0.0
restored = quantized * scale + min_val
return restored.tolist()
Performance Benchmarking
Serialization Benchmark Suite
import time
import psutil
import sys
from typing import List, Callable, Dict, Any
from datetime import datetime, timedelta
import random
import string
class SerializationBenchmark:
"""Comprehensive benchmarking for serialization methods"""
def __init__(self):
self.serializers = {
'json': (lambda m: JSONMemorySerializer.serialize_memory(m, compress=False), lambda d: JSONMemorySerializer.deserialize_memory(d, compressed=False)),  # uncompressed baseline
'json_compressed': (
lambda m: JSONMemorySerializer.serialize_memory(m, compress=True),
lambda d: JSONMemorySerializer.deserialize_memory(d, compressed=True)
),
'protobuf': (ProtobufMemorySerializer.serialize_memory, ProtobufMemorySerializer.deserialize_memory),
'messagepack': (lambda m: MessagePackMemorySerializer.serialize_memory(m, compress=False), lambda d: MessagePackMemorySerializer.deserialize_memory(d, compressed=False)),
'messagepack_compressed': (
lambda m: MessagePackMemorySerializer.serialize_memory(m, compress=True),
lambda d: MessagePackMemorySerializer.deserialize_memory(d, compressed=True)
)
}
def generate_test_memory(self, embedding_size: int = 384, content_length: int = 500) -> MemoryEntry:
"""Generate a test memory entry"""
return MemoryEntry(
id=''.join(random.choices(string.ascii_letters + string.digits, k=16)),
content=''.join(random.choices(string.ascii_letters + ' ', k=content_length)),
memory_type=random.choice(['episodic', 'semantic', 'working', 'procedural']),
importance=random.random(),
timestamp=datetime.now() - timedelta(seconds=random.randint(0, 86400)),
embedding=[random.random() for _ in range(embedding_size)],
metadata={
'session_id': ''.join(random.choices(string.ascii_letters, k=8)),
'user_id': random.randint(1, 10000),
'source': random.choice(['chat', 'document', 'search']),
'confidence': random.random()
},
tags=[random.choice(['important', 'work', 'personal', 'research']) for _ in range(random.randint(1, 4))]
)
def benchmark_single_memory(self, memory: MemoryEntry, iterations: int = 1000) -> Dict[str, Dict[str, float]]:
"""Benchmark serialization/deserialization of a single memory"""
results = {}
for name, (serialize_func, deserialize_func) in self.serializers.items():
# Warm up
for _ in range(10):
data = serialize_func(memory)
deserialize_func(data)
# Measure serialization
start_time = time.perf_counter()
start_memory = psutil.Process().memory_info().rss
serialized_data = None
for _ in range(iterations):
serialized_data = serialize_func(memory)
serialize_time = (time.perf_counter() - start_time) / iterations
serialize_memory = psutil.Process().memory_info().rss - start_memory
# Measure deserialization
start_time = time.perf_counter()
start_memory = psutil.Process().memory_info().rss
for _ in range(iterations):
deserialize_func(serialized_data)
deserialize_time = (time.perf_counter() - start_time) / iterations
deserialize_memory = psutil.Process().memory_info().rss - start_memory
results[name] = {
'serialize_time_ms': serialize_time * 1000,
'deserialize_time_ms': deserialize_time * 1000,
'serialized_size_bytes': len(serialized_data),
'serialize_memory_mb': serialize_memory / (1024 * 1024),
'deserialize_memory_mb': deserialize_memory / (1024 * 1024)
}
return results
def benchmark_batch_operations(self, batch_size: int = 100, iterations: int = 100) -> Dict[str, Dict[str, float]]:
"""Benchmark batch serialization operations"""
# Generate test batch
memories = [self.generate_test_memory() for _ in range(batch_size)]
batch_serializers = {
'json': (lambda m: JSONMemorySerializer.serialize_batch(m, compress=False), lambda d: JSONMemorySerializer.deserialize_batch(d, compressed=False)),
'json_compressed': (
lambda m: JSONMemorySerializer.serialize_batch(m, compress=True),
lambda d: JSONMemorySerializer.deserialize_batch(d, compressed=True)
),
'protobuf': (ProtobufMemorySerializer.serialize_batch,
lambda d: ProtobufMemorySerializer.deserialize_batch(d)[0]),
'messagepack': (lambda m: MessagePackMemorySerializer.serialize_batch(m, compress=False), lambda d: MessagePackMemorySerializer.deserialize_batch(d, compressed=False)),
'messagepack_compressed': (
lambda m: MessagePackMemorySerializer.serialize_batch(m, compress=True),
lambda d: MessagePackMemorySerializer.deserialize_batch(d, compressed=True)
)
}
results = {}
for name, (serialize_func, deserialize_func) in batch_serializers.items():
try:
# Warm up
for _ in range(5):
data = serialize_func(memories)
deserialize_func(data)
# Measure serialization
start_time = time.perf_counter()
serialized_data = None
for _ in range(iterations):
serialized_data = serialize_func(memories)
serialize_time = (time.perf_counter() - start_time) / iterations
# Measure deserialization
start_time = time.perf_counter()
for _ in range(iterations):
deserialize_func(serialized_data)
deserialize_time = (time.perf_counter() - start_time) / iterations
results[name] = {
'batch_serialize_time_ms': serialize_time * 1000,
'batch_deserialize_time_ms': deserialize_time * 1000,
'batch_size_bytes': len(serialized_data),
'bytes_per_memory': len(serialized_data) / batch_size,
'memories_per_second_serialize': batch_size / serialize_time,
'memories_per_second_deserialize': batch_size / deserialize_time
}
except Exception as e:
results[name] = {'error': str(e)}
return results
def run_comprehensive_benchmark(self) -> Dict[str, Any]:
"""Run complete benchmark suite"""
print("Starting comprehensive serialization benchmark...")
# Test different memory configurations
test_configs = [
{'embedding_size': 384, 'content_length': 100, 'name': 'small_memory'},
{'embedding_size': 384, 'content_length': 1000, 'name': 'medium_memory'},
{'embedding_size': 1536, 'content_length': 2000, 'name': 'large_memory'}
]
benchmark_results = {}
for config in test_configs:
print(f"Testing {config['name']}...")
test_memory = self.generate_test_memory(
embedding_size=config['embedding_size'],
content_length=config['content_length']
)
# Single memory benchmark
single_results = self.benchmark_single_memory(test_memory)
# Batch benchmark
batch_results = self.benchmark_batch_operations(batch_size=100)
benchmark_results[config['name']] = {
'config': config,
'single_memory': single_results,
'batch_operations': batch_results
}
return benchmark_results
def print_results(self, results: Dict[str, Any]):
"""Print benchmark results in a readable format"""
for test_name, test_results in results.items():
print(f"\n=== {test_name.upper()} ===")
print(f"Config: {test_results['config']}")
print("\nSingle Memory Performance:")
print(f"{'Format':<20} {'Serialize (ms)':<15} {'Deserialize (ms)':<18} {'Size (bytes)':<12} {'Compression %':<15}")
print("-" * 85)
# Calculate baseline size for compression ratio
json_size = test_results['single_memory']['json']['serialized_size_bytes']
for format_name, metrics in test_results['single_memory'].items():
if 'error' not in metrics:
compression_ratio = (1 - metrics['serialized_size_bytes'] / json_size) * 100
print(f"{format_name:<20} {metrics['serialize_time_ms']:<15.3f} {metrics['deserialize_time_ms']:<18.3f} "
f"{metrics['serialized_size_bytes']:<12} {compression_ratio:<15.1f}%")
print("\nBatch Operations Performance (100 memories):")
print(f"{'Format':<20} {'Batch Ser. (ms)':<18} {'Batch Deser. (ms)':<20} {'Mem/sec Ser.':<15} {'Mem/sec Deser.':<15}")
print("-" * 95)
for format_name, metrics in test_results['batch_operations'].items():
if 'error' not in metrics:
print(f"{format_name:<20} {metrics['batch_serialize_time_ms']:<18.3f} {metrics['batch_deserialize_time_ms']:<20.3f} "
f"{metrics['memories_per_second_serialize']:<15.0f} {metrics['memories_per_second_deserialize']:<15.0f}")
# Example usage
if __name__ == "__main__":
benchmark = SerializationBenchmark()
results = benchmark.run_comprehensive_benchmark()
benchmark.print_results(results)
Production Patterns
Adaptive Serialization Strategy
from typing import Protocol, Union
from enum import Enum
import logging
class SerializationFormat(Enum):
JSON = "json"
JSON_COMPRESSED = "json_compressed"
PROTOBUF = "protobuf"
MESSAGEPACK = "messagepack"
MESSAGEPACK_COMPRESSED = "messagepack_compressed"
class SerializationContext:
"""Context information for choosing serialization strategy"""
def __init__(self, data_size_bytes: int = 0, frequency_hz: float = 0.0,
latency_sensitive: bool = False, storage_sensitive: bool = False,
network_transfer: bool = False):
self.data_size_bytes = data_size_bytes
self.frequency_hz = frequency_hz
self.latency_sensitive = latency_sensitive
self.storage_sensitive = storage_sensitive
self.network_transfer = network_transfer
class AdaptiveSerializer:
"""Automatically choose the best serialization format based on context"""
def __init__(self):
self.logger = logging.getLogger(__name__)
# Performance characteristics from benchmarking
self.format_characteristics = {
SerializationFormat.JSON: {
'speed_score': 3, # 1-5 scale
'size_score': 1,
'compatibility_score': 5,
'cpu_overhead': 'medium'
},
SerializationFormat.JSON_COMPRESSED: {
'speed_score': 2,
'size_score': 4,
'compatibility_score': 5,
'cpu_overhead': 'high'
},
SerializationFormat.PROTOBUF: {
'speed_score': 5,
'size_score': 4,
'compatibility_score': 3,
'cpu_overhead': 'low'
},
SerializationFormat.MESSAGEPACK: {
'speed_score': 4,
'size_score': 3,
'compatibility_score': 4,
'cpu_overhead': 'medium'
},
SerializationFormat.MESSAGEPACK_COMPRESSED: {
'speed_score': 3,
'size_score': 5,
'compatibility_score': 4,
'cpu_overhead': 'high'
}
}
def choose_format(self, context: SerializationContext) -> SerializationFormat:
"""Choose optimal serialization format based on context"""
scores = {}
for format_enum, characteristics in self.format_characteristics.items():
score = 0
# Latency sensitivity - prefer faster formats
if context.latency_sensitive:
score += characteristics['speed_score'] * 3
# Storage sensitivity - prefer smaller formats
if context.storage_sensitive:
score += characteristics['size_score'] * 3
# Network transfer - balance speed and size
if context.network_transfer:
score += characteristics['speed_score'] * 2
score += characteristics['size_score'] * 2
# High frequency operations - prefer speed
if context.frequency_hz > 100:
score += characteristics['speed_score'] * 2
# Large data size - prefer compression
if context.data_size_bytes > 10000:
score += characteristics['size_score'] * 2
scores[format_enum] = score
# Choose format with highest score
best_format = max(scores.items(), key=lambda x: x[1])[0]
self.logger.info(f"Chose {best_format.value} serialization (score: {scores[best_format]})")
return best_format
def serialize(self, memory: MemoryEntry, context: SerializationContext) -> tuple[bytes, SerializationFormat]:
"""Serialize memory with optimal format"""
format_chosen = self.choose_format(context)
if format_chosen == SerializationFormat.JSON:
data = JSONMemorySerializer.serialize_memory(memory, compress=False)
elif format_chosen == SerializationFormat.JSON_COMPRESSED:
data = JSONMemorySerializer.serialize_memory(memory, compress=True)
elif format_chosen == SerializationFormat.PROTOBUF:
data = ProtobufMemorySerializer.serialize_memory(memory)
elif format_chosen == SerializationFormat.MESSAGEPACK:
data = MessagePackMemorySerializer.serialize_memory(memory, compress=False)
elif format_chosen == SerializationFormat.MESSAGEPACK_COMPRESSED:
data = MessagePackMemorySerializer.serialize_memory(memory, compress=True)
else:
raise ValueError(f"Unsupported format: {format_chosen}")
return data, format_chosen
def deserialize(self, data: bytes, format_used: SerializationFormat) -> MemoryEntry:
"""Deserialize memory using the specified format"""
if format_used == SerializationFormat.JSON:
return JSONMemorySerializer.deserialize_memory(data, compressed=False)
elif format_used == SerializationFormat.JSON_COMPRESSED:
return JSONMemorySerializer.deserialize_memory(data, compressed=True)
elif format_used == SerializationFormat.PROTOBUF:
return ProtobufMemorySerializer.deserialize_memory(data)
elif format_used == SerializationFormat.MESSAGEPACK:
return MessagePackMemorySerializer.deserialize_memory(data, compressed=False)
elif format_used == SerializationFormat.MESSAGEPACK_COMPRESSED:
return MessagePackMemorySerializer.deserialize_memory(data, compressed=True)
else:
raise ValueError(f"Unsupported format: {format_used}")
# Production memory store with adaptive serialization
class AdaptiveMemoryStore:
"""Memory store that adapts serialization based on usage patterns"""
def __init__(self, base_store):
self.base_store = base_store
self.adaptive_serializer = AdaptiveSerializer()
self.usage_stats = {}
self.logger = logging.getLogger(__name__)
def store_memory(self, memory: MemoryEntry, **kwargs) -> str:
"""Store memory with adaptive serialization"""
# Analyze context
context = self._analyze_storage_context(memory)
# Serialize with optimal format
serialized_data, format_used = self.adaptive_serializer.serialize(memory, context)
# Store with format metadata
storage_metadata = {
'serialization_format': format_used.value,
'original_size': len(memory.content),
'serialized_size': len(serialized_data),
**kwargs.get('metadata', {})
}
# Use base store (which should handle raw bytes and metadata)
return self.base_store.store_raw(
memory_id=memory.id,
data=serialized_data,
metadata=storage_metadata
)
def retrieve_memory(self, memory_id: str) -> Optional[MemoryEntry]:
"""Retrieve and deserialize memory"""
raw_data, metadata = self.base_store.retrieve_raw(memory_id)
if not raw_data:
return None
# Get serialization format from metadata
format_str = metadata.get('serialization_format', 'json')
try:
format_enum = SerializationFormat(format_str)
except ValueError:
self.logger.warning(f"Unknown serialization format: {format_str}, falling back to JSON")
format_enum = SerializationFormat.JSON
# Deserialize
try:
return self.adaptive_serializer.deserialize(raw_data, format_enum)
except Exception as e:
self.logger.error(f"Failed to deserialize memory {memory_id}: {e}")
return None
def _analyze_storage_context(self, memory: MemoryEntry) -> SerializationContext:
"""Analyze memory characteristics to determine optimal serialization"""
# Estimate serialized size (rough approximation)
estimated_size = len(memory.content) + (len(memory.embedding) * 4 if memory.embedding else 0) + 500
# Check if this is a high-frequency memory type
memory_type_stats = self.usage_stats.get(memory.memory_type, {'count': 0, 'avg_frequency': 0})
frequency = memory_type_stats.get('avg_frequency', 0)
return SerializationContext(
data_size_bytes=estimated_size,
frequency_hz=frequency,
latency_sensitive=memory.memory_type == 'working',
storage_sensitive=estimated_size > 5000,
network_transfer=True # Assume network storage
)
This guide provides practical implementations of the major serialization formats, benchmarking tools for comparing them, and adaptive strategies for production agent memory systems. The examples include error handling, the optimization techniques discussed above, and basic logging, and are intended as a starting point for production deployments.