FastAPI Integration
This guide explains how to integrate EncypherAI with FastAPI to build robust web applications that can embed, extract, and verify metadata in AI-generated content.
Prerequisites
Before you begin, make sure you have:
- FastAPI and its dependencies installed
- EncypherAI installed
- (Optional) An LLM provider API key if you're integrating with an LLM
Basic Integration
Creating a FastAPI Application with EncypherAI
Here's a simple FastAPI application that demonstrates how to embed and extract metadata:
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, Any, Optional
from encypher.core.unicode_metadata import UnicodeMetadata
from encypher.core.keys import generate_key_pair
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.types import PublicKeyTypes, PrivateKeyTypes
import time
import json
# Initialize FastAPI app
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your frontend domain
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Key Management (Replace with your actual key management) ---
private_key, public_key = generate_key_pair()
# Store public keys (in a real app, this would be a database or secure store)
public_keys_store: Dict[str, PublicKeyTypes] = {
"fastapi-key-1": public_key
}
def resolve_public_key(key_id: str) -> Optional[PublicKeyTypes]:
"""Retrieves the public key based on its ID."""
return public_keys_store.get(key_id)
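The resolver's contract is the important part: given a `key_id`, return the matching public key, or `None` when the key is unknown. A minimal sketch of that contract, with plain strings standing in for the real Ed25519 public key objects (the names `demo_public_keys` and `demo_resolve_public_key` are illustrative, not part of the EncypherAI API):

```python
from typing import Dict, Optional

# Hypothetical stand-in values: in a real app the entries are Ed25519
# public key objects loaded from a database or vault, not literal strings.
demo_public_keys: Dict[str, str] = {
    "fastapi-key-1": "pem-bytes-for-key-1",
}

def demo_resolve_public_key(key_id: str) -> Optional[str]:
    """Return the public key for key_id, or None when it is unknown.

    Returning None instead of raising lets verification treat an unknown
    key_id as a failed check rather than a server error.
    """
    return demo_public_keys.get(key_id)

print(demo_resolve_public_key("fastapi-key-1"))  # pem-bytes-for-key-1
print(demo_resolve_public_key("missing"))        # None
```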
# Define request and response models
class EncodeRequest(BaseModel):
text: str
metadata: Dict[str, Any]
target: Optional[str] = "whitespace"
key_id: str = "fastapi-key-1" # Default key ID
class VerifyRequest(BaseModel):
text: str
class MetadataResponse(BaseModel):
has_metadata: bool
metadata: Optional[Dict[str, Any]] = None
verified: Optional[bool] = None
error: Optional[str] = None
# Endpoints
@app.post("/encode", response_model=Dict[str, Any])
async def encode_metadata(request: EncodeRequest):
"""Embed metadata into text"""
try:
# Add timestamp if not provided
request.metadata["timestamp"] = request.metadata.get("timestamp", time.time())
request.metadata["key_id"] = request.metadata.get("key_id", request.key_id) # Ensure key_id is in metadata
# Embed metadata
encoded_text = UnicodeMetadata.embed_metadata(
text=request.text,
metadata=request.metadata,
private_key=private_key # Use the loaded private key
)
return {
"text": encoded_text,
"metadata": request.metadata
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/verify", response_model=MetadataResponse)
async def verify_text(request: VerifyRequest):
"""Extract and verify metadata from text"""
result = {
"has_metadata": False,
"metadata": None,
"verified": None,
"error": None
}
try:
# Extract and verify metadata using the resolver
is_valid, verified_metadata = UnicodeMetadata.verify_metadata(
request.text,
public_key_resolver=resolve_public_key
)
# Check if metadata was found (even if invalid)
if verified_metadata is not None:
result["has_metadata"] = True
result["metadata"] = verified_metadata
result["verified"] = is_valid
else:
result["has_metadata"] = False # No metadata marker found
except Exception as e:
result["error"] = str(e)
return result
# Run the app
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Advanced Integration
Streaming Support with FastAPI
To handle streaming content with FastAPI:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Dict, Any, Optional, AsyncGenerator
from encypher.streaming import StreamingHandler
from encypher.core.unicode_metadata import UnicodeMetadata
from encypher.core.keys import generate_key_pair # Key management
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes, PublicKeyTypes
import time
import json
import asyncio
import openai # Or any other LLM provider
app = FastAPI()
# --- Key Management (Same as above or separate for streaming) ---
stream_private_key, stream_public_key = generate_key_pair()
public_keys_store["fastapi-stream-key"] = stream_public_key # Add to store
# ---------------------------------------------------------------
# Define request model
class StreamRequest(BaseModel):
prompt: str
model: str = "gpt-4"
system_prompt: Optional[str] = "You are a helpful assistant."
metadata: Optional[Dict[str, Any]] = None
@app.post("/stream")
async def stream_response(request: StreamRequest):
# Initialize OpenAI client
client = openai.OpenAI(api_key="your-api-key")
# Create metadata if not provided
metadata = request.metadata or {}
metadata["timestamp"] = metadata.get("timestamp", time.time())
metadata["model"] = metadata.get("model", request.model)
metadata["key_id"] = metadata.get("key_id", "fastapi-stream-key") # Use a stream-specific key ID
# Initialize the streaming handler
handler = StreamingHandler(
metadata=metadata,
private_key=stream_private_key # Use the stream private key
)
async def generate():
try:
# Create a streaming completion
completion = client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.prompt}
],
stream=True
)
# Process each chunk
for chunk in completion:
if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
# Process the chunk
processed_chunk = handler.process_chunk(chunk=content)
# Yield the processed chunk
yield processed_chunk
# Finalize the stream
final_chunk = handler.finalize()
if final_chunk:
yield final_chunk
except Exception as e:
# Handle errors
yield f"Error: {str(e)}"
# Return a streaming response
return StreamingResponse(generate(), media_type="text/plain")
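The key discipline in the generator above is that every chunk passes through `process_chunk` and `finalize()` is called exactly once at the end. A toy stand-in makes the shape clear (`DemoStreamingHandler` is hypothetical; the real `StreamingHandler` embeds signed metadata, typically in the finalized output):

```python
# Toy stand-in for StreamingHandler's contract, not the real class.
class DemoStreamingHandler:
    def __init__(self, metadata: dict):
        self.metadata = metadata
        self.seen: list[str] = []

    def process_chunk(self, chunk: str) -> str:
        self.seen.append(chunk)
        return chunk  # pass the text through unchanged

    def finalize(self) -> str:
        # The real finalize() may return one last chunk carrying embedded
        # metadata; skipping it would drop that data, so always call it.
        return ""

handler = DemoStreamingHandler({"model": "gpt-4"})
out = [handler.process_chunk(c) for c in ["Hel", "lo ", "world"]]
out.append(handler.finalize())
print("".join(out))  # Hello world
```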
WebSocket Support
For real-time communication with WebSockets:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, Any, List, Optional
from encypher.streaming import StreamingHandler
from encypher.core.unicode_metadata import UnicodeMetadata
from encypher.core.keys import generate_key_pair # Key management
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes, PublicKeyTypes
import time
import json
import openai # Or any other LLM provider
app = FastAPI()
# --- Key Management (Same as above or separate for streaming) ---
stream_private_key, stream_public_key = generate_key_pair()
public_keys_store["fastapi-stream-key"] = stream_public_key # Add to store
# ---------------------------------------------------------------
# HTML for a simple WebSocket client
html = """
<!DOCTYPE html>
<html>
<head>
<title>EncypherAI WebSocket Demo</title>
<script>
var ws = null;
function connect() {
ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages');
var message = document.createElement('li');
message.textContent = event.data;
messages.appendChild(message);
};
ws.onclose = function(event) {
document.getElementById('status').textContent = 'Disconnected';
};
ws.onopen = function(event) {
document.getElementById('status').textContent = 'Connected';
};
}
function sendMessage() {
var input = document.getElementById('messageText');
var message = input.value;
ws.send(message);
input.value = '';
}
function verifyMetadata() {
var text = document.getElementById('messages').innerText;
fetch('/verify', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({text: text})
})
.then(response => response.json())
.then(data => {
var verification = document.getElementById('verification');
if (data.has_metadata) {
verification.innerHTML = '<h3>Metadata Found</h3>' +
'<p>Verified: ' + (data.verified ? '✅' : '❌') + '</p>' +
'<pre>' + JSON.stringify(data.metadata, null, 2) + '</pre>';
} else {
verification.innerHTML = '<h3>No Metadata Found</h3>' +
'<p>Error: ' + data.error + '</p>';
}
});
}
window.onload = function() {
connect();
};
</script>
</head>
<body>
<h1>EncypherAI WebSocket Demo</h1>
<p>Status: <span id="status">Connecting...</span></p>
<input type="text" id="messageText" placeholder="Enter your message here">
<button onclick="sendMessage()">Send</button>
<button onclick="verifyMetadata()">Verify Metadata</button>
<h2>Messages:</h2>
<ul id="messages"></ul>
<div id="verification"></div>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Create metadata
metadata = {
"model": "gpt-4",
"organization": "YourOrganization",
"timestamp": time.time(),
"prompt": ""
}
# Initialize streaming handler
handler = StreamingHandler(
metadata=metadata,
private_key=stream_private_key # Use the stream private key
)
try:
while True:
# Receive message from client
prompt = await websocket.receive_text()
# Update metadata with the prompt
metadata["prompt"] = prompt
# Initialize OpenAI client
client = openai.OpenAI(api_key="your-api-key")
# Create a streaming completion
completion = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
],
stream=True
)
# Process and send each chunk
for chunk in completion:
if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
# Process the chunk
processed_chunk = handler.process_chunk(chunk=content)
# Send the processed chunk
await websocket.send_text(processed_chunk)
# Finalize the stream
final_chunk = handler.finalize()
if final_chunk:
await websocket.send_text(final_chunk)
except WebSocketDisconnect:
print("Client disconnected")
Middleware for Automatic Metadata Embedding
You can create a middleware to automatically embed metadata in all responses:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from encypher.core.unicode_metadata import UnicodeMetadata
from encypher.core.keys import generate_key_pair # Assuming key management is accessible
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
import time
import json
from cryptography.hazmat.primitives import serialization
# Initialize FastAPI app
app = FastAPI()
# --- Key Loading/Management ---
# Load your main private key (e.g., from a file or env var)
# Example: Load from PEM file
try:
with open("private_key.pem", "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None # Add password if key is encrypted
)
except FileNotFoundError:
print("Warning: private_key.pem not found. Using generated key for demo.")
private_key, _ = generate_key_pair() # Fallback for demo
# Load public keys for resolver (e.g., from DB or config file)
# public_keys_store = load_public_keys_from_db(DB_URL)
# For demo, we used an in-memory dict initialized earlier
# ----------------------------
class MetadataMiddleware(BaseHTTPMiddleware):
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
# Process the request normally
response = await call_next(request)
        # Check if this is a JSON response
        if response.headers.get("content-type", "").startswith("application/json"):
            # call_next returns a streaming response, so the body must be
            # drained from its async iterator (there is no awaitable .body())
            body = b"".join([chunk async for chunk in response.body_iterator])
            content = json.loads(body)
# Assuming the response has a 'text' field to embed metadata into
if isinstance(content, dict) and 'text' in content:
text_to_encode = content['text']
# Prepare metadata
metadata_to_embed = {
"processed_by": "middleware",
"timestamp": time.time(),
"key_id": "middleware-key-1"
}
# Embed metadata using UnicodeMetadata
encoded_text = UnicodeMetadata.embed_metadata(
text=text_to_encode,
metadata=metadata_to_embed,
private_key=private_key
)
# Update the response content
content['text'] = encoded_text
            # Return the modified response (drop the stale Content-Length;
            # JSONResponse recomputes it for the new body)
            headers = dict(response.headers)
            headers.pop("content-length", None)
            return JSONResponse(
                content=content,
                status_code=response.status_code,
                headers=headers
            )
return response
# Add middleware to the app
app.add_middleware(MetadataMiddleware)
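One subtlety worth calling out: because `call_next` hands the middleware a streaming response, the body arrives as an async iterator of byte chunks rather than a ready `bytes` object. Draining such an iterator looks like this (`fake_body_iterator` is a stand-in for `response.body_iterator`):

```python
import asyncio
import json

async def fake_body_iterator():
    # Stand-in for response.body_iterator: bodies arrive in chunks
    yield b'{"text": '
    yield b'"hello"}'

async def read_body() -> bytes:
    # Join the chunks into the complete response body
    return b"".join([chunk async for chunk in fake_body_iterator()])

body = asyncio.run(read_body())
print(json.loads(body))  # {'text': 'hello'}
```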
Integration with LLM Providers
OpenAI Integration
Combining FastAPI, EncypherAI, and OpenAI:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Dict, Any, Optional, AsyncGenerator
from encypher.core.unicode_metadata import UnicodeMetadata
from encypher.streaming import StreamingHandler
from encypher.core.keys import generate_key_pair # Key management
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes, PublicKeyTypes
import time
import openai
app = FastAPI()
# --- Key Management (Load appropriate keys) ---
private_key = get_private_key_for_user("your-user-id") # Example function
key_id = "user-your-user-id-key"
# ----------------------------------------------
# Define request model
class GenerateRequest(BaseModel):
prompt: str
model: str = "gpt-4"
system_prompt: Optional[str] = "You are a helpful assistant."
stream: bool = False
additional_metadata: Optional[Dict[str, Any]] = None
@app.post("/generate")
async def generate(request: GenerateRequest):
# Initialize OpenAI client
client = openai.OpenAI(api_key="your-api-key")
# Create metadata
metadata = {
"model": request.model,
"timestamp": time.time(),
"key_id": key_id,
"user_id": "your-user-id"
}
# Add additional metadata if provided
if request.additional_metadata:
metadata.update(request.additional_metadata)
# Handle streaming requests
if request.stream:
return StreamingResponse(
generate_stream(client, request, metadata, private_key),
media_type="text/plain"
)
# Create a completion
response = client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.prompt}
]
)
# Get the response text
text = response.choices[0].message.content
# Update metadata with usage information
if hasattr(response, "usage"):
metadata.update({
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
})
# Embed metadata
encoded_text = UnicodeMetadata.embed_metadata(
text=text,
metadata=metadata,
private_key=private_key
)
# Return the response
return {
"text": encoded_text,
"metadata": metadata
}
async def generate_stream(client, request: GenerateRequest, metadata: Dict[str, Any], private_key: PrivateKeyTypes) -> AsyncGenerator[str, None]:
# Initialize the streaming handler
handler = StreamingHandler(
metadata=metadata,
private_key=private_key
)
# Create a streaming completion
completion = client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.prompt}
],
stream=True
)
# Process each chunk
for chunk in completion:
if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
# Process the chunk
processed_chunk = handler.process_chunk(chunk=content)
# Yield the processed chunk
yield processed_chunk
# Finalize the stream
final_chunk = handler.finalize()
if final_chunk:
yield final_chunk
Anthropic Integration
Combining FastAPI, EncypherAI, and Anthropic:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Dict, Any, Optional, AsyncGenerator
from encypher.core.unicode_metadata import UnicodeMetadata
from encypher.streaming import StreamingHandler
from encypher.core.keys import generate_key_pair # Key management
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes, PublicKeyTypes
import time
import anthropic
app = FastAPI()
# --- Key Management (Load appropriate keys) ---
private_key = get_private_key_for_user("your-user-id") # Example function
key_id = "user-your-user-id-key"
# ----------------------------------------------
# Define request model
class GenerateRequest(BaseModel):
prompt: str
model: str = "claude-3-opus-20240229"
stream: bool = False
additional_metadata: Optional[Dict[str, Any]] = None
@app.post("/generate")
async def generate(request: GenerateRequest):
# Initialize Anthropic client
client = anthropic.Anthropic(api_key="your-api-key")
# Create metadata
metadata = {
"model": request.model,
"timestamp": time.time(),
"key_id": key_id,
"user_id": "your-user-id"
}
# Add additional metadata if provided
if request.additional_metadata:
metadata.update(request.additional_metadata)
# Handle streaming requests
if request.stream:
return StreamingResponse(
generate_stream(client, request, metadata, private_key),
media_type="text/plain"
)
# Create a message
response = client.messages.create(
model=request.model,
max_tokens=1000,
messages=[
{"role": "user", "content": request.prompt}
]
)
# Get the response text
text = response.content[0].text
# Update metadata with usage information
if hasattr(response, "usage"):
metadata.update({
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens
})
# Embed metadata
encoded_text = UnicodeMetadata.embed_metadata(
text=text,
metadata=metadata,
private_key=private_key
)
# Return the response
return {
"text": encoded_text,
"metadata": metadata
}
async def generate_stream(client, request: GenerateRequest, metadata: Dict[str, Any], private_key: PrivateKeyTypes) -> AsyncGenerator[str, None]:
# Initialize the streaming handler
handler = StreamingHandler(
metadata=metadata,
private_key=private_key
)
# Create a streaming message
with client.messages.stream(
model=request.model,
max_tokens=1000,
messages=[
{"role": "user", "content": request.prompt}
]
) as stream:
# Process each chunk
for text_delta in stream.text_deltas:
# Process the chunk
processed_chunk = handler.process_chunk(chunk=text_delta)
# Yield the processed chunk
yield processed_chunk
# Finalize the stream
final_chunk = handler.finalize()
if final_chunk:
yield final_chunk
Authentication and Security
API Key Authentication
Implementing API key authentication:
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security.api_key import APIKeyHeader, APIKey
from starlette.status import HTTP_403_FORBIDDEN
from typing import Dict, Any
app = FastAPI()
# Define API key header
API_KEY = "your-api-key" # In production, use a secure key management system
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
# Dependency for API key validation
async def get_api_key(api_key: str = Security(api_key_header)):
if api_key == API_KEY:
return api_key
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Invalid API Key"
)
# Protected endpoint
@app.post("/encode", dependencies=[Depends(get_api_key)])
async def encode_metadata(request: Dict[str, Any]):
# Your implementation here
pass
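A plain `==` comparison of secrets can leak information through response timing; the standard library's `secrets.compare_digest` compares in constant time. A minimal check (`is_valid_api_key` is an illustrative helper, not part of FastAPI):

```python
import secrets

API_KEY = "your-api-key"  # placeholder, as in the endpoint above

def is_valid_api_key(presented: str) -> bool:
    # Constant-time comparison avoids leaking matching prefixes via timing
    return secrets.compare_digest(presented, API_KEY)

print(is_valid_api_key("your-api-key"))  # True
print(is_valid_api_key("wrong-key"))     # False
```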
CORS Configuration
Configuring CORS for production:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"], # Specify your frontend domain
allow_credentials=True,
allow_methods=["GET", "POST"], # Specify allowed methods
allow_headers=["*"], # Or specify allowed headers
)
Rate Limiting
Implementing rate limiting:
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from typing import Dict, Any, Optional
import time
app = FastAPI()
# Rate limiting configuration
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app,
requests_per_minute: int = 60,
window_size: int = 60 # seconds
):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self.window_size = window_size
self.request_counts: Dict[str, Dict[int, int]] = {}
async def dispatch(self, request: Request, call_next):
# Get client IP
client_ip = request.client.host
# Get current time window
current_window = int(time.time() / self.window_size)
# Initialize or update request count
if client_ip not in self.request_counts:
self.request_counts[client_ip] = {}
# Clean up old windows
self.request_counts[client_ip] = {
window: count for window, count in self.request_counts[client_ip].items()
if window >= current_window - 1
}
# Get current count
current_count = self.request_counts[client_ip].get(current_window, 0)
        # Check if rate limit exceeded; return a 429 response directly, since
        # an HTTPException raised inside middleware bypasses FastAPI's handlers
        if current_count >= self.requests_per_minute:
            from fastapi.responses import JSONResponse  # local import keeps snippet self-contained
            return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
# Update count
self.request_counts[client_ip][current_window] = current_count + 1
# Process the request
return await call_next(request)
# Add rate limiting middleware
app.add_middleware(RateLimitMiddleware, requests_per_minute=60)
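The middleware's counting logic, extracted into plain Python, behaves as a fixed-window counter: requests are allowed up to the limit within one window, rejected afterward, and the count resets in a fresh window. A standalone sketch (`FixedWindowLimiter` is an illustrative name):

```python
from typing import Dict

class FixedWindowLimiter:
    """Fixed-window counter mirroring the middleware's bookkeeping."""

    def __init__(self, limit: int, window_size: int = 60):
        self.limit = limit
        self.window_size = window_size
        self.counts: Dict[str, Dict[int, int]] = {}

    def allow(self, client: str, now: float) -> bool:
        window = int(now / self.window_size)
        per_client = self.counts.get(client, {})
        # Keep only the current and previous windows, as the middleware does
        per_client = {w: c for w, c in per_client.items() if w >= window - 1}
        self.counts[client] = per_client
        if per_client.get(window, 0) >= self.limit:
            return False
        per_client[window] = per_client.get(window, 0) + 1
        return True

limiter = FixedWindowLimiter(limit=3, window_size=60)
print([limiter.allow("1.2.3.4", now=100.0) for _ in range(4)])  # [True, True, True, False]
print(limiter.allow("1.2.3.4", now=400.0))  # True: a fresh window resets the count
```

Note that fixed windows allow short bursts at window boundaries; a sliding-window or token-bucket scheme smooths this out if it matters for your workload.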
Deployment
Docker Deployment
Here's a sample Dockerfile for deploying a FastAPI application with EncypherAI:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8000
# Start the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
And a sample `requirements.txt`:
fastapi>=0.100.0
uvicorn>=0.22.0
encypher-ai>=1.0.0
python-multipart>=0.0.6
python-dotenv>=1.0.0
cryptography>=4.0.0
Environment Variables
Using environment variables for configuration:
from fastapi import FastAPI
from encypher.core.unicode_metadata import UnicodeMetadata
from encypher.core.keys import generate_key_pair
from cryptography.hazmat.primitives import serialization
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Initialize FastAPI app
app = FastAPI()
# Get configuration from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
DB_URL = os.getenv("DATABASE_URL")
# --- Key Loading/Management ---
# Load your main private key (e.g., from a file or env var)
# Example: Load from PEM file
try:
with open("private_key.pem", "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None # Add password if key is encrypted
)
except FileNotFoundError:
print("Warning: private_key.pem not found. Using generated key for demo.")
private_key, _ = generate_key_pair() # Fallback for demo
# Load public keys for resolver (e.g., from DB or config file)
# public_keys_store = load_public_keys_from_db(DB_URL)
# For demo, we used an in-memory dict initialized earlier
# ----------------------------
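The same pattern in miniature, using only the stdlib: read optional settings with a default and fail fast on required ones (`APP_PORT` and `require_env` here are demo names, not variables EncypherAI defines):

```python
import os

os.environ.setdefault("APP_PORT", "8000")  # demo value; normally set by the shell or .env

def get_port(default: int = 8000) -> int:
    # Optional setting: fall back to a sensible default
    return int(os.getenv("APP_PORT", str(default)))

def require_env(name: str) -> str:
    # Required setting: fail at startup rather than mid-request
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

print(get_port())  # 8000
```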
Best Practices
- Secure Key Management: Store your EncypherAI private key securely using environment variables or a secure key management system.
- Input Validation: Use Pydantic models to validate input data and provide clear error messages.
- Error Handling: Implement proper error handling for both FastAPI and EncypherAI operations.
- Rate Limiting: Implement rate limiting to prevent abuse of your API.
- Authentication: Implement API key authentication or OAuth2 for secure access to your API.
- CORS Configuration: Configure CORS properly to allow only trusted domains to access your API.
- Logging: Implement structured logging to track API usage and errors.
- Documentation: Use FastAPI's automatic documentation generation to provide clear API documentation.
Troubleshooting
Common Issues
- CORS Errors: Ensure `allow_origins` in `CORSMiddleware` is correctly configured for your frontend URL in production.
- Key Management: Securely store private keys. Never commit them to version control. Implement a robust `public_key_resolver` that fetches keys from a secure store (like a database or vault) based on `key_id`.
- Verification Failures:
  - Check if the text content was modified after metadata embedding.
  - Ensure the `public_key_resolver` correctly retrieves the public key corresponding to the `key_id` used during signing.
  - Verify that the `private_key` used for signing matches the `public_key` returned by the resolver for the given `key_id`.
- Streaming Issues: Ensure the `finalize()` method of `StreamingHandler` is called after the loop to process any buffered content. Check for errors in the async generator logic.
- Dependencies: Make sure `cryptography` is installed (`uv pip install cryptography`).