Advanced Usage

This guide covers advanced features and patterns for power users of HTTP Client.

Custom Session Configuration

Using Shared Session

Share a session across multiple client instances:

import requests
from requestforge import HttpClient, HttpClientConfigBuilder

# Create custom session
session = requests.Session()
# Headers set on the session act as defaults for requests sent through it.
session.headers.update({
    'User-Agent': 'MyCustomAgent/1.0',
    'Accept-Encoding': 'gzip, deflate'
})

# Configure session adapter
from requests.adapters import HTTPAdapter
# A single adapter instance is mounted for both URL schemes below.
# NOTE(review): max_retries=0 presumably leaves retrying to HttpClient's own
# retry settings rather than the transport layer — confirm.
adapter = HTTPAdapter(
    pool_connections=20,
    pool_maxsize=50,
    max_retries=0
)
session.mount('https://', adapter)
session.mount('http://', adapter)

# Share session across clients
config = HttpClientConfigBuilder().with_base_url('https://api.example.com').build()

# Both clients are handed the very same session object.
client1 = HttpClient(config, session=session)
client2 = HttpClient(config, session=session)

# Both clients share the same connection pool
response1 = client1.get('/users')
response2 = client2.get('/posts')

Session Cookies

Persist cookies across requests:

# The session owns a cookie jar that is reused for every request sent through it.
session = requests.Session()

# Set initial cookies
session.cookies.set('session_id', 'abc123', domain='api.example.com')

# `config` is the configuration object built in the shared-session example.
client = HttpClient(config, session=session)

# Cookies automatically sent with requests
response = client.get('/profile')

# Access response cookies
# The jar is read from the session, not from the response object.
print(session.cookies.get_dict())

Custom Request Adapters

Implementing Custom Transport

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class CustomAdapter(HTTPAdapter):
    """Transport adapter that applies a default timeout and tags each request.

    Args:
        timeout: Timeout used for any request whose caller did not supply one.
        *args: Forwarded unchanged to ``HTTPAdapter``.
        **kwargs: Forwarded unchanged to ``HTTPAdapter``.
    """

    def __init__(self, timeout=30, *args, **kwargs):
        self.timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        """Send ``request`` with the default timeout and the custom header applied.

        Returns:
            Whatever ``HTTPAdapter.send`` returns for the request.
        """
        # requests.Session always forwards a ``timeout`` keyword, set to None
        # when the caller gave none, so a key-presence test would never apply
        # the default. Test the value instead. The trade-off is that an
        # explicit ``timeout=None`` also receives the adapter default.
        if kwargs.get('timeout') is None:
            kwargs['timeout'] = self.timeout

        # Add custom header
        request.headers['X-Custom-Transport'] = 'v1.0'

        return super().send(request, **kwargs)

# Use custom adapter
session = requests.Session()
# timeout=60 becomes the adapter's default timeout (stored as adapter.timeout).
adapter = CustomAdapter(timeout=60)
# Mount the same adapter for both schemes so every request goes through it.
session.mount('https://', adapter)
session.mount('http://', adapter)

# `config` is the configuration object built in an earlier example.
client = HttpClient(config, session=session)

SSL/TLS Configuration

Custom SSL Context

import ssl
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context

