Advanced Usage
This guide covers advanced features and patterns for power users of HTTP Client.
Custom Session Configuration
Custom Request Adapters
Implementing Custom Transport
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class CustomAdapter(HTTPAdapter):
"""Custom adapter with additional features."""
def __init__(self, timeout=30, *args, **kwargs):
self.timeout = timeout
super().__init__(*args, **kwargs)
def send(self, request, **kwargs):
# Set default timeout if not specified
if 'timeout' not in kwargs:
kwargs['timeout'] = self.timeout
# Add custom header
request.headers['X-Custom-Transport'] = 'v1.0'
return super().send(request, **kwargs)
# Use custom adapter
session = requests.Session()
adapter = CustomAdapter(timeout=60)
session.mount('https://', adapter)
session.mount('http://', adapter)
client = HttpClient(config, session=session)
SSL/TLS Configuration
Custom SSL Context
import ssl
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
class SSLAdapter(HTTPAdapter):
"""Adapter with custom SSL configuration."""
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context()
context.load_default_certs()
context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
kwargs['ssl_context'] = context
return super().init_poolmanager(*args, **kwargs)
session = requests.Session()
session.mount('https://', SSLAdapter())
client = HttpClient(config, session=session)
Client Certificates
Authenticate with client certificates:
session = requests.Session()
session.cert = ('/path/to/client.crt', '/path/to/client.key')
client = HttpClient(config, session=session)
response = client.get('/secure-endpoint')
Custom CA Bundle
Use custom certificate authority:
session = requests.Session()
session.verify = '/path/to/custom-ca-bundle.crt'
client = HttpClient(config, session=session)
Proxy Configuration
HTTP/HTTPS Proxies
session = requests.Session()
session.proxies = {
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080',
}
client = HttpClient(config, session=session)
SOCKS Proxy
Requires requests[socks]:
session = requests.Session()
session.proxies = {
'http': 'socks5://user:pass@proxy.example.com:1080',
'https': 'socks5://user:pass@proxy.example.com:1080'
}
client = HttpClient(config, session=session)
Proxy with Authentication
session = requests.Session()
session.proxies = {
'http': 'http://user:password@proxy.example.com:8080',
'https': 'http://user:password@proxy.example.com:8080',
}
Request/Response Transformation
Custom Request Transformer
from requestforge.interfaces import RequestHookInterface
class RequestTransformerHook(RequestHookInterface):
"""Transform request data before sending."""
def before_request(self, request, context):
# Convert snake_case to camelCase for API
if request.json_data:
transformed = self._to_camel_case(request.json_data)
return request.with_json_data(transformed)
return request
def _to_camel_case(self, data):
if isinstance(data, dict):
return {
self._camel_case_key(k): self._to_camel_case(v)
for k, v in data.items()
}
elif isinstance(data, list):
return [self._to_camel_case(item) for item in data]
return data
def _camel_case_key(self, key):
components = key.split('_')
return components[0] + ''.join(x.title() for x in components[1:])
Custom Response Parser
from requestforge.interfaces import ResponseHookInterface
class ResponseParserHook(ResponseHookInterface):
"""Parse and transform response data."""
def after_response(self, response, context):
if response.is_success and response.headers.get('Content-Type', '').startswith('application/json'):
# Parse and store in context
data = response.json_or_none()
if data:
# Transform camelCase to snake_case
transformed = self._to_snake_case(data)
context.metadata['parsed_data'] = transformed
return response
def _to_snake_case(self, data):
# Implementation similar to above
Dynamic Configuration
Environment-Based Configuration
import os
from requestforge import HttpClientConfigBuilder
def create_config_for_environment():
"""Create configuration based on environment."""
env = os.getenv('ENVIRONMENT', 'development')
builder = HttpClientConfigBuilder()
if env == 'production':
builder = (
builder
.with_base_url('https://api.production.com')
.with_retry(max_retries=5, base_delay=2.0)
.with_pool_connection(50)
.with_pool_maxsize(100)
.with_verify_ssl(True)
)
elif env == 'staging':
builder = (
builder
.with_base_url('https://api.staging.com')
.with_retry(max_retries=3)
.with_logging(log_headers=True)
)
else: # development
builder = (
builder
.with_base_url('http://localhost:8000')
.with_retry(max_retries=1)
.with_logging(log_headers=True, log_body=True)
.with_verify_ssl(False)
)
return builder.build()
# Usage
config = create_config_for_environment()
client = HttpClient(config)
Feature Flags
class FeatureFlagHook(RequestHookInterface):
"""Add feature flags to requests."""
def __init__(self, feature_flags):
self.flags = feature_flags
def before_request(self, request, context):
# Add active feature flags as header
active_flags = ','.join([
flag for flag, enabled in self.flags.items()
if enabled
])
return request.with_headers({
'X-Feature-Flags': active_flags
})
# Usage
flags = {
'new_api_v2': True,
'experimental_feature': False,
'beta_endpoint': True
}
config = (
builder
.with_request_hook(FeatureFlagHook(flags))
.build()
)
Request Signing
HMAC Signing
import hmac
import hashlib
import time
from requestforge.interfaces import RequestHookInterface
class HMACSigningHook(RequestHookInterface):
"""Sign requests with HMAC."""
def __init__(self, access_key, secret_key):
self.access_key = access_key
self.secret_key = secret_key.encode()
def before_request(self, request, context):
timestamp = str(int(time.time()))
# Create signature string
sign_string = '\n'.join([
request.method.value,
request.url,
timestamp,
request.json_data or ''
])
# Generate signature
signature = hmac.new(
self.secret_key,
sign_string.encode(),
hashlib.sha256
).hexdigest()
return request.with_headers({
'X-Access-Key': self.access_key,
'X-Timestamp': timestamp,
'X-Signature': signature
})
AWS Signature V4
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import boto3
class AWSSignatureHook(RequestHookInterface):
"""Sign requests with AWS Signature V4."""
def __init__(self, service_name, region_name='us-east-1'):
session = boto3.Session()
credentials = session.get_credentials()
self.signer = SigV4Auth(credentials, service_name, region_name)
def before_request(self, request, context):
# Convert to AWS request
aws_request = AWSRequest(
method=request.method.value,
url=request.url,
data=request.json_data,
headers=request.headers or {}
)
# Sign request
self.signer.add_auth(aws_request)
# Return with signed headers
return request.with_headers(dict(aws_request.headers))
Response Caching
In-Memory Cache
from requestforge.interfaces import ResponseHookInterface
import hashlib
import time
class ResponseCacheHook(ResponseHookInterface):
"""Cache successful GET responses in memory."""
def __init__(self, ttl=300):
self.cache = {}
self.ttl = ttl
def after_response(self, response, context):
if (response.request.method == HttpMethod.GET and
response.is_success):
cache_key = self._get_cache_key(response.request)
self.cache[cache_key] = {
'response': response,
'timestamp': time.time()
}
return response
def get_cached(self, request):
"""Check if cached response exists."""
cache_key = self._get_cache_key(request)
cached = self.cache.get(cache_key)
if cached:
age = time.time() - cached['timestamp']
if age < self.ttl:
return cached['response']
else:
del self.cache[cache_key]
return None
def _get_cache_key(self, request):
"""Generate cache key from request."""
content = f"{request.url}:{request.params}"
return hashlib.md5(content.encode()).hexdigest()
Redis Cache
import redis
import pickle
from requestforge.interfaces import ResponseHookInterface
class RedisCacheHook(ResponseHookInterface):
"""Cache responses in Redis."""
def __init__(self, redis_client, ttl=300):
self.redis = redis_client
self.ttl = ttl
def after_response(self, response, context):
if (response.request.method == HttpMethod.GET and
response.is_success):
cache_key = self._get_cache_key(response.request)
# Serialize response
cached_data = {
'status_code': response.status_code,
'headers': dict(response.headers),
'content': response.content,
'elapsed_ms': response.elapsed_ms
}
self.redis.setex(
cache_key,
self.ttl,
pickle.dumps(cached_data)
)
return response
def get_cached(self, request):
"""Retrieve cached response."""
cache_key = self._get_cache_key(request)
cached = self.redis.get(cache_key)
if cached:
data = pickle.loads(cached)
# Reconstruct HttpResponse
return HttpResponse(**data, request=request)
return None
Request Batching
Batch Multiple Requests
from requestforge import HttpRequest, HttpMethod
class BatchRequestManager:
"""Batch multiple requests into single API call."""
def __init__(self, client, batch_endpoint='/batch'):
self.client = client
self.batch_endpoint = batch_endpoint
self.pending = []
def add(self, request):
"""Add request to batch."""
self.pending.append(request)
def execute(self):
"""Execute all pending requests as batch."""
if not self.pending:
return []
# Convert to batch format
batch_request = {
'requests': [
{
'method': req.method.value,
'url': req.url,
'body': req.json_data
}
for req in self.pending
]
}
# Send batch request
response = self.client.post(
self.batch_endpoint,
json_data=batch_request
)
self.pending = []
return response.json()['responses']
# Usage
batch = BatchRequestManager(client)
batch.add(HttpRequest(method=HttpMethod.GET, url='/users/1'))
batch.add(HttpRequest(method=HttpMethod.GET, url='/users/2'))
batch.add(HttpRequest(method=HttpMethod.GET, url='/users/3'))
responses = batch.execute()
GraphQL Support
GraphQL Client Wrapper
class GraphQLClient:
"""GraphQL client wrapper."""
def __init__(self, client, endpoint='/graphql'):
self.client = client
self.endpoint = endpoint
def query(self, query, variables=None, operation_name=None):
"""Execute GraphQL query."""
payload = {'query': query}
if variables:
payload['variables'] = variables
if operation_name:
payload['operationName'] = operation_name
response = self.client.post(self.endpoint, json_data=payload)
if not response.is_success:
raise HttpStatusException(
message=f"GraphQL request failed: {response.status_code}",
status_code=response.status_code,
response_body=response.text
)
data = response.json()
if 'errors' in data:
raise Exception(f"GraphQL errors: {data['errors']}")
return data['data']
def mutate(self, mutation, variables=None):
"""Execute GraphQL mutation."""
return self.query(mutation, variables)
# Usage
config = HttpClientConfigBuilder().with_base_url('https://api.example.com').build()
requestforge = HttpClient(config)
gql_client = GraphQLClient(requestforge)
# Query
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
name
email
}
}
"""
result = gql_client.query(query, variables={'id': '123'})
print(result['user']['name'])
# Mutation
mutation = """
mutation CreateUser($input: CreateUserInput!) {
createUser(input: $input) {
id
name
}
}
"""
result = gql_client.mutate(mutation, variables={
'input': {
'name': 'John Doe',
'email': 'john@example.com'
}
})
Streaming Responses
Download Large Files
def download_large_file(client, url, output_path, chunk_size=8192):
"""Download large file with progress tracking."""
import requests
# Use raw requests for streaming
response = client.session.get(
client._build_url(url),
stream=True,
headers=client._config.default_headers
)
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Progress
if total_size:
percent = (downloaded / total_size) * 100
print(f"Downloaded: {percent:.1f}%", end='\r')
print(f"\nDownload complete: {output_path}")
Stream JSON Lines
def stream_json_lines(client, url):
"""Stream and parse JSONL (JSON Lines) responses."""
import json
response = client.session.get(
client._build_url(url),
stream=True
)
for line in response.iter_lines():
if line:
yield json.loads(line.decode('utf-8'))
# Usage
for item in stream_json_lines(client, '/api/stream'):
process_item(item)
Performance Monitoring
Request Timing Hook
from requestforge.interfaces import RequestHookInterface, ResponseHookInterface
import time
class TimingHooks:
class Request(RequestHookInterface):
def before_request(self, request, context):
context.metadata['timing'] = {
'start': time.time(),
'url': request.url
}
return request
class Response(ResponseHookInterface):
def after_response(self, response, context):
timing = context.metadata.get('timing', {})
if timing:
total_time = time.time() - timing['start']
server_time = response.elapsed_ms / 1000
network_time = total_time - server_time
print(f"URL: {timing['url']}")
print(f" Total: {total_time:.3f}s")
print(f" Server: {server_time:.3f}s")
print(f" Network: {network_time:.3f}s")
return response
Memory Usage Tracking
import sys
from requestforge.interfaces import ResponseHookInterface
class MemoryTrackingHook(ResponseHookInterface):
def after_response(self, response, context):
size_bytes = len(response.content)
size_mb = size_bytes / (1024 * 1024)
context.metadata['response_size'] = size_bytes
if size_mb > 10:
logger.warning(
f"Large response: {size_mb:.2f}MB from {response.url}"
)
return response
Distributed Tracing
OpenTelemetry Integration
from opentelemetry import trace
from requestforge.interfaces import RequestHookInterface, ResponseHookInterface
tracer = trace.get_tracer(__name__)
class OpenTelemetryHooks:
class Request(RequestHookInterface):
def before_request(self, request, context):
# Start span
span = tracer.start_span(
f"HTTP {request.method.value} {request.url}"
)
span.set_attribute("http.method", request.method.value)
span.set_attribute("http.url", request.url)
context.metadata['otel_span'] = span
# Inject trace context
from opentelemetry.propagate import inject
headers = dict(request.headers or {})
inject(headers)
return request.with_headers(headers)
class Response(ResponseHookInterface):
def after_response(self, response, context):
span = context.metadata.get('otel_span')
if span:
span.set_attribute("http.status_code", response.status_code)
span.set_attribute("http.response_size", len(response.content))
span.end()
return response
Jaeger Integration
from jaeger_client import Config
from requestforge.interfaces import RequestHookInterface
def init_jaeger_tracer(service_name):
config = Config(
config={
'sampler': {'type': 'const', 'param': 1},
'logging': True,
},
service_name=service_name,
)
return config.initialize_tracer()
class JaegerTracingHook(RequestHookInterface):
def __init__(self, tracer):
self.tracer = tracer
def before_request(self, request, context):
span = self.tracer.start_span(f'{request.method.value} {request.url}')
span.set_tag('http.method', request.method.value)
span.set_tag('http.url', request.url)
context.metadata['jaeger_span'] = span
return request
Next Steps
Explore Django Integration Examples for framework integration
Learn about Design Principles for design rationale
Check Development Guide for contribution guidelines