Custom Retry Examples

This page demonstrates custom retry strategies and patterns.

Custom Retry Strategy

Business Hours Retry

from requestforge.interfaces import RetryStrategyInterface
from requestforge.models import RequestContext
from requestforge import TimeoutException, ConnectionException
import datetime

class BusinessHoursRetryStrategy(RetryStrategyInterface):
    """Retry transient failures only inside a daily business-hours window.

    The window is evaluated with ``datetime.datetime.now()`` (naive local
    time) and defaults to 9 AM - 5 PM. Only the hour of day is checked, so
    every day of the week is treated alike.

    Args:
        max_retries: Retrying stops once ``context.attempt`` reaches this value.
        base_delay: Delay in seconds that the exponential backoff starts from.
        start_hour: First hour of the window (inclusive, 0-23).
        end_hour: Hour at which the window closes (exclusive, 1-24).

    Raises:
        ValueError: If ``0 <= start_hour < end_hour <= 24`` does not hold.
    """

    def __init__(
        self,
        max_retries=3,
        base_delay=1.0,
        start_hour=9,
        end_hour=17,
    ):
        if not 0 <= start_hour < end_hour <= 24:
            raise ValueError(
                "business hours must satisfy 0 <= start_hour < end_hour <= 24"
            )
        self._max_retries = max_retries
        self._base_delay = base_delay
        self._start_hour = start_hour
        self._end_hour = end_hour

    @property
    def max_retries(self):
        """Attempt count at which retrying stops."""
        return self._max_retries

    def should_retry(self, context, exception):
        """Return True if the failed request should be attempted again.

        A retry is allowed only when the attempt budget is not exhausted, the
        current local hour lies inside the configured window, and the error is
        a ``TimeoutException`` or ``ConnectionException``.
        """
        # Check max retries
        if context.attempt >= self._max_retries:
            return False

        # Only retry during business hours
        now = datetime.datetime.now()
        if not (self._start_hour <= now.hour < self._end_hour):
            return False

        # Only retry timeout and connection errors
        return isinstance(exception, (TimeoutException, ConnectionException))

    def get_delay(self, context):
        """Return the exponential backoff delay: base_delay * 2 ** attempt."""
        return self._base_delay * (2 ** context.attempt)

# Usage
from requestforge import HttpClientConfigBuilder, HttpClient

# Configure the builder one step at a time, then freeze it into a config.
builder = HttpClientConfigBuilder().with_base_url('https://api.example.com')
builder = builder.with_retry_strategy(BusinessHoursRetryStrategy(max_retries=3))
config = builder.build()

# The client applies the business-hours strategy to every request it sends.
client = HttpClient(config)

Conditional Retry Strategy

from requestforge.interfaces import RetryStrategyInterface
from requestforge import HttpStatusException

class ConditionalRetryStrategy(RetryStrategyInterface):
    """Decide whether to retry by inspecting the error response.

    Only ``HttpStatusException`` errors are candidates. Keywords found in the
    response body are checked first; when none match, or the body is empty,
    the decision falls back to "retry any 5xx status code".
    """

    def __init__(self, max_retries=3):
        self._max_retries = max_retries

    @property
    def max_retries(self):
        """Attempt count at which retrying stops."""
        return self._max_retries

    def should_retry(self, context, exception):
        """Return True if the failed request should be attempted again."""
        out_of_attempts = context.attempt >= self._max_retries
        if out_of_attempts or not isinstance(exception, HttpStatusException):
            return False

        if exception.response_body:
            text = exception.response_body.lower()

            # Hints of a temporary problem are checked before permanent ones,
            # so a body containing both kinds of keyword is retried.
            if any(hint in text for hint in ('temporary', 'retry')):
                return True
            if any(hint in text for hint in ('permanent', 'invalid')):
                return False

        # No decisive keyword: retry server-side (5xx) errors only.
        status = exception.status_code
        return 500 <= status < 600

    def get_delay(self, context):
        """Return an exponential delay of 2 ** attempt seconds."""
        attempt = context.attempt
        return 2.0 ** attempt

Adaptive Retry Strategy

from requestforge.interfaces import RetryStrategyInterface
import statistics