class SSLAdapter(HTTPAdapter):
    """Adapter whose connection pool uses a custom SSL context (TLS 1.2 or newer)."""

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager with a hardened ``ssl_context``.

        All positional and keyword arguments are forwarded to
        ``HTTPAdapter.init_poolmanager``; only ``ssl_context`` is overridden.
        """
        context = create_urllib3_context()
        context.load_default_certs()
        # Reject TLS 1.0 and 1.1. ``minimum_version`` replaces the deprecated
        # ``OP_NO_TLSv1`` / ``OP_NO_TLSv1_1`` option flags.
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        kwargs['ssl_context'] = context
        return super().init_poolmanager(*args, **kwargs)

session = requests.Session()
# Only the https:// prefix is mounted here; http:// is left untouched.
session.mount('https://', SSLAdapter())

# `config` is the configuration object built in an earlier example.
client = HttpClient(config, session=session)

Client Certificates

Authenticate with client certificates:

session = requests.Session()
# Two-element tuple: (certificate file path, private key file path).
session.cert = ('/path/to/client.crt', '/path/to/client.key')

# `config` is the configuration object built in an earlier example.
client = HttpClient(config, session=session)
response = client.get('/secure-endpoint')

Custom CA Bundle

Use custom certificate authority:

session = requests.Session()
# Path to the CA bundle file used to verify server certificates.
session.verify = '/path/to/custom-ca-bundle.crt'

# `config` is the configuration object built in an earlier example.
client = HttpClient(config, session=session)

Proxy Configuration

HTTP/HTTPS Proxies

session = requests.Session()
# Keys are the scheme of the *target* URL; values are the proxy URL to use.
# NOTE(review): the 'https' entry points at an https:// proxy URL, while the
# authenticated-proxy example uses http:// for both entries — confirm which
# scheme the proxy actually speaks.
session.proxies = {
    'http': 'http://proxy.example.com:8080',
    'https': 'https://proxy.example.com:8080',
}

# `config` is the configuration object built in an earlier example.
client = HttpClient(config, session=session)

SOCKS Proxy

Requires the SOCKS extra to be installed (`pip install requests[socks]`):

session = requests.Session()
# Both target schemes are routed through the same SOCKS5 proxy; the
# credentials are embedded in the proxy URL.
# NOTE(review): per the requests documentation, socks5:// resolves hostnames
# locally while socks5h:// resolves them on the proxy — confirm which is wanted.
session.proxies = {
    'http': 'socks5://user:pass@proxy.example.com:1080',
    'https': 'socks5://user:pass@proxy.example.com:1080'
}

# `config` is the configuration object built in an earlier example.
client = HttpClient(config, session=session)

Proxy with Authentication

session = requests.Session()
# Proxy credentials are embedded in the proxy URL as user:password@host.
# Avoid hard-coding real credentials; load them from the environment or a
# secret store instead.
session.proxies = {
    'http': 'http://user:password@proxy.example.com:8080',
    'https': 'http://user:password@proxy.example.com:8080',
}

# Hand the configured session to the client, as in the other proxy examples.
client = HttpClient(config, session=session)

Request/Response Transformation

Custom Request Transformer

from requestforge.interfaces import RequestHookInterface

class RequestTransformerHook(RequestHookInterface):
    """Rewrite the keys of the outgoing JSON body from snake_case to camelCase."""

    def before_request(self, request, context):
        """Return a request whose JSON body has camelCase keys.

        Requests with a missing or empty JSON body are returned unchanged.
        """
        # Convert snake_case to camelCase for API
        if request.json_data:
            transformed = self._to_camel_case(request.json_data)
            return request.with_json_data(transformed)
        return request

    def _to_camel_case(self, data):
        """Recursively convert dict keys in ``data``; other values pass through."""
        if isinstance(data, dict):
            return {
                self._camel_case_key(k): self._to_camel_case(v)
                for k, v in data.items()
            }
        if isinstance(data, list):
            return [self._to_camel_case(item) for item in data]
        return data

    def _camel_case_key(self, key):
        """Convert one snake_case key to camelCase.

        Non-string keys are returned unchanged, since they have no words to join.
        """
        if not isinstance(key, str):
            return key
        first, *rest = key.split('_')
        # Upper-case only the first character of each later segment.
        # ``str.title()`` would also capitalise letters that follow digits or
        # apostrophes ("line2b" -> "Line2B") and lower-case the rest of the
        # segment ("ID" -> "Id").
        return first + ''.join(part[:1].upper() + part[1:] for part in rest)

Custom Response Parser

from requestforge.interfaces import ResponseHookInterface

class ResponseParserHook(ResponseHookInterface):
    """Parse successful JSON responses and store a snake_case copy in the context."""

    def after_response(self, response, context):
        """Store the converted body under ``context.metadata['parsed_data']``.

        Only successful responses whose ``Content-Type`` starts with
        ``application/json`` are parsed. The response itself is returned
        unchanged.
        """
        if response.is_success and response.headers.get('Content-Type', '').startswith('application/json'):
            # Parse and store in context
            data = response.json_or_none()
            if data:
                # Transform camelCase to snake_case
                transformed = self._to_snake_case(data)
                context.metadata['parsed_data'] = transformed

        return response

    def _to_snake_case(self, data):
        """Recursively convert dict keys in ``data``; other values pass through."""
        if isinstance(data, dict):
            return {
                self._snake_case_key(k): self._to_snake_case(v)
                for k, v in data.items()
            }
        if isinstance(data, list):
            return [self._to_snake_case(item) for item in data]
        return data

    def _snake_case_key(self, key):
        """Convert one camelCase key to snake_case.

        Runs of capitals are treated as one word, so ``userID`` becomes
        ``user_id`` and ``HTTPResponse`` becomes ``http_response``.
        Non-string keys are returned unchanged.
        """
        if not isinstance(key, str):
            return key
        pieces = []
        for index, char in enumerate(key):
            if not char.isupper():
                pieces.append(char)
                continue
            previous = key[index - 1] if index else ''
            following = key[index + 1] if index + 1 < len(key) else ''
            # A word starts at a capital that follows a non-capital, or at the
            # last capital of an acronym that is followed by a lower-case letter.
            starts_word = (
                (previous != '' and previous != '_' and not previous.isupper())
                or (previous.isupper() and following.islower())
            )
            if starts_word:
                pieces.append('_')
            pieces.append(char.lower())
        return ''.join(pieces)

Dynamic Configuration

Environment-Based Configuration

import os
from requestforge import HttpClientConfigBuilder

def create_config_for_environment():
    """Build the client configuration selected by the ENVIRONMENT variable.

    'production' and 'staging' have dedicated settings; any other value,
    including an unset variable, gets the development settings.
    """
    environment = os.getenv('ENVIRONMENT', 'development')
    base = HttpClientConfigBuilder()

    if environment == 'production':
        return (
            base
            .with_base_url('https://api.production.com')
            .with_retry(max_retries=5, base_delay=2.0)
            .with_pool_connection(50)
            .with_pool_maxsize(100)
            .with_verify_ssl(True)
            .build()
        )

    if environment == 'staging':
        return (
            base
            .with_base_url('https://api.staging.com')
            .with_retry(max_retries=3)
            .with_logging(log_headers=True)
            .build()
        )

    # Fallback: local development (plain HTTP, verbose logging, no TLS checks).
    return (
        base
        .with_base_url('http://localhost:8000')
        .with_retry(max_retries=1)
        .with_logging(log_headers=True, log_body=True)
        .with_verify_ssl(False)
        .build()
    )

# Usage
# Build the configuration for the current ENVIRONMENT, then create a client.
# NOTE: HttpClient is not imported in this snippet; it comes from
# `requestforge`, as in the first example of this guide.
config = create_config_for_environment()
client = HttpClient(config)

Feature Flags

class FeatureFlagHook(RequestHookInterface):
    """Request hook that advertises the enabled feature flags in a header."""

    def __init__(self, feature_flags):
        # Mapping of flag name -> truthy/falsy "enabled" value.
        self.flags = feature_flags

    def before_request(self, request, context):
        """Return the request with an ``X-Feature-Flags`` header added.

        The header value is the comma-separated list of enabled flag names,
        in the mapping's iteration order.
        """
        enabled_names = [name for name, is_on in self.flags.items() if is_on]
        header_value = ','.join(enabled_names)
        return request.with_headers({'X-Feature-Flags': header_value})

# Usage
flags = {
    'new_api_v2': True,
    'experimental_feature': False,
    'beta_endpoint': True
}

# Start from a fresh builder: no module-level `builder` exists in this guide,
# so referencing one would raise NameError.
config = (
    HttpClientConfigBuilder()
    .with_base_url('https://api.example.com')
    .with_request_hook(FeatureFlagHook(flags))
    .build()
)

Request Signing

HMAC Signing

import hmac
import hashlib
import time
from requestforge.interfaces import RequestHookInterface

class HMACSigningHook(RequestHookInterface):
    """Sign requests with an HMAC-SHA256 signature carried in headers.

    Args:
        access_key: Public key identifier, sent as ``X-Access-Key``.
        secret_key: Shared secret (str) used as the HMAC key.
    """

    def __init__(self, access_key, secret_key):
        self.access_key = access_key
        self.secret_key = secret_key.encode()

    def before_request(self, request, context):
        """Return the request with access-key, timestamp and signature headers."""
        # Local import keeps this snippet self-contained.
        import json

        timestamp = str(int(time.time()))

        # ``str.join`` only accepts strings, so a dict or list JSON body must
        # be serialised first. A canonical form (sorted keys, no extra
        # whitespace) keeps the signature independent of key order.
        # NOTE(review): the verifying server must canonicalise the body the
        # same way — confirm against the API's signing specification.
        body = request.json_data
        if not body:
            payload = ''
        elif isinstance(body, str):
            payload = body
        else:
            payload = json.dumps(body, sort_keys=True, separators=(',', ':'))

        # Create signature string
        sign_string = '\n'.join([
            request.method.value,
            request.url,
            timestamp,
            payload
        ])

        # Generate signature
        signature = hmac.new(
            self.secret_key,
            sign_string.encode(),
            hashlib.sha256
        ).hexdigest()

        return request.with_headers({
            'X-Access-Key': self.access_key,
            'X-Timestamp': timestamp,
            'X-Signature': signature
        })

AWS Signature V4

from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import boto3

class AWSSignatureHook(RequestHookInterface):
    """Request hook that adds AWS Signature Version 4 headers."""

    def __init__(self, service_name, region_name='us-east-1'):
        # Credentials come from a default boto3 session.
        credentials = boto3.Session().get_credentials()
        self.signer = SigV4Auth(credentials, service_name, region_name)

    def before_request(self, request, context):
        """Sign a botocore copy of the request and return it with those headers."""
        # NOTE(review): ``json_data`` is passed to botocore as-is; confirm it
        # matches the bytes the client actually sends, or the signature will
        # not verify.
        signable = AWSRequest(
            method=request.method.value,
            url=request.url,
            data=request.json_data,
            headers=request.headers or {}
        )

        # add_auth mutates ``signable`` in place, adding the signature headers.
        self.signer.add_auth(signable)

        signed_headers = dict(signable.headers)
        return request.with_headers(signed_headers)

Response Caching

In-Memory Cache

from requestforge.interfaces import ResponseHookInterface
import hashlib
import time

class ResponseCacheHook(ResponseHookInterface):
    """Keep successful GET responses in a process-local dict for ``ttl`` seconds."""

    def __init__(self, ttl=300):
        self.cache = {}
        self.ttl = ttl

    def after_response(self, response, context):
        """Store the response when it is a successful GET; always return it."""
        cacheable = (
            response.request.method == HttpMethod.GET
            and response.is_success
        )
        if cacheable:
            key = self._get_cache_key(response.request)
            self.cache[key] = {
                'response': response,
                'timestamp': time.time()
            }
        return response

    def get_cached(self, request):
        """Return the cached response for ``request``, or None if absent or expired.

        An expired entry is removed as a side effect of the lookup.
        """
        key = self._get_cache_key(request)
        entry = self.cache.get(key)
        if not entry:
            return None

        if time.time() - entry['timestamp'] < self.ttl:
            return entry['response']

        del self.cache[key]
        return None

    def _get_cache_key(self, request):
        """Derive the cache key: MD5 hex digest of ``"<url>:<params>"``."""
        raw = "{}:{}".format(request.url, request.params)
        return hashlib.md5(raw.encode()).hexdigest()

Redis Cache

import redis
import pickle
from requestforge.interfaces import ResponseHookInterface

class RedisCacheHook(ResponseHookInterface):
    """Cache successful GET responses in Redis for ``ttl`` seconds.

    ``HttpMethod`` and ``HttpResponse`` are not imported by this snippet;
    they must be in scope in the module that defines this class.

    Args:
        redis_client: Client object exposing ``setex`` and ``get``.
        ttl: Expiry, in seconds, passed to ``setex`` for every entry.
    """

    def __init__(self, redis_client, ttl=300):
        self.redis = redis_client
        self.ttl = ttl

    def after_response(self, response, context):
        """Serialize and store the response when it is a successful GET."""
        if (response.request.method == HttpMethod.GET and
            response.is_success):

            cache_key = self._get_cache_key(response.request)

            # Serialize response
            cached_data = {
                'status_code': response.status_code,
                'headers': dict(response.headers),
                'content': response.content,
                'elapsed_ms': response.elapsed_ms
            }

            self.redis.setex(
                cache_key,
                self.ttl,
                pickle.dumps(cached_data)
            )

        return response

    def get_cached(self, request):
        """Return a response rebuilt from Redis for ``request``, or None."""
        cache_key = self._get_cache_key(request)
        cached = self.redis.get(cache_key)

        if cached:
            # SECURITY: pickle.loads can execute arbitrary code. Use this only
            # when the Redis instance is trusted and not writable by others;
            # otherwise switch to a data-only format such as JSON.
            data = pickle.loads(cached)
            # Reconstruct HttpResponse
            return HttpResponse(**data, request=request)

        return None

    def _get_cache_key(self, request):
        """Derive the cache key: MD5 hex digest of ``"<url>:<params>"``.

        The original snippet called this method without defining it, which
        raised AttributeError. It mirrors the in-memory cache example.
        """
        # Local import keeps this snippet self-contained.
        import hashlib

        content = f"{request.url}:{request.params}"
        return hashlib.md5(content.encode()).hexdigest()

Request Batching

Batch Multiple Requests

from requestforge import HttpRequest, HttpMethod

class BatchRequestManager:
    """Collect requests and submit them together through one batch endpoint."""

    def __init__(self, client, batch_endpoint='/batch'):
        self.client = client
        self.batch_endpoint = batch_endpoint
        self.pending = []

    def add(self, request):
        """Queue ``request`` for the next ``execute()`` call."""
        self.pending.append(request)

    def execute(self):
        """POST every queued request as one batch and return its 'responses'.

        Returns an empty list, without calling the server, when nothing is
        queued. The queue is emptied only after the POST call has returned.
        """
        if not self.pending:
            return []

        # One plain-dict entry per queued request.
        entries = []
        for queued in self.pending:
            entries.append({
                'method': queued.method.value,
                'url': queued.url,
                'body': queued.json_data
            })

        reply = self.client.post(
            self.batch_endpoint,
            json_data={'requests': entries}
        )

        self.pending = []
        return reply.json()['responses']

# Usage
# `client` is an HttpClient created in an earlier example.
batch = BatchRequestManager(client)
# Requests are only queued here; nothing is sent until execute() is called.
batch.add(HttpRequest(method=HttpMethod.GET, url='/users/1'))
batch.add(HttpRequest(method=HttpMethod.GET, url='/users/2'))
batch.add(HttpRequest(method=HttpMethod.GET, url='/users/3'))

# One POST to the batch endpoint; returns the 'responses' entry of its JSON body.
responses = batch.execute()

GraphQL Support

GraphQL Client Wrapper

class GraphQLError(Exception):
    """Raised when a GraphQL response carries a non-empty ``errors`` entry.

    Subclasses ``Exception``, so callers that catch ``Exception`` keep working.
    The raw error entries are available as ``.errors``.
    """

    def __init__(self, errors):
        super().__init__(f"GraphQL errors: {errors}")
        self.errors = errors


class GraphQLClient:
    """Thin GraphQL wrapper that POSTs operations through an HTTP client.

    Args:
        client: Object exposing ``post(endpoint, json_data=...)``.
        endpoint: Path of the GraphQL endpoint.
    """

    def __init__(self, client, endpoint='/graphql'):
        self.client = client
        self.endpoint = endpoint

    def query(self, query, variables=None, operation_name=None):
        """Execute a GraphQL operation and return its ``data`` payload.

        Args:
            query: GraphQL document text.
            variables: Optional variables mapping; omitted when falsy.
            operation_name: Optional operation name; omitted when falsy.

        Raises:
            HttpStatusException: The HTTP response was not successful.
            GraphQLError: The response body contains GraphQL errors.
        """
        payload = {'query': query}

        if variables:
            payload['variables'] = variables

        if operation_name:
            payload['operationName'] = operation_name

        response = self.client.post(self.endpoint, json_data=payload)

        if not response.is_success:
            raise HttpStatusException(
                message=f"GraphQL request failed: {response.status_code}",
                status_code=response.status_code,
                response_body=response.text
            )

        data = response.json()

        # Test the value, not key presence: some servers send "errors": null
        # on success, which must not be reported as a failure.
        errors = data.get('errors')
        if errors:
            raise GraphQLError(errors)

        return data['data']

    def mutate(self, mutation, variables=None, operation_name=None):
        """Execute a GraphQL mutation; accepts the same arguments as ``query``."""
        return self.query(mutation, variables, operation_name)

# Usage
config = HttpClientConfigBuilder().with_base_url('https://api.example.com').build()
# Named `http_client` rather than `requestforge`, so the variable does not
# shadow the `requestforge` package name.
http_client = HttpClient(config)
gql_client = GraphQLClient(http_client)

# Query
query = """
    query GetUser($id: ID!) {
        user(id: $id) {
            id
            name
            email
        }
    }
