Concurrent Requests

HTTP Client provides built-in support for executing multiple requests concurrently using thread pools.

Overview

The request_many() method allows you to execute multiple HTTP requests in parallel, improving throughput for batch operations.

Benefits:

  • Execute multiple requests simultaneously

  • Configurable number of worker threads

  • Each result is tagged with its original request index (results arrive in completion order)

  • Optional fail-fast mode

  • Thread-safe execution

Basic Usage

Simple Concurrent Requests

from requestforge import HttpClient, HttpRequest, HttpMethod, HttpResponse, create_client

client = create_client('https://api.example.com')

# Create ten GET requests: /users/1 .. /users/10
requests = [
    HttpRequest(method=HttpMethod.GET, url=f'/users/{i}')
    for i in range(1, 11)
]

# Execute concurrently with 5 worker threads
results = client.request_many(requests, max_workers=5)

# Each item is an (index, result) pair; result is either an HttpResponse
# or the exception raised for that request.
for index, result in results:
    if not isinstance(result, HttpResponse):
        print(f"Request {index}: Failed - {result}")
    elif not result.is_success:
        # Non-2xx responses are returned, not raised; their body is not a user
        print(f"Request {index}: HTTP {result.status_code}")
    else:
        print(f"Request {index}: Success - {result.status_code}")
        user = result.json()
        print(f"  User: {user['name']}")

Fail-Fast Mode

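Stop on the first error. Results are produced lazily, so the exception is raised while you consume them — keep the consumption inside the try block: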

from requestforge import HttpClientException

try:
    # list() drains the lazy result stream inside the try block, so the
    # loop below only runs once every request has succeeded.
    results = list(client.request_many(
        requests,
        max_workers=10,
        fail_fast=True  # Raise on the first error
    ))

    # All requests succeeded
    for index, response in results:
        process_response(response)

except HttpClientException as e:
    print(f"Request failed: {e}")
    # Nothing was processed: the error surfaced before the loop started

Configuration

Worker Threads

Control the number of concurrent requests:

# Conservative: 3 concurrent requests
results = client.request_many(requests, max_workers=3)

# Moderate: 10 concurrent requests (the default, when max_workers is omitted, is 5)
results = client.request_many(requests, max_workers=10)

# Aggressive: 20 concurrent requests; check the API's rate limits first
results = client.request_many(requests, max_workers=20)

Choosing max_workers:

  • Too few: Slower execution, underutilized resources

  • Too many: May overwhelm server, hit rate limits

  • Rule of thumb: Start with 5-10, adjust based on API rate limits

Request Order

Results are yielded in completion order; each one carries the index of its original request:

# Endpoints deliberately differ in latency: slow (~5s), fast (~1s), medium (~3s)
endpoints = ['/slow-endpoint', '/fast-endpoint', '/medium-endpoint']
requests = [HttpRequest(method=HttpMethod.GET, url=path) for path in endpoints]

for index, result in client.request_many(requests):
    # index is the position in `requests`, not the order of completion
    print(f"Completed request {index}")

# Output might be:
# Completed request 1  (fast endpoint finished first)
# Completed request 2  (medium endpoint finished second)
# Completed request 0  (slow endpoint finished last)

Sorting Results

Collect and sort by original index:

# Drain the result stream and order the (index, result) pairs by request index
results = sorted(
    client.request_many(requests, max_workers=10),
    key=lambda pair: pair[0],
)

# Walk the results in original request order
for index, response in results:
    print(f"Processing request {index}")

Common Patterns

Batch API Calls

Fetch multiple resources:

from requestforge import HttpRequest, HttpMethod, HttpClientException, HttpResponse

def fetch_users_batch(user_ids, max_workers=10):
    """Fetch multiple users concurrently.

    Args:
        user_ids: Sequence of user IDs; one GET /users/{id} is sent per ID.
        max_workers: Number of worker threads passed to request_many().

    Returns:
        (users, errors): ``users`` holds the decoded JSON of every successful
        response, in the same order as ``user_ids``; ``errors`` holds one
        ``{'user_id', 'error'}`` dict per request that raised or returned a
        non-2xx status.
    """
    requests = [
        HttpRequest(method=HttpMethod.GET, url=f'/users/{user_id}')
        for user_id in user_ids
    ]

    users_by_index = {}
    errors = []

    for index, result in client.request_many(requests, max_workers=max_workers):
        if isinstance(result, HttpResponse) and result.is_success:
            users_by_index[index] = result.json()
        elif isinstance(result, HttpResponse):
            # Non-2xx responses come back as HttpResponse, not as exceptions
            errors.append({
                'user_id': user_ids[index],
                'error': f"HTTP {result.status_code}"
            })
        else:
            errors.append({
                'user_id': user_ids[index],
                'error': str(result)
            })

    # request_many yields in completion order; restore the input order
    users = [users_by_index[i] for i in sorted(users_by_index)]
    return users, errors

# Usage
users, errors = fetch_users_batch([1, 2, 3, 4, 5])
print(f"Fetched {len(users)} users, {len(errors)} errors")

Parallel POST Requests

Create multiple resources:

def create_users_batch(users_data, max_workers=5):
    """Create multiple users concurrently, one POST /users per payload.

    Args:
        users_data: Sequence of JSON-serializable user payloads.
        max_workers: Number of worker threads passed to request_many().

    Returns:
        (created_users, failures): ``created_users`` holds the JSON body of
        each 2xx response, in the same order as ``users_data``; ``failures``
        holds one ``{'data', 'error'}`` dict per payload whose request raised
        or returned a non-2xx status. ``error`` is always a string.
    """
    requests = [
        HttpRequest(
            method=HttpMethod.POST,
            url='/users',
            json_data=user_data
        )
        for user_data in users_data
    ]

    created_by_index = {}
    failures = []

    for index, result in client.request_many(requests, max_workers=max_workers):
        if isinstance(result, HttpResponse) and result.is_success:
            created_by_index[index] = result.json()
        elif isinstance(result, HttpResponse):
            failures.append({
                'data': users_data[index],
                'error': f"HTTP {result.status_code}"
            })
        else:
            failures.append({
                'data': users_data[index],
                'error': str(result)
            })

    # Results arrive in completion order; report them in input order
    created_users = [created_by_index[i] for i in sorted(created_by_index)]
    return created_users, failures

Mixed Request Types

Different HTTP methods in one batch:

requests = [
    # GET requests
    HttpRequest(method=HttpMethod.GET, url='/users/1'),
    HttpRequest(method=HttpMethod.GET, url='/users/2'),

    # POST request
    HttpRequest(
        method=HttpMethod.POST,
        url='/users',
        json_data={'name': 'John'}
    ),

    # PUT request
    HttpRequest(
        method=HttpMethod.PUT,
        url='/users/3',
        json_data={'name': 'Jane Updated'}
    ),

    # DELETE request
    HttpRequest(method=HttpMethod.DELETE, url='/users/4'),
]

# One call handles all methods; iterate `results` for (index, result) pairs
results = client.request_many(requests, max_workers=10)

Pagination with Concurrency

Fetch multiple pages in parallel:

def fetch_all_pages(base_url, total_pages, start_page=1, page_size=100, max_workers=10):
    """Fetch pages ``start_page..total_pages`` of ``base_url`` concurrently.

    Args:
        base_url: Endpoint that accepts ``page`` and ``limit`` query params.
        total_pages: Last page number to fetch (inclusive).
        start_page: First page number to fetch; defaults to 1.
        page_size: Value sent as the ``limit`` query param.
        max_workers: Number of worker threads passed to request_many().

    Returns:
        Flat list of the ``items`` of every successful page, in page order.
        Pages that fail (exception or non-2xx) are skipped, so compare the
        length against the expected total if completeness matters.
    """
    requests = [
        HttpRequest(
            method=HttpMethod.GET,
            url=base_url,
            params={'page': page, 'limit': page_size}
        )
        for page in range(start_page, total_pages + 1)
    ]

    items_by_index = {}
    for index, response in client.request_many(requests, max_workers=max_workers):
        if isinstance(response, HttpResponse) and response.is_success:
            items_by_index[index] = response.json().get('items', [])

    # Pages complete in arbitrary order; stitch them back in page order
    all_items = []
    for index in sorted(items_by_index):
        all_items.extend(items_by_index[index])
    return all_items

# First page: gives the total count and already contains its own items
first_page = client.get('/items', params={'page': 1, 'limit': 100})
first_data = first_page.json()
total_pages = (first_data['total'] + 99) // 100  # Ceiling division

# Fetch the remaining pages in parallel (page 1 is not requested twice)
items = first_data.get('items', []) + fetch_all_pages('/items', total_pages, start_page=2)

Error Handling

Collecting Errors

from requestforge import HttpClientException, HttpResponse

successful_responses = []
failed_requests = []

for index, result in client.request_many(requests, max_workers=10):
    if isinstance(result, HttpResponse) and result.is_success:
        successful_responses.append((index, result))
    elif isinstance(result, HttpResponse):
        # A response arrived, but with a non-2xx status
        failed_requests.append({
            'index': index,
            'status_code': result.status_code,
            'error': result.text
        })
    elif isinstance(result, HttpClientException):
        # The request raised instead of producing a response
        failed_requests.append({
            'index': index,
            'error': str(result),
            'exception_type': type(result).__name__
        })

print(f"Success: {len(successful_responses)}, Failed: {len(failed_requests)}")

Partial Retry

Retry only failed requests:

from requestforge import HttpClientException, HttpResponse

def fetch_with_retry(requests, max_retries=2, max_workers=10):
    """Fetch requests, re-sending only the ones that failed.

    Args:
        requests: Sequence of HttpRequest objects.
        max_retries: Extra attempts after the first one.
        max_workers: Number of worker threads passed to request_many().

    Returns:
        dict mapping original request index -> successful HttpResponse.
        Requests that still fail after the last attempt are absent.
    """
    remaining = list(enumerate(requests))  # (original_index, request) pairs
    all_results = {}

    for attempt in range(max_retries + 1):
        if not remaining:
            break

        print(f"Attempt {attempt + 1}: {len(remaining)} requests")

        batch = [req for _, req in remaining]
        failed = []
        for batch_index, result in client.request_many(batch, max_workers=max_workers, fail_fast=False):
            # batch_index refers to `batch`, which shares its order with `remaining`
            original_index, request = remaining[batch_index]
            if isinstance(result, HttpResponse) and result.is_success:
                all_results[original_index] = result
            else:
                failed.append((original_index, request))

        remaining = failed

    return all_results

Error Statistics

Track error types:

from collections import Counter
from requestforge import (
    HttpResponse,
    TimeoutException,
    ConnectionException,
    HttpStatusException
)

error_types = Counter()
status_codes = Counter()

for _, result in client.request_many(requests, max_workers=10):
    if isinstance(result, TimeoutException):
        error_types['timeout'] += 1
    elif isinstance(result, ConnectionException):
        error_types['connection'] += 1
    elif isinstance(result, HttpStatusException):
        error_types['http_error'] += 1
        status_codes[result.status_code] += 1
    elif isinstance(result, HttpResponse):
        # Non-2xx responses are returned as HttpResponse rather than raised
        if not result.is_success:
            error_types['http_error'] += 1
            status_codes[result.status_code] += 1
    elif isinstance(result, Exception):
        # Any other failure: bucket it by exception class name
        error_types[type(result).__name__] += 1

print(f"Error types: {dict(error_types)}")
print(f"Status codes: {dict(status_codes)}")

Performance Optimization

Connection Pooling

Configure the connection pool to match your concurrency level:

from requestforge import HttpClient, HttpClientConfigBuilder

config = (
    HttpClientConfigBuilder()
    .with_base_url('https://api.example.com')
    .with_pool_connection(20)   # Number of connection pools to keep (presumably one per host; TODO confirm)
    .with_pool_maxsize(50)      # Max connections per pool; presumably should be >= max_workers
    .build()
)

client = HttpClient(config)

# Concurrent requests can now reuse pooled connections
results = client.request_many(requests, max_workers=20)

Batching Requests

Process requests in sequential batches to bound how many are in flight at once (results from all batches are still collected in memory):

def process_in_batches(all_requests, batch_size=100, max_workers=10):
    """Process requests in sequential batches of ``batch_size``.

    Each batch is handed to request_many() and fully drained before the next
    batch starts.

    Args:
        all_requests: Sequence of HttpRequest objects.
        batch_size: Number of requests per batch; must be >= 1.
        max_workers: Number of worker threads passed to request_many().

    Returns:
        list of (index, result) pairs where ``index`` is the position in
        ``all_requests`` (not the position within a batch).

    Raises:
        ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    all_results = []

    for start in range(0, len(all_requests), batch_size):
        batch = all_requests[start:start + batch_size]
        print(f"Processing batch {start // batch_size + 1}")

        # request_many indexes results within `batch`; shift to global positions
        all_results.extend(
            (start + index, result)
            for index, result in client.request_many(batch, max_workers=max_workers)
        )

    return all_results

# Process 1000 requests in batches of 100
results = process_in_batches(requests, batch_size=100)

Rate Limiting

Respect API rate limits by throttling calls made through a wrapper around client.request():

import threading
import time

class RateLimitedClient:
    """Throttle ``client.request`` calls to at most ``requests_per_second``.

    Safe to share across threads. A lock is held only while reserving a start
    time, never during the HTTP call itself. Requests issued from several
    threads can therefore still overlap, while their start times stay at
    least ``min_interval`` seconds apart.
    """

    def __init__(self, client, requests_per_second=10):
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self.client = client
        self.min_interval = 1.0 / requests_per_second
        # time.monotonic() start time of the most recently scheduled request
        self.last_request_time = float('-inf')
        self._lock = threading.Lock()

    def request(self, http_request):
        """Wait for the next free slot, then forward to ``client.request``."""
        with self._lock:
            # Reserve the earliest start time that respects the interval
            start_at = max(time.monotonic(), self.last_request_time + self.min_interval)
            self.last_request_time = start_at

        # Sleep outside the lock so other threads can reserve later slots
        delay = start_at - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        return self.client.request(http_request)

# Usage
# Route individual calls through rate_limited.request(...). The wrapper only
# throttles its own request() method; client.request_many() is not affected.
rate_limited = RateLimitedClient(client, requests_per_second=10)

Progress Tracking

Progress Bar

Using tqdm for progress:

from tqdm import tqdm

# One GET per user ID 1..100
requests = [
    HttpRequest(method=HttpMethod.GET, url=f'/users/{user_id}')
    for user_id in range(1, 101)
]

results = []
with tqdm(total=len(requests), desc="Fetching users") as progress:
    for position, outcome in client.request_many(requests, max_workers=10):
        results.append((position, outcome))
        progress.update(1)  # one tick per finished request

Custom Progress

def fetch_with_progress(requests, max_workers=10):
    """Fetch ``requests`` concurrently, printing progress every 10 results.

    Args:
        requests: Sequence of HttpRequest objects.
        max_workers: Number of worker threads passed to request_many().

    Returns:
        list of (index, result) pairs in completion order, so callers can use
        the responses instead of only seeing the printed counts.
    """
    total = len(requests)
    completed = 0
    successful = 0
    failed = 0
    results = []

    print(f"Starting {total} requests with {max_workers} workers...")

    for index, result in client.request_many(requests, max_workers=max_workers):
        results.append((index, result))
        completed += 1

        if isinstance(result, HttpResponse) and result.is_success:
            successful += 1
        else:
            failed += 1

        # Print progress every 10 requests
        if completed % 10 == 0:
            print(f"Progress: {completed}/{total} "
                  f"(✓ {successful}, ✗ {failed})")

    print(f"Completed: {successful} successful, {failed} failed")
    return results

Advanced Patterns

Request Prioritization

Submit high-priority requests first (priority only affects submission order; with several workers, completion order can still differ):

from dataclasses import dataclass
from typing import Any

@dataclass
class PrioritizedRequest:
    priority: int          # Lower number = submitted earlier
    request: HttpRequest
    metadata: Any = None   # Result key; None means "use the list position"

def execute_prioritized(prioritized_requests, max_workers=10):
    """Submit requests in priority order and collect their results.

    Priority only decides the order in which requests are handed to
    request_many(); with several workers, completion order can still differ.

    Args:
        prioritized_requests: Iterable of PrioritizedRequest.
        max_workers: Number of worker threads passed to request_many().

    Returns:
        dict mapping each request's ``metadata`` to its result. Requests whose
        metadata is None are keyed by their position in ``prioritized_requests``
        instead, so they cannot overwrite each other under a shared None key.

    Raises:
        ValueError: if two requests would end up with the same result key.
    """
    # sorted() is stable, so equal priorities keep the caller's order
    ranked = sorted(enumerate(prioritized_requests), key=lambda item: item[1].priority)

    keys = [
        pr.metadata if pr.metadata is not None else position
        for position, pr in ranked
    ]
    if len(set(keys)) != len(keys):
        raise ValueError("each request needs a distinct metadata value")

    requests = [pr.request for _, pr in ranked]

    results = {}
    for index, result in client.request_many(requests, max_workers=max_workers):
        # index refers to `requests`, which shares its order with `keys`
        results[keys[index]] = result

    return results

# Usage
prioritized = [
    PrioritizedRequest(priority=1, request=HttpRequest(...), metadata='critical'),
    PrioritizedRequest(priority=5, request=HttpRequest(...), metadata='low'),
    PrioritizedRequest(priority=3, request=HttpRequest(...), metadata='medium'),
]

Request Deduplication

Avoid sending identical requests more than once (only do this for idempotent requests such as GET):

def deduplicate_requests(requests):
    """Collapse identical requests so each one is sent only once.

    Two requests count as identical when method, URL, query params and JSON
    body all match (params/body are compared by their repr).

    Args:
        requests: Sequence of HttpRequest objects.

    Returns:
        (unique_requests, index_map) where ``index_map[original_index]`` is
        the position in ``unique_requests`` whose result answers that request.
    """
    unique_requests = []
    index_map = []
    first_seen = {}  # request key -> position in unique_requests

    for request in requests:
        key = (
            request.method.value,
            request.url,
            repr(getattr(request, 'params', None)),
            repr(getattr(request, 'json_data', None)),
        )
        if key not in first_seen:
            first_seen[key] = len(unique_requests)
            unique_requests.append(request)
        index_map.append(first_seen[key])

    return unique_requests, index_map

# Deduplicate
unique_requests, index_map = deduplicate_requests(requests)

# Execute only unique requests. request_many yields in completion order,
# so key each result by the index it reports, not by arrival position.
unique_results = {}
for unique_index, result in client.request_many(unique_requests, max_workers=10):
    unique_results[unique_index] = result

# Map back to original indices; duplicates share the result of the sent request
full_results = [unique_results[unique_index] for unique_index in index_map]

Dynamic Worker Adjustment

Adjust workers based on response times:

import statistics
import time

def adaptive_fetch(requests, initial_workers=5, batch_size=50):
    """Fetch ``requests`` in batches, tuning the worker count between batches.

    After each batch, the mean ``elapsed_ms`` of its HttpResponse results
    drives a simple heuristic. Under 100 ms adds 2 workers (capped at 20).
    Over 500 ms removes 2 workers (floored at 3).

    Args:
        requests: Sequence of HttpRequest objects.
        initial_workers: Worker count for the first batch.
        batch_size: Number of requests per batch.

    Returns:
        list of (index, result) pairs, with ``index`` relative to ``requests``.
    """
    max_workers = initial_workers
    all_results = []

    for start in range(0, len(requests), batch_size):
        batch = requests[start:start + batch_size]

        # Measure wall-clock time for the whole batch
        started = time.perf_counter()
        results = list(client.request_many(batch, max_workers=max_workers))
        duration = time.perf_counter() - started

        # Shift batch-relative indices to positions in `requests`
        all_results.extend((start + index, result) for index, result in results)

        response_times = [
            r.elapsed_ms for _, r in results
            if isinstance(r, HttpResponse)
        ]

        if response_times:
            avg_time = statistics.mean(response_times)

            # Adjust workers (simple heuristic)
            if avg_time < 100:  # Fast responses
                max_workers = min(max_workers + 2, 20)
            elif avg_time > 500:  # Slow responses
                max_workers = max(max_workers - 2, 3)

            print(f"Avg response: {avg_time:.0f}ms, Batch: {duration:.2f}s, Workers: {max_workers}")

    return all_results

Testing

Mock Concurrent Requests

import responses

from requestforge import HttpMethod, HttpRequest, HttpResponse, create_client

@responses.activate
def test_concurrent_requests():
    """All ten mocked GETs succeed and every request index is reported once."""
    # NOTE: `responses` patches the `requests` library; this test assumes the
    # client sends its HTTP traffic through it.
    for i in range(1, 11):
        responses.add(
            responses.GET,
            f'https://api.example.com/users/{i}',
            json={'id': i, 'name': f'User {i}'},
            status=200
        )

    client = create_client('https://api.example.com')

    requests = [
        HttpRequest(method=HttpMethod.GET, url=f'/users/{i}')
        for i in range(1, 11)
    ]

    results = list(client.request_many(requests, max_workers=5))

    assert len(results) == 10
    # Results arrive in completion order, but each original index appears once
    assert sorted(index for index, _ in results) == list(range(10))
    for index, response in results:
        assert isinstance(response, HttpResponse)
        assert response.is_success

Test Error Scenarios

@responses.activate
def test_concurrent_with_errors():
    """One mocked endpoint succeeds while two return error statuses."""
    base = 'https://api.example.com'
    responses.add(responses.GET, f'{base}/users/1', json={'id': 1}, status=200)
    responses.add(responses.GET, f'{base}/users/2', json={'error': 'Not found'}, status=404)
    responses.add(responses.GET, f'{base}/users/3', body='Timeout', status=408)

    client = create_client(base)

    requests = [
        HttpRequest(method=HttpMethod.GET, url=f'/users/{user_id}')
        for user_id in (1, 2, 3)
    ]

    results = list(client.request_many(requests, max_workers=3, fail_fast=False))

    # True for a 2xx HttpResponse, False for anything else
    outcomes = [
        bool(isinstance(outcome, HttpResponse) and outcome.is_success)
        for _, outcome in results
    ]

    assert outcomes.count(True) == 1
    assert len(outcomes) - outcomes.count(True) == 2

Best Practices

  1. Choose Appropriate Worker Count

    # Good ✅ - Moderate concurrency
    results = client.request_many(requests, max_workers=10)
    
    # Avoid ❌ - Too many workers may overwhelm the server or hit rate limits
    results = client.request_many(requests, max_workers=100)
    
  2. Handle Errors Gracefully

    # Good ✅ - Process both success and failure
    for index, result in client.request_many(requests):
        if not isinstance(result, HttpResponse):
            handle_error(index, result)
            continue
        process_success(result)
    
  3. Use Fail-Fast for Critical Operations

    # Good ✅ - Stop on first error for critical batch
    # list() consumes the lazy result stream inside the try, so the
    # fail-fast exception is actually raised (and caught) here.
    try:
        results = list(client.request_many(requests, fail_fast=True))
    except HttpClientException:
        rollback_changes()
    
  4. Monitor Progress for Long Operations

    # Good ✅ - Show progress to users (one tick per finished request)
    with tqdm(total=len(requests)) as bar:
        for _index, _result in client.request_many(requests):
            bar.update(1)
    
  5. Respect Rate Limits

    # Good ✅ - Limit concurrent requests
    results = client.request_many(requests, max_workers=5)
    
    # Or throttle individual calls with the rate-limiting wrapper
    # (it wraps request(), not request_many())
    rate_limited_client = RateLimitedClient(client, requests_per_second=10)
    

Next Steps