StreamingMetadataEncoder
The StreamingMetadataEncoder
class provides specialized functionality for embedding metadata in streaming content, such as text generated chunk by chunk from an LLM.
Overview
When working with streaming content, embedding metadata presents unique challenges:
- The content arrives in chunks, not all at once
- Each chunk may not have suitable locations for embedding metadata
- The metadata must be consistently verifiable across the entire content
The StreamingMetadataEncoder
addresses these challenges by:
- Buffering content as needed
- Intelligently distributing metadata across chunks
- Ensuring HMAC verification works on the complete content
Class Definition
class StreamingMetadataEncoder:
def __init__(
self,
secret_key: Optional[str] = None,
target: Union[str, MetadataTarget] = "whitespace",
encode_first_chunk_only: bool = True
):
"""
Initialize a StreamingMetadataEncoder for handling streaming content.
Args:
secret_key: Optional secret key for HMAC verification. If not provided,
a random key will be generated.
target: Where to embed metadata. Can be a string ("whitespace", "punctuation",
"first_letter", "last_letter", "all_characters") or a MetadataTarget enum.
encode_first_chunk_only: If True, metadata will only be embedded in the first
non-empty chunk that contains suitable targets.
"""
Methods
initialize_stream
def initialize_stream(
self,
metadata: Dict[str, Any]
) -> str:
"""
Initialize a new streaming session with the provided metadata.
Args:
metadata: Dictionary containing the metadata to embed
Returns:
stream_id: A unique identifier for this streaming session
"""
process_chunk
def process_chunk(
self,
stream_id: str,
chunk: str,
is_first: bool = False,
is_last: bool = False
) -> str:
"""
Process a chunk of streaming content.
Args:
stream_id: The stream ID returned by initialize_stream
chunk: The text chunk to process
is_first: Whether this is the first chunk in the stream
is_last: Whether this is the last chunk in the stream
Returns:
The processed chunk with metadata embedded (if applicable)
"""
finalize_stream
def finalize_stream(
self,
stream_id: str
) -> Dict[str, Any]:
"""
Finalize a streaming session.
Args:
stream_id: The stream ID returned by initialize_stream
Returns:
A dictionary containing information about the completed stream
"""
get_stream_info
def get_stream_info(
self,
stream_id: str
) -> Dict[str, Any]:
"""
Get information about a streaming session.
Args:
stream_id: The stream ID returned by initialize_stream
Returns:
A dictionary containing information about the stream
"""
Usage Example
from encypher.streaming.encoders import StreamingMetadataEncoder
from encypher.core.unicode_metadata import MetadataTarget
import time
# Initialize the encoder
encoder = StreamingMetadataEncoder(
target=MetadataTarget.WHITESPACE,
encode_first_chunk_only=True
)
# Create metadata
metadata = {
"model_id": "gpt-4",
"organization": "MyCompany",
"timestamp": int(time.time()) # Unix/Epoch timestamp
}
# Initialize a streaming session
stream_id = encoder.initialize_stream(metadata)
# Process chunks as they arrive
chunks = [
"This is the first chunk of text. ",
"This is the second chunk. ",
"And this is the final chunk."
]
processed_chunks = []
for i, chunk in enumerate(chunks):
is_first = i == 0
is_last = i == len(chunks) - 1
processed_chunk = encoder.process_chunk(
stream_id=stream_id,
chunk=chunk,
is_first=is_first,
is_last=is_last
)
processed_chunks.append(processed_chunk)
print(f"Processed chunk {i+1}: {processed_chunk}")
# If the last chunk wasn't marked as is_last=True, finalize the stream
if not chunks: # If no chunks were processed with is_last=True
encoder.finalize_stream(stream_id)
# Combine all processed chunks
full_text = "".join(processed_chunks)
# Now you can extract and verify the metadata using the standard MetadataEncoder
from encypher.core.metadata_encoder import MetadataEncoder
standard_encoder = MetadataEncoder()
is_valid, metadata_dict, clean_text = standard_encoder.verify_text(full_text)
print(f"Extracted metadata: {metadata_dict}")
print(f"Verification result: {is_valid}")
Streaming with OpenAI
from openai import OpenAI
from encypher.streaming.encoders import StreamingMetadataEncoder
import time
# Initialize OpenAI client
client = OpenAI(api_key="your-api-key")
# Initialize the streaming encoder
encoder = StreamingMetadataEncoder(
target="whitespace",
encode_first_chunk_only=True
)
# Create metadata
metadata = {
"model_id": "gpt-4",
"organization": "MyCompany",
"timestamp": int(time.time()) # Unix/Epoch timestamp
}
# Initialize a streaming session
stream_id = encoder.initialize_stream(metadata)
# Create a streaming completion
completion = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Write a short paragraph about AI ethics."}
],
stream=True
)
# Process each chunk
full_response = ""
for chunk in completion:
if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
# Process the chunk
processed_chunk = encoder.process_chunk(
stream_id=stream_id,
chunk=content
)
# Print and accumulate the processed chunk
print(processed_chunk, end="", flush=True)
full_response += processed_chunk
# Finalize the stream
encoder.finalize_stream(stream_id)
print("\n\nStreaming completed!")
Streaming Handler Alternative
For a higher-level interface, you can also use the StreamingHandler
class, which provides a simpler API for common streaming scenarios.
Implementation Details
Metadata Distribution
The StreamingMetadataEncoder
uses different strategies for embedding metadata depending on the encode_first_chunk_only
setting:
- When
encode_first_chunk_only=True
(default), it waits for a chunk with suitable targets and embeds all metadata there - When
encode_first_chunk_only=False
, it distributes metadata across multiple chunks as needed
Buffering
If a chunk doesn't have enough suitable targets for embedding metadata, the encoder may buffer it and combine it with subsequent chunks until there are enough targets.
HMAC Verification
The HMAC signature is calculated based on the entire content, not just individual chunks. This ensures that the verification will detect if any part of the content is modified.
Related Classes
MetadataEncoder
- Base class for embedding and extracting metadataStreamingHandler
- Higher-level interface for streaming scenariosUnicodeMetadata
- Low-level utilities for working with Unicode variation selectors
StreamingHandler
The StreamingHandler
class provides specialized functionality for embedding metadata in streaming content, such as text generated chunk by chunk from an LLM.
Overview
When working with streaming content, embedding metadata presents unique challenges:
- The content arrives in chunks, not all at once
- Each chunk may not have suitable locations for embedding metadata
- The metadata must be consistently verifiable across the entire content
The StreamingHandler
addresses these challenges by:
- Buffering content as needed
- Intelligently distributing metadata across chunks
- Ensuring HMAC verification works on the complete content
Class Definition
class StreamingHandler:
def __init__(
self,
metadata: Optional[Dict[str, Any]] = None,
target: Union[str, MetadataTarget] = "whitespace",
encode_first_chunk_only: bool = True,
hmac_secret_key: Optional[str] = None
):
"""
Initialize a StreamingHandler for handling streaming content.
Args:
metadata: Dictionary containing the metadata to embed
target: Where to embed metadata. Can be a string ("whitespace", "punctuation",
"first_letter", "last_letter", "all_characters") or a MetadataTarget enum.
encode_first_chunk_only: If True, metadata will only be embedded in the first
non-empty chunk that contains suitable targets.
hmac_secret_key: Optional secret key for HMAC verification. Only needed if
you want to verify the integrity of the metadata.
"""
Methods
process_chunk
def process_chunk(
self,
chunk: Union[str, Dict[str, Any]]
) -> Union[str, Dict[str, Any]]:
"""
Process a chunk of streaming content.
Args:
chunk: The text chunk to process or a dictionary containing a text chunk
Returns:
The processed chunk with metadata embedded (if applicable)
"""
finalize
def finalize(self) -> Optional[str]:
"""
Finalize the streaming process.
This method should be called after all chunks have been processed to ensure
that any remaining buffered content is properly processed.
Returns:
Any remaining processed content, or None if there is none
"""
Usage Example
```python from encypher.streaming.handlers import StreamingHandler import time
Create metadata
metadata = { "model_id": "gpt-4", "organization": "EncypherAI", "timestamp": int(time.time()), # Unix/Epoch timestamp "version": "1.0.0" }
Initialize the streaming handler
handler = StreamingHandler( metadata=metadata, target="whitespace", encode_first_chunk_only=True, hmac_secret_key="your-secret-key" # Optional: Only needed for HMAC verification )
Process chunks as they arrive
chunks = [ "This is the first chunk of text. ", "This is the second chunk. ", "And this is the final chunk." ]
full_text = "" for chunk in chunks: # Process the chunk processed_chunk = handler.process_chunk(chunk=chunk)
# Print and accumulate the processed chunk
print(processed_chunk, end="", flush=True)
full_text += processed_chunk
Finalize the stream
final_chunk = handler.finalize() if final_chunk: full_text += final_chunk
Extract and verify the metadata
from encypher.core.metadata_encoder import MetadataEncoder
encoder = MetadataEncoder(hmac_secret_key="your-secret-key") metadata_dict, is_verified = encoder.extract_verified_metadata(full_text) print(f"\nMetadata: {metadata_dict}") print(f"Verified: {is_verified}")