"""

result = gql_client.query(query, variables={'id': '123'})
print(result['user']['name'])

# Mutation
mutation = """
    mutation CreateUser($input: CreateUserInput!) {
        createUser(input: $input) {
            id
            name
        }
    }
"""

result = gql_client.mutate(mutation, variables={
    'input': {
        'name': 'John Doe',
        'email': 'john@example.com'
    }
})

Streaming Responses

Download Large Files

def download_large_file(client, url, output_path, chunk_size=8192):
    """Stream ``url`` to ``output_path`` in chunks, printing progress.

    Args:
        client: HTTP client exposing ``session``, ``_build_url`` and
            ``_config.default_headers``.
        url: Path or URL handed to ``client._build_url``.
        output_path: Destination file path; opened in binary write mode.
        chunk_size: Number of bytes requested per chunk.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status.
    """
    # Use raw requests for streaming. The ``with`` block releases the
    # connection even when the download is interrupted; a streamed response
    # otherwise keeps it checked out of the pool.
    with client.session.get(
        client._build_url(url),
        stream=True,
        headers=client._config.default_headers
    ) as response:
        # Fail before opening the output file, so an error page is never
        # saved as if it were the requested file.
        response.raise_for_status()

        # A missing Content-Length header leaves total_size at 0 and
        # disables the percentage display.
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0

        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if not chunk:
                    continue
                f.write(chunk)
                downloaded += len(chunk)

                # Progress
                if total_size:
                    percent = (downloaded / total_size) * 100
                    print(f"Downloaded: {percent:.1f}%", end='\r')

    print(f"\nDownload complete: {output_path}")

Stream JSON Lines

def stream_json_lines(client, url):
    """Yield one parsed object per non-empty line of a JSON Lines response.

    Args:
        client: HTTP client exposing ``session`` and ``_build_url``.
        url: Path or URL handed to ``client._build_url``.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status.
    """
    import json

    # The ``with`` block releases the streamed connection when the generator
    # is exhausted, closed early, or fails.
    with client.session.get(
        client._build_url(url),
        stream=True
    ) as response:
        # Do not try to parse an error body as JSON Lines.
        response.raise_for_status()

        for line in response.iter_lines():
            # Blank lines are skipped.
            if line:
                yield json.loads(line.decode('utf-8'))

# Usage
# Items are yielded lazily, one per JSON line.
# `process_item` is a placeholder for your own handler; it is not defined in
# this guide.
for item in stream_json_lines(client, '/api/stream'):
    process_item(item)

Performance Monitoring

Request Timing Hook

from requestforge.interfaces import RequestHookInterface, ResponseHookInterface
import time

class TimingHooks:
    """Paired request/response hooks that print per-request timing."""

    class Request(RequestHookInterface):
        """Record when the request started."""

        def before_request(self, request, context):
            context.metadata['timing'] = {
                # Wall-clock start, kept for anything that reads it.
                'start': time.time(),
                # Monotonic start, used for the duration: the wall clock can
                # jump (NTP, DST) and give negative or inflated durations.
                'perf_start': time.perf_counter(),
                'url': request.url
            }
            return request

    class Response(ResponseHookInterface):
        """Print the timing breakdown for a request timed by ``Request``."""

        def after_response(self, response, context):
            timing = context.metadata.get('timing', {})
            if timing:
                if 'perf_start' in timing:
                    total_time = time.perf_counter() - timing['perf_start']
                else:
                    # Entry written by a hook that stored only the wall clock.
                    total_time = time.time() - timing['start']
                # NOTE(review): ``elapsed_ms`` is labelled "Server" here, and
                # the remainder "Network" — confirm what ``elapsed_ms``
                # actually measures before relying on the split.
                server_time = response.elapsed_ms / 1000
                network_time = total_time - server_time

                print(f"URL: {timing['url']}")
                print(f"  Total: {total_time:.3f}s")
                print(f"  Server: {server_time:.3f}s")
                print(f"  Network: {network_time:.3f}s")

            return response

Memory Usage Tracking

import sys
from requestforge.interfaces import ResponseHookInterface

class MemoryTrackingHook(ResponseHookInterface):
    """Record each response's body size and warn about bodies over 10 MB."""

    def after_response(self, response, context):
        """Store the size under ``context.metadata['response_size']``.

        The size is the length of ``response.content`` in bytes. The response
        is returned unchanged.
        """
        # Local import: the original snippet used an undefined ``logger``,
        # which raised NameError on the first large response.
        import logging

        size_bytes = len(response.content)
        size_mb = size_bytes / (1024 * 1024)

        context.metadata['response_size'] = size_bytes

        if size_mb > 10:
            # NOTE(review): other examples read the URL via
            # ``response.request.url`` — confirm ``response.url`` exists.
            logging.getLogger(__name__).warning(
                "Large response: %.2fMB from %s", size_mb, response.url
            )

        return response