class AdaptiveRetryStrategy(RetryStrategyInterface):
    """Scale the retry delay with recently observed response times.

    The strategy keeps the ten most recent ``response_time`` values it has
    seen in ``context.metadata`` and waits half of their mean, capped at 60
    seconds. Until a measurement exists it uses plain exponential backoff.
    """

    def __init__(self, max_retries=5):
        self._max_retries = max_retries
        self._response_times = []

    @property
    def max_retries(self):
        """Attempt count at which retrying stops."""
        return self._max_retries

    def should_retry(self, context, exception):
        """Retry any error while the attempt budget is not exhausted."""
        attempts_used = context.attempt
        return attempts_used < self._max_retries

    def get_delay(self, context):
        """Record the latest response time, if any, and return the delay."""
        # presumably 'response_time' is written by a response hook - verify
        if 'response_time' in context.metadata:
            samples = self._response_times
            samples.append(context.metadata['response_time'])
            # Sliding window: drop everything but the newest ten samples.
            self._response_times = samples[-10:]

        if not self._response_times:
            # Nothing measured yet: exponential backoff from one second.
            return 1.0 * (2 ** context.attempt)

        # Wait half the average response time, never more than a minute.
        half_average = statistics.mean(self._response_times) * 0.5
        return min(half_average, 60.0)

Rate-Limit Aware Retry

from requestforge.interfaces import RetryStrategyInterface
from requestforge import HttpStatusException
import time

class RateLimitRetryStrategy(RetryStrategyInterface):
    """Retry strategy that honours rate-limit hints stored in the context.

    The delay is taken, in order of preference, from
    ``context.metadata['retry_after']`` (seconds or an HTTP-date),
    ``context.metadata['rate_limit_reset']`` (a Unix timestamp compared with
    ``time.time()``), and finally exponential backoff.
    """

    def __init__(self, max_retries=5):
        self._max_retries = max_retries

    @property
    def max_retries(self):
        """Attempt count at which retrying stops."""
        return self._max_retries

    def should_retry(self, context, exception):
        """Return True for HTTP 429, timeout and connection errors.

        Nothing is retried once ``context.attempt`` reaches ``max_retries``.
        """
        if context.attempt >= self._max_retries:
            return False

        # Always retry rate limit errors
        if isinstance(exception, HttpStatusException):
            if exception.status_code == 429:  # Too Many Requests
                return True

        # Retry other transient errors
        from requestforge import TimeoutException, ConnectionException
        return isinstance(exception, (TimeoutException, ConnectionException))

    def get_delay(self, context):
        """Return the number of seconds to wait before the next attempt.

        The result is never negative when it comes from a rate-limit hint.
        """
        # Check for Retry-After header in context
        retry_after = context.metadata.get('retry_after')
        if retry_after:
            delay = self._parse_retry_after(retry_after)
            if delay is not None:
                return delay

        # Check for rate limit reset timestamp
        rate_limit_reset = context.metadata.get('rate_limit_reset')
        if rate_limit_reset:
            delay = rate_limit_reset - time.time()
            return max(delay, 0)

        # Default exponential backoff
        return 2.0 ** context.attempt

    @staticmethod
    def _parse_retry_after(value):
        """Convert a Retry-After value into a non-negative delay in seconds.

        Accepts either a number of seconds or an HTTP-date. Returns ``None``
        when the value is neither, so the caller can fall back to another
        source for the delay.
        """
        import math
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        try:
            seconds = float(value)
        except (TypeError, ValueError):
            seconds = None

        if seconds is not None:
            # "nan"/"inf" parse as floats but are useless as a sleep duration.
            if not math.isfinite(seconds):
                return None
            return max(seconds, 0.0)

        # Not a number: try the HTTP-date form of Retry-After.
        try:
            when = parsedate_to_datetime(str(value))
        except (TypeError, ValueError):
            return None

        if when.tzinfo is None:
            # A date without usable zone information is read as UTC.
            when = when.replace(tzinfo=timezone.utc)

        remaining = (when - datetime.now(timezone.utc)).total_seconds()
        return max(remaining, 0.0)

Custom Retry with Hooks

Capture Rate Limit Headers

from requestforge.interfaces import ResponseHookInterface, ErrorHookInterface

class RateLimitHook(ResponseHookInterface):
    """Copy rate-limit response headers into ``context.metadata``.

    ``Retry-After`` is stored unparsed under ``'retry_after'``;
    ``X-RateLimit-Reset`` is stored as an ``int`` under
    ``'rate_limit_reset'``.
    """

    def after_response(self, response, context):
        """Record the rate-limit headers and return ``response`` unchanged."""
        headers = response.headers

        if 'Retry-After' in headers:
            context.metadata['retry_after'] = headers['Retry-After']

        if 'X-RateLimit-Reset' in headers:
            try:
                reset_at = int(headers['X-RateLimit-Reset'])
            except (TypeError, ValueError):
                # The header is only a hint: a malformed value must not turn
                # an otherwise usable response into an exception.
                pass
            else:
                context.metadata['rate_limit_reset'] = reset_at

        return response

class RateLimitErrorHook(ErrorHookInterface):
    """Log a warning whenever a request fails with HTTP 429."""

    def on_error(self, exception, context):
        """Emit a warning for 429 errors; every other error is ignored.

        The message includes ``context.metadata['retry_after']`` when it is
        present and ``'unknown'`` otherwise.
        """
        # Imported here so the snippet is self-contained: no module-level
        # ``logger`` is defined anywhere on this page.
        import logging

        from requestforge import HttpStatusException

        if not isinstance(exception, HttpStatusException):
            return
        if exception.status_code != 429:
            return

        retry_after = context.metadata.get('retry_after', 'unknown')
        logging.getLogger(__name__).warning(
            "Rate limited. Retry after: %ss", retry_after
        )

# Configure client
# Wire the strategy together with the hooks that feed and observe it.
builder = HttpClientConfigBuilder().with_base_url('https://api.example.com')
builder = builder.with_retry_strategy(RateLimitRetryStrategy())
builder = builder.with_response_hook(RateLimitHook())
builder = builder.with_error_hook(RateLimitErrorHook())
config = builder.build()

Timing-Based Retry

from requestforge.interfaces import RequestHookInterface, ResponseHookInterface
import time

class TimingHooks:
    """Namespace for the paired hooks that time each request."""

    class Request(RequestHookInterface):
        """Stamp the context with the time the request is about to be sent."""

        def before_request(self, request, context):
            started_at = time.time()
            context.metadata['request_start'] = started_at
            return request

    class Response(ResponseHookInterface):
        """Store the elapsed time since ``request_start`` on the context."""

        def after_response(self, response, context):
            started_at = context.metadata.get('request_start')
            if not started_at:
                # No start stamp recorded: nothing to measure.
                return response

            context.metadata['response_time'] = time.time() - started_at
            return response

# Use with adaptive retry
# The timing hooks write 'response_time', which the adaptive strategy reads.
builder = HttpClientConfigBuilder().with_base_url('https://api.example.com')
builder = builder.with_retry_strategy(AdaptiveRetryStrategy())
builder = builder.with_request_hook(TimingHooks.Request())
builder = builder.with_response_hook(TimingHooks.Response())
config = builder.build()

Retry with Fallback

Fallback to Different Endpoint

from requestforge import HttpClient, MaxRetryException

class FallbackClient:
    """Client that falls back to a secondary endpoint.

    Requests go to the primary endpoint first (with up to three retries).
    When that raises ``MaxRetryException`` the same request is sent once to
    the fallback endpoint, which is configured without retries.

    Args:
        primary_url: Base URL of the preferred endpoint.
        fallback_url: Base URL used after the primary has exhausted its retries.
    """

    def __init__(self, primary_url, fallback_url):
        # Primary client with retry
        self.primary = HttpClient(
            HttpClientConfigBuilder()
            .with_base_url(primary_url)
            .with_retry(max_retries=3)
            .build()
        )

        # Fallback client (no retry)
        self.fallback = HttpClient(
            HttpClientConfigBuilder()
            .with_base_url(fallback_url)
            .build()
        )

    def get(self, path, **kwargs):
        """GET ``path`` from the primary endpoint, else from the fallback.

        Keyword arguments are forwarded unchanged to whichever client handles
        the request. Errors raised by the fallback client propagate to the
        caller.
        """
        # Imported here so the snippet is self-contained: no module-level
        # ``logger`` is defined anywhere on this page.
        import logging

        try:
            # Try primary
            return self.primary.get(path, **kwargs)
        except MaxRetryException:
            # Fall back to secondary
            logging.getLogger(__name__).warning(
                "Primary failed, using fallback for %s", path
            )
            return self.fallback.get(path, **kwargs)

# Usage
# First argument is the primary endpoint, second is the backup.
client = FallbackClient(
    'https://api.example.com',
    'https://backup-api.example.com',
)

# Served by the primary unless it runs out of retries.
response = client.get('/users')

Circuit Breaker Implementation

Custom Circuit Breaker

from requestforge.interfaces import ErrorHookInterface
import time

class CircuitBreaker:
    """Circuit breaker with open/half-open/closed states.

    * ``closed``: attempts are allowed; a success resets the failure count.
    * ``open``: attempts are refused until ``timeout`` seconds have passed
      since the last recorded failure, then the breaker becomes half-open.
    * ``half_open``: attempts are allowed; after ``half_open_calls``
      recorded successes the breaker closes again.

    Args:
        failure_threshold: Number of failures at which the circuit opens.
        timeout: Seconds the circuit stays open before allowing a trial call.
        half_open_calls: Successes required in half-open state to close.
    """

    def __init__(self, failure_threshold=5, timeout=60, half_open_calls=3):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.half_open_calls = half_open_calls
        self.failures = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half_open
        self.half_open_successes = 0

    def record_success(self):
        """Register a successful call and update the state accordingly."""
        if self.state == 'half_open':
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_calls:
                # Close circuit
                self.state = 'closed'
                self.failures = 0
                self.half_open_successes = 0
        elif self.state == 'closed':
            # Reset failure count on success
            self.failures = 0

    def record_failure(self):
        """Register a failed call; open the circuit at the threshold."""
        # Imported here so the class is self-contained: no module-level
        # ``logger`` is defined anywhere on this page, and referencing one
        # raised NameError at the very moment the circuit opened.
        import logging

        self.failures += 1
        self.last_failure_time = time.time()

        if self.failures >= self.failure_threshold:
            # Open circuit. The failure count is not reset on the way to
            # half-open, so a failure during a trial call re-opens it here.
            self.state = 'open'
            logging.getLogger(__name__).warning(
                "Circuit opened after %s failures", self.failures
            )

    def can_attempt(self):
        """Return True if a call may be made now.

        Calling this while open and past the timeout moves the breaker to
        half-open as a side effect.
        """
        if self.state == 'closed':
            return True

        if self.state == 'open':
            # Check if timeout passed
            if time.time() - self.last_failure_time >= self.timeout:
                # Move to half-open
                self.state = 'half_open'
                self.half_open_successes = 0
                return True
            return False

        if self.state == 'half_open':
            return True

        return False

class CircuitBreakerHook(ErrorHookInterface):
    """Error hook that reports every failed request to a circuit breaker."""

    def __init__(self, circuit_breaker):
        # Kept under the short public name ``cb``.
        self.cb = circuit_breaker

    def on_error(self, exception, context):
        """Count the error as a failure, whatever its type."""
        breaker = self.cb
        breaker.record_failure()

# Usage
# HttpClientException is assumed to be exported from the package root like the
# other exceptions used on this page - confirm against the library.
from requestforge import HttpClientException

circuit_breaker = CircuitBreaker()

config = (
    HttpClientConfigBuilder()
    .with_base_url('https://api.example.com')
    .with_error_hook(CircuitBreakerHook(circuit_breaker))
    .build()
)

client = HttpClient(config)

# Check circuit before request
if circuit_breaker.can_attempt():
    try:
        response = client.get('/data')
    except HttpClientException:
        pass  # CircuitBreakerHook already recorded the failure
    else:
        # Successes are not seen by the error hook, so report them here.
        circuit_breaker.record_success()
else:
    print("Circuit is open, request not sent")

Retry Metrics

Track Retry Statistics

from requestforge.interfaces import ErrorHookInterface
from collections import defaultdict

class RetryMetricsHook(ErrorHookInterface):
    """Error hook that accumulates simple retry statistics.

    Every error is counted by exception class name. Errors whose context has
    a truthy ``is_retry`` are additionally counted as retries, both in total
    and per ``context.attempt`` value.
    """

    def __init__(self):
        self.retry_counts = defaultdict(int)
        self.error_types = defaultdict(int)
        self.total_retries = 0

    def on_error(self, exception, context):
        """Update the counters for one observed error."""
        if context.is_retry:
            self.total_retries += 1
            attempt = context.attempt
            self.retry_counts[attempt] += 1

        name = type(exception).__name__
        self.error_types[name] += 1

    def get_stats(self):
        """Return a snapshot of the counters as plain dictionaries."""
        return dict(
            total_retries=self.total_retries,
            retry_distribution=dict(self.retry_counts),
            error_types=dict(self.error_types),
        )

# Usage
# HttpClientException is assumed to be exported from the package root like the
# other exceptions used on this page - confirm against the library.
from requestforge import HttpClientException

metrics = RetryMetricsHook()

config = (
    HttpClientConfigBuilder()
    .with_base_url('https://api.example.com')
    .with_retry(max_retries=3)
    .with_error_hook(metrics)
    .build()
)

client = HttpClient(config)

# Make requests
for i in range(100):
    try:
        client.get(f'/data/{i}')
    except HttpClientException:
        # Deliberately ignored: the metrics hook has already counted the
        # error, and the loop should move on to the next request.
        pass

# Get statistics
stats = metrics.get_stats()
print(f"Total retries: {stats['total_retries']}")
print(f"Retry distribution: {stats['retry_distribution']}")
print(f"Error types: {stats['error_types']}")

Exponential Backoff with Jitter

Custom Jitter Implementation

from requestforge.interfaces import RetryStrategyInterface
import random

class CustomJitterRetryStrategy(RetryStrategyInterface):
    """Exponential backoff with full jitter.

    Each delay is drawn uniformly from zero up to the exponential backoff
    value for the current attempt, which itself is capped at ``max_delay``.
    """

    def __init__(self, max_retries=3, base_delay=1.0, max_delay=60.0):
        self._max_retries = max_retries
        self._base_delay = base_delay
        self._max_delay = max_delay

    @property
    def max_retries(self):
        """Attempt count at which retrying stops."""
        return self._max_retries

    def should_retry(self, context, exception):
        """Retry any error while the attempt budget is not exhausted."""
        attempts_used = context.attempt
        return attempts_used < self._max_retries

    def get_delay(self, context):
        """Return a random delay in ``[0, min(base * 2**attempt, max_delay)]``."""
        backoff = self._base_delay * (2 ** context.attempt)
        ceiling = min(backoff, self._max_delay)
        return random.uniform(0, ceiling)

Decorrelated Jitter

from requestforge.interfaces import RetryStrategyInterface
import random

class DecorrelatedJitterRetryStrategy(RetryStrategyInterface):
    """Backoff where each delay is drawn relative to the previous one.

    Every delay is sampled uniformly between ``base_delay`` and three times
    the previously returned delay, then capped at ``max_delay``.

    NOTE(review): the previous delay is stored on the strategy instance, so
    it carries over between calls for as long as the instance is reused -
    confirm that sharing one instance across requests is intended.
    """

    def __init__(self, max_retries=3, base_delay=1.0, max_delay=60.0):
        self._max_retries = max_retries
        self._base_delay = base_delay
        self._max_delay = max_delay
        # Seed for the first draw: upper bound starts at 3 * base_delay.
        self._last_delay = base_delay

    @property
    def max_retries(self):
        """Attempt count at which retrying stops."""
        return self._max_retries

    def should_retry(self, context, exception):
        """Retry any error while the attempt budget is not exhausted."""
        attempts_used = context.attempt
        return attempts_used < self._max_retries

    def get_delay(self, context):
        """Draw the next delay and remember it for the following draw."""
        upper_bound = self._last_delay * 3
        drawn = random.uniform(self._base_delay, upper_bound)
        self._last_delay = min(drawn, self._max_delay)
        return self._last_delay

Testing Retry Logic

Test Retry Behavior

import pytest
import responses
from requestforge import (
    HttpClient,
    HttpClientConfigBuilder,
    ExponentialBackoffRetryStrategy,
    MaxRetryException
)

@responses.activate
def test_retry_on_500():
    """Two 500 responses followed by a 200 must end in a successful call."""
    url = 'https://api.example.com/data'

    # Register two failing responses, then the one that succeeds.
    for _ in range(2):
        responses.add(responses.GET, url, status=500)
    responses.add(responses.GET, url, json={'result': 'success'}, status=200)

    # A tiny base delay keeps the test fast.
    backoff = ExponentialBackoffRetryStrategy(max_retries=3, base_delay=0.01)
    builder = HttpClientConfigBuilder().with_base_url('https://api.example.com')
    client = HttpClient(builder.with_retry_strategy(backoff).build())

    result = client.get('/data')

    assert result.status_code == 200
    # 2 failures + 1 success
    assert len(responses.calls) == 3

@responses.activate
def test_max_retries_exceeded():
    """A permanently failing endpoint must end in ``MaxRetryException``."""
    # All calls fail
    for _ in range(10):
        responses.add(
            responses.GET,
            'https://api.example.com/data',
            status=500
        )

    strategy = ExponentialBackoffRetryStrategy(
        max_retries=3,
        # Same fast delay as test_retry_on_500, so the test does not wait on
        # the strategy's default backoff (default value not shown here).
        base_delay=0.01
    )
    config = (
        HttpClientConfigBuilder()
        .with_base_url('https://api.example.com')
        .with_retry_strategy(strategy)
        .build()
    )

    client = HttpClient(config)

    with pytest.raises(MaxRetryException) as exc_info:
        client.get('/data')

    assert exc_info.value.attempts == 4  # 1 initial + 3 retries

See Also