Concurrent Requests
HTTP Client provides built-in support for executing multiple requests concurrently using thread pools.
Overview
The request_many() method allows you to execute multiple HTTP requests in parallel, improving throughput for batch operations.
Benefits:
Execute multiple requests simultaneously
Configurable number of worker threads
Results returned in original request order
Optional fail-fast mode
Thread-safe execution
Basic Usage
Simple Concurrent Requests
from requestforge import HttpClient, HttpRequest, HttpMethod
client = create_client('https://api.example.com')
# Create multiple requests
requests = [
HttpRequest(method=HttpMethod.GET, url=f'/users/{i}')
for i in range(1, 11)
]
# Execute concurrently with 5 workers
results = client.request_many(requests, max_workers=5)
# Process results
for index, result in results:
if isinstance(result, HttpResponse):
print(f"Request {index}: Success - {result.status_code}")
user = result.json()
print(f" User: {user['name']}")
else:
print(f"Request {index}: Failed - {result}")
Fail-Fast Mode
Stop execution on first error:
from requestforge import HttpClientException
try:
results = client.request_many(
requests,
max_workers=10,
fail_fast=True # Stop on first error
)
# All requests succeeded
for index, response in results:
process_response(response)
except HttpClientException as e:
print(f"Request failed: {e}")
# Processing stopped at first error
Configuration
Worker Threads
Control the number of concurrent requests:
# Conservative: 3 concurrent requests
results = client.request_many(requests, max_workers=3)
# Moderate: 10 concurrent requests (default: 5)
results = client.request_many(requests, max_workers=10)
# Aggressive: 20 concurrent requests
results = client.request_many(requests, max_workers=20)
Choosing max_workers:
Too few: Slower execution, underutilized resources
Too many: May overwhelm server, hit rate limits
Rule of thumb: Start with 5-10, adjust based on API rate limits
Request Order
Results are yielded as they complete, but include the original index:
requests = [
HttpRequest(method=HttpMethod.GET, url='/slow-endpoint'), # 0 - Takes 5s
HttpRequest(method=HttpMethod.GET, url='/fast-endpoint'), # 1 - Takes 1s
HttpRequest(method=HttpMethod.GET, url='/medium-endpoint'), # 2 - Takes 3s
]
for index, result in client.request_many(requests):
print(f"Completed request {index}")
# Output might be:
# Completed request 1 (fast endpoint finished first)
# Completed request 2 (medium endpoint finished second)
# Completed request 0 (slow endpoint finished last)
Sorting Results
Collect and sort by original index:
# Collect all results
results = list(client.request_many(requests, max_workers=10))
# Sort by index
results.sort(key=lambda x: x[0])
# Process in order
for index, response in results:
print(f"Processing request {index}")
Common Patterns
Batch API Calls
Fetch multiple resources:
from requestforge import HttpRequest, HttpMethod, HttpClientException
def fetch_users_batch(user_ids):
"""Fetch multiple users concurrently."""
requests = [
HttpRequest(method=HttpMethod.GET, url=f'/users/{user_id}')
for user_id in user_ids
]
users = []
errors = []
for index, result in client.request_many(requests, max_workers=10):
if isinstance(result, HttpClientException):
errors.append({
'user_id': user_ids[index],
'error': str(result)
})
else:
users.append(result.json())
return users, errors
# Usage
users, errors = fetch_users_batch([1, 2, 3, 4, 5])
print(f"Fetched {len(users)} users, {len(errors)} errors")
Parallel POST Requests
Create multiple resources:
def create_users_batch(users_data):
"""Create multiple users concurrently."""
requests = [
HttpRequest(
method=HttpMethod.POST,
url='/users',
json_data=user_data
)
for user_data in users_data
]
created_users = []
failures = []
for index, result in client.request_many(requests, max_workers=5):
if isinstance(result, HttpResponse) and result.is_success:
created_users.append(result.json())
else:
failures.append({
'data': users_data[index],
'error': result if isinstance(result, Exception) else f"HTTP {result.status_code}"
})
return created_users, failures
Mixed Request Types
Different HTTP methods in one batch:
requests = [
# GET requests
HttpRequest(method=HttpMethod.GET, url='/users/1'),
HttpRequest(method=HttpMethod.GET, url='/users/2'),
# POST requests
HttpRequest(
method=HttpMethod.POST,
url='/users',
json_data={'name': 'John'}
),
# PUT requests
HttpRequest(
method=HttpMethod.PUT,
url='/users/3',
json_data={'name': 'Jane Updated'}
),
# DELETE requests
HttpRequest(method=HttpMethod.DELETE, url='/users/4'),
]
results = client.request_many(requests, max_workers=10)
Pagination with Concurrency
Fetch multiple pages in parallel:
def fetch_all_pages(base_url, total_pages):
"""Fetch all pages concurrently."""
requests = [
HttpRequest(
method=HttpMethod.GET,
url=base_url,
params={'page': page, 'limit': 100}
)
for page in range(1, total_pages + 1)
]
all_items = []
for index, response in client.request_many(requests, max_workers=10):
if isinstance(response, HttpResponse) and response.is_success:
data = response.json()
all_items.extend(data.get('items', []))
return all_items
# First, get total count
response = client.get('/items', params={'page': 1, 'limit': 100})
total = response.json()['total']
total_pages = (total + 99) // 100 # Ceiling division
# Fetch all pages
items = fetch_all_pages('/items', total_pages)
Error Handling
Collecting Errors
from requestforge import HttpClientException, HttpResponse
successful_responses = []
failed_requests = []
for index, result in client.request_many(requests, max_workers=10):
if isinstance(result, HttpResponse):
if result.is_success:
successful_responses.append((index, result))
else:
failed_requests.append({
'index': index,
'status_code': result.status_code,
'error': result.text
})
elif isinstance(result, HttpClientException):
failed_requests.append({
'index': index,
'error': str(result),
'exception_type': type(result).__name__
})
print(f"Success: {len(successful_responses)}, Failed: {len(failed_requests)}")
Partial Retry
Retry only failed requests:
from requestforge import HttpClientException
def fetch_with_retry(requests, max_retries=2):
"""Fetch requests with retry for failures."""
remaining_requests = list(enumerate(requests))
all_results = {}
for attempt in range(max_retries + 1):
if not remaining_requests:
break
print(f"Attempt {attempt + 1}: {len(remaining_requests)} requests")
# Execute current batch
batch = [req for _, req in remaining_requests]
results = client.request_many(batch, max_workers=10, fail_fast=False)
# Process results
failed = []
for result_index, result in results:
original_index = remaining_requests[result_index][0]
if isinstance(result, HttpResponse) and result.is_success:
all_results[original_index] = result
else:
if attempt < max_retries:
failed.append(remaining_requests[result_index])
remaining_requests = failed
return all_results
Error Statistics
Track error types:
from collections import Counter
from requestforge import (
TimeoutException,
ConnectionException,
HttpStatusException
)
error_types = Counter()
status_codes = Counter()
for index, result in client.request_many(requests, max_workers=10):
if isinstance(result, TimeoutException):
error_types['timeout'] += 1
elif isinstance(result, ConnectionException):
error_types['connection'] += 1
elif isinstance(result, HttpStatusException):
error_types['http_error'] += 1
status_codes[result.status_code] += 1
print(f"Error types: {dict(error_types)}")
print(f"Status codes: {dict(status_codes)}")
Performance Optimization
Connection Pooling
Configure pool size for concurrent requests:
config = (
HttpClientConfigBuilder()
.with_base_url('https://api.example.com')
.with_pool_connection(20) # Pool connections
.with_pool_maxsize(50) # Max pool size
.build()
)
client = HttpClient(config)
# Now concurrent requests reuse connections
results = client.request_many(requests, max_workers=20)
Batching Requests
Process requests in batches to control memory:
def process_in_batches(all_requests, batch_size=100, max_workers=10):
"""Process requests in batches."""
all_results = []
for i in range(0, len(all_requests), batch_size):
batch = all_requests[i:i + batch_size]
print(f"Processing batch {i // batch_size + 1}")
batch_results = list(
client.request_many(batch, max_workers=max_workers)
)
all_results.extend(batch_results)
return all_results
# Process 1000 requests in batches of 100
results = process_in_batches(requests, batch_size=100)
Rate Limiting
Respect API rate limits:
import time
from threading import Semaphore
class RateLimitedClient:
def __init__(self, client, requests_per_second=10):
self.client = client
self.min_interval = 1.0 / requests_per_second
self.last_request_time = 0
self.semaphore = Semaphore(1)
def request(self, http_request):
with self.semaphore:
# Wait if needed
now = time.time()
time_since_last = now - self.last_request_time
if time_since_last < self.min_interval:
time.sleep(self.min_interval - time_since_last)
result = self.client.request(http_request)
self.last_request_time = time.time()
return result
# Usage
rate_limited = RateLimitedClient(client, requests_per_second=10)
Progress Tracking
Progress Bar
Using tqdm for progress:
from tqdm import tqdm
requests = [
HttpRequest(method=HttpMethod.GET, url=f'/users/{i}')
for i in range(1, 101)
]
results = []
with tqdm(total=len(requests), desc="Fetching users") as pbar:
for index, result in client.request_many(requests, max_workers=10):
results.append((index, result))
pbar.update(1)
Custom Progress
def fetch_with_progress(requests, max_workers=10):
"""Fetch with custom progress reporting."""
total = len(requests)
completed = 0
successful = 0
failed = 0
print(f"Starting {total} requests with {max_workers} workers...")
for index, result in client.request_many(requests, max_workers=max_workers):
completed += 1
if isinstance(result, HttpResponse) and result.is_success:
successful += 1
else:
failed += 1
# Print progress every 10 requests
if completed % 10 == 0:
print(f"Progress: {completed}/{total} "
f"(✓ {successful}, ✗ {failed})")
print(f"Completed: {successful} successful, {failed} failed")
Advanced Patterns
Request Prioritization
Submit high-priority requests first:
from dataclasses import dataclass
from typing import Any
@dataclass
class PrioritizedRequest:
priority: int
request: HttpRequest
metadata: Any = None
def execute_prioritized(prioritized_requests, max_workers=10):
"""Execute requests in priority order."""
# Sort by priority (lower number = higher priority)
sorted_requests = sorted(
prioritized_requests,
key=lambda x: x.priority
)
requests = [pr.request for pr in sorted_requests]
results = {}
for index, result in client.request_many(requests, max_workers=max_workers):
original_request = sorted_requests[index]
results[original_request.metadata] = result
return results
# Usage
prioritized = [
PrioritizedRequest(priority=1, request=HttpRequest(...), metadata='critical'),
PrioritizedRequest(priority=5, request=HttpRequest(...), metadata='low'),
PrioritizedRequest(priority=3, request=HttpRequest(...), metadata='medium'),
]
Request Deduplication
Avoid duplicate requests:
def deduplicate_requests(requests):
"""Remove duplicate requests based on URL."""
seen = set()
unique_requests = []
index_map = {}
for i, request in enumerate(requests):
key = f"{request.method.value}:{request.url}"
if key not in seen:
seen.add(key)
index_map[len(unique_requests)] = i
unique_requests.append(request)
return unique_requests, index_map
# Deduplicate
unique_requests, index_map = deduplicate_requests(requests)
# Execute only unique requests
results = list(client.request_many(unique_requests, max_workers=10))
# Map back to original indices
full_results = [None] * len(requests)
for unique_index, (_, result) in enumerate(results):
original_index = index_map[unique_index]
full_results[original_index] = result
Dynamic Worker Adjustment
Adjust workers based on response times:
import statistics
def adaptive_fetch(requests, initial_workers=5):
"""Adjust worker count based on performance."""
batch_size = 50
max_workers = initial_workers
for i in range(0, len(requests), batch_size):
batch = requests[i:i + batch_size]
# Measure performance
import time
start = time.time()
results = list(client.request_many(batch, max_workers=max_workers))
duration = time.time() - start
# Calculate average response time
response_times = [
r.elapsed_ms for _, r in results
if isinstance(r, HttpResponse)
]
if response_times:
avg_time = statistics.mean(response_times)
# Adjust workers (simple heuristic)
if avg_time < 100: # Fast responses
max_workers = min(max_workers + 2, 20)
elif avg_time > 500: # Slow responses
max_workers = max(max_workers - 2, 3)
print(f"Avg response: {avg_time:.0f}ms, Workers: {max_workers}")
Testing
Mock Concurrent Requests
import pytest
from unittest.mock import Mock, patch
import responses
@responses.activate
def test_concurrent_requests():
# Mock multiple endpoints
for i in range(1, 11):
responses.add(
responses.GET,
f'https://api.example.com/users/{i}',
json={'id': i, 'name': f'User {i}'},
status=200
)
client = create_client('https://api.example.com')
requests = [
HttpRequest(method=HttpMethod.GET, url=f'/users/{i}')
for i in range(1, 11)
]
results = list(client.request_many(requests, max_workers=5))
assert len(results) == 10
for index, response in results:
assert isinstance(response, HttpResponse)
assert response.is_success
Test Error Scenarios
@responses.activate
def test_concurrent_with_errors():
# Some succeed, some fail
responses.add(responses.GET, 'https://api.example.com/users/1', json={'id': 1}, status=200)
responses.add(responses.GET, 'https://api.example.com/users/2', json={'error': 'Not found'}, status=404)
responses.add(responses.GET, 'https://api.example.com/users/3', body='Timeout', status=408)
client = create_client('https://api.example.com')
requests = [
HttpRequest(method=HttpMethod.GET, url=f'/users/{i}')
for i in range(1, 4)
]
results = list(client.request_many(requests, max_workers=3, fail_fast=False))
successful = sum(1 for _, r in results if isinstance(r, HttpResponse) and r.is_success)
failed = len(results) - successful
assert successful == 1
assert failed == 2
Best Practices
Choose Appropriate Worker Count
# Good ✅ - Moderate concurrency results = client.request_many(requests, max_workers=10) # Avoid ❌ - Too many workers may overwhelm server results = client.request_many(requests, max_workers=100)
Handle Errors Gracefully
# Good ✅ - Process both success and failure for index, result in client.request_many(requests): if isinstance(result, HttpResponse): process_success(result) else: handle_error(index, result)
Use Fail-Fast for Critical Operations
# Good ✅ - Stop on first error for critical batch try: results = client.request_many(requests, fail_fast=True) except HttpClientException: rollback_changes()
Monitor Progress for Long Operations
# Good ✅ - Show progress to users with tqdm(total=len(requests)) as pbar: for index, result in client.request_many(requests): pbar.update(1)
Respect Rate Limits
# Good ✅ - Limit concurrent requests results = client.request_many(requests, max_workers=5) # Or use rate limiting wrapper rate_limited_client = RateLimitedClient(client, requests_per_second=10)
Next Steps
Learn about Hooks for concurrent request logging
Explore Error Handling for managing partial failures
Check Django Integration Examples for async views