Distributed Tracing

OpenTelemetry Integration

from opentelemetry import trace
from requestforge.interfaces import RequestHookInterface, ResponseHookInterface

# Module-level tracer, named after this module and used by the hooks below.
tracer = trace.get_tracer(__name__)

class OpenTelemetryHooks:
    """Paired hooks that wrap each HTTP request in an OpenTelemetry span."""

    class Request(RequestHookInterface):
        """Start a span and propagate its context in the request headers."""

        def before_request(self, request, context):
            # Start span
            span = tracer.start_span(
                f"HTTP {request.method.value} {request.url}"
            )

            span.set_attribute("http.method", request.method.value)
            span.set_attribute("http.url", request.url)

            # Stored so the Response hook can finish the span.
            context.metadata['otel_span'] = span

            # Inject trace context
            from opentelemetry.propagate import inject
            headers = dict(request.headers or {})
            # ``start_span`` does not make the span current, and ``inject``
            # defaults to the current context. Pass the span's context
            # explicitly so the downstream service is parented to this span.
            inject(headers, context=trace.set_span_in_context(span))

            return request.with_headers(headers)

    class Response(ResponseHookInterface):
        """Annotate and end the span started by ``Request``."""

        def after_response(self, response, context):
            span = context.metadata.get('otel_span')
            if span:
                span.set_attribute("http.status_code", response.status_code)
                span.set_attribute("http.response_size", len(response.content))
                span.end()

            return response

Jaeger Integration

from jaeger_client import Config
from requestforge.interfaces import RequestHookInterface

def init_jaeger_tracer(service_name):
    """Create and initialize a Jaeger tracer for ``service_name``.

    Uses a constant sampler with param 1 and enables logging.
    """
    settings = {
        'sampler': {'type': 'const', 'param': 1},
        'logging': True,
    }
    jaeger_config = Config(config=settings, service_name=service_name)
    return jaeger_config.initialize_tracer()

class JaegerTracingHook(RequestHookInterface):
    """Request hook that starts a Jaeger span for each outgoing request."""

    def __init__(self, tracer):
        self.tracer = tracer

    def before_request(self, request, context):
        """Start a tagged span, store it in the context, return the request."""
        verb = request.method.value
        target = request.url

        span = self.tracer.start_span(f'{verb} {target}')
        span.set_tag('http.method', verb)
        span.set_tag('http.url', target)

        # NOTE(review): nothing in this snippet finishes the span; a response
        # hook presumably has to read 'jaeger_span' and finish it — confirm.
        context.metadata['jaeger_span'] = span

        return request

Next Steps