Thread Safety
HTTP Client is designed to be thread-safe, allowing safe concurrent usage in multi-threaded applications like Django, Flask, and FastAPI.
Thread-Safe Components
HttpClient
Thread Safety: ✅ Safe
Mechanism:
Thread-local sessions: Each thread gets its own
requests.SessioninstanceLazy initialization: Sessions created on first use per thread
Session lock:
threading.Lockprotects session creation
Implementation:
class HttpClient:
def __init__(self, config, session=None):
self._session = session # Optional shared session
self._session_lock = threading.Lock()
self._local = threading.local() # Thread-local storage
@property
def session(self) -> requests.Session:
"""Get or create thread-safe session."""
if self._session:
return self._session # Use shared session if provided
# Thread-local session
if not hasattr(self._local, 'session') or self._local.session is None:
with self._session_lock:
self._local.session = self._create_session()
return self._local.session
Thread-Local Sessions:
Thread 1 Thread 2 Thread 3
↓ ↓ ↓
Get session Get session Get session
↓ ↓ ↓
Create session 1 Create session 2 Create session 3
↓ ↓ ↓
Use session 1 Use session 2 Use session 3
↓ ↓ ↓
Connection pool 1 Connection pool 2 Connection pool 3
Usage:
# Safe: Single client shared across threads
client = HttpClient(config)
def worker():
response = client.get('/users') # Each thread gets own session
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
Caveat - Shared Session:
# If you provide a shared session, YOU are responsible for thread-safety
shared_session = requests.Session()
client = HttpClient(config, session=shared_session)
# This may not be thread-safe depending on requests.Session implementation
# Better to let HttpClient manage sessions (don't pass session parameter)
TokenManager
Thread Safety: ✅ Safe
Mechanism:
Refresh lock:
threading.Lockprevents concurrent token fetchesDouble-checked locking: Minimizes lock contention
Implementation:
class TokenManager:
def __init__(self, provider, storage):
self._provider = provider
self._storage = storage
self._lock = threading.RLock()
self._refresh_lock = threading.Lock()
def get_token(self):
# Try cache first (no lock)
token = self._storage.get(self._cache_key)
if token and not token.is_expired:
return token
# Need refresh - acquire lock
with self._refresh_lock:
# Double-check after acquiring lock
token = self._storage.get(self._cache_key)
if token and not token.is_expired:
return token # Another thread refreshed
# Actually fetch new token
new_token = self._provider.fetch_token()
self._storage.set(self._cache_key, new_token)
return new_token
Concurrent Access Pattern:
Thread 1 Thread 2 Thread 3
↓ ↓ ↓
get_token() get_token() get_token()
↓ ↓ ↓
Check cache Check cache Check cache
↓ ↓ ↓
Expired! Expired! Expired!
↓ ↓ ↓
Acquire lock Wait for lock Wait for lock
↓ ↓ ↓
Fetch token Lock acquired Lock acquired
Store token Token in cache! Token in cache!
↓ Return cached Return cached
Release lock
↓
Return token
Benefits:
Only one thread fetches token
Other threads wait for result
No duplicate token requests
Cache hit after first fetch
Usage:
# Safe: Share token manager across threads
token_manager = TokenManager(provider, storage)
def worker():
token = token_manager.get_token() # Thread-safe
threads = [threading.Thread(target=worker) for _ in range(100)]
for t in threads:
t.start()
InMemoryTokenStorage
Thread Safety: ✅ Safe
Mechanism:
RLock:
threading.RLock()protects dictionary accessReentrant: Same thread can acquire lock multiple times
Implementation:
class InMemoryTokenStorage:
def __init__(self):
self._tokens = {}
self._lock = threading.RLock()
def get(self, key):
with self._lock:
return self._tokens.get(key)
def set(self, key, token):
with self._lock:
self._tokens[key] = token
def delete(self, key):
with self._lock:
self._tokens.pop(key, None)
Why RLock?
Allows same thread to acquire lock multiple times
Prevents deadlock in nested calls
Example:
set()called from withinget()in same thread
Usage:
# Safe: Share storage across threads
storage = InMemoryTokenStorage()
def writer():
storage.set('key', token)
def reader():
token = storage.get('key')
threads = [
threading.Thread(target=writer),
threading.Thread(target=reader),
]
for t in threads:
t.start()
DjangoCacheTokenStorage
Thread Safety: ✅ Safe
Mechanism:
Delegates to Django cache backend
Django cache backends are thread-safe
Redis/Memcached backends are process-safe
Implementation:
class DjangoCacheTokenStorage:
def get(self, key):
from django.core.cache import caches
cache = caches[self._cache_alias]
return cache.get(self._make_key(key))
def set(self, key, token):
from django.core.cache import caches
cache = caches[self._cache_alias]
cache.set(self._make_key(key), token, timeout=ttl)
Django Cache Backend Thread-Safety:
Backend |
Thread-Safe |
Process-Safe |
Recommended For |
|---|---|---|---|
LocMemCache |
✅ Yes |
❌ No |
Development only |
RedisCache |
✅ Yes |
✅ Yes |
Production |
MemcachedCache |
✅ Yes |
✅ Yes |
Production |
DatabaseCache |
✅ Yes |
✅ Yes |
Production |
FileBasedCache |
⚠️ Depends |
⚠️ Depends |
Not recommended |
Usage:
# Safe for multi-process Django deployment
storage = DjangoCacheTokenStorage(cache_alias='default')
token_manager = TokenManager(provider, storage)
# Multiple Django workers/processes can share tokens
TokenFetchPipeline
Thread Safety: ✅ Safe (with thread-safe storage)
Mechanism:
Lock-free design
Relies on storage locking
Each step is independent
Implementation:
class TokenFetchPipeline:
def execute(self, force_refresh=False):
context = FetchContext()
for step in self._steps:
# Check cache (storage handles locking)
if not force_refresh:
cached = self._get_cached_token(step)
if cached:
context.add_token(step.name, cached)
continue
# Fetch (storage handles locking)
token = step.fetch(context.tokens)
self._cache_token(step, token)
context.add_token(step.name, token)
return final_token
Thread Safety Analysis:
Reads from storage (thread-safe)
Writes to storage (thread-safe)
FetchContextis local to each execution (no sharing)Multiple threads can execute pipeline concurrently
Concurrent Pipeline Execution:
Thread 1 Thread 2
↓ ↓
execute() execute()
↓ ↓
Check cache (step 1) Check cache (step 1)
↓ ↓
Cache miss Cache hit (Thread 1's fetch)
↓ ↓
Fetch step 1 Use cached
Store step 1
↓ ↓
Fetch step 2 Fetch step 2
Store step 2 Store step 2
Usage:
# Safe: Share pipeline across threads
pipeline = TokenFetchPipeline(steps, storage)
def worker():
token = pipeline.execute()
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
Immutable Components
Immutable objects are inherently thread-safe:
HttpRequest
Thread Safety: ✅ Safe (immutable)
@dataclass(frozen=True)
class HttpRequest:
method: HttpMethod
url: str
headers: dict | None = None
# ... other fields
Safety:
Cannot be modified after creation
Safe to share across threads
Methods return new instances
HttpResponse
Thread Safety: ✅ Safe (immutable)
@dataclass(frozen=True)
class HttpResponse:
status_code: int
headers: dict
content: bytes
# ... other fields
HttpClientConfig
Thread Safety: ✅ Safe (immutable)
@dataclass(frozen=True)
class HttpClientConfig:
base_url: str
default_timeout: float
# ... other fields
TokenData
Thread Safety: ✅ Safe (effectively immutable)
@dataclass
class TokenData:
access_token: str
token_type: str
expires_at: datetime | None
# ... other fields
Note: While not frozen, TokenData is used as immutable in practice.
Mutable Components
RequestContext
Thread Safety: ⚠️ Not thread-safe (by design)
Reason: Mutable, designed for single-thread request lifecycle.
@dataclass
class RequestContext:
request: HttpRequest
attempt: int = 0
max_retries: int = 0
metadata: dict = field(default_factory=dict)
def increment_attempt(self):
self.attempt += 1 # Mutable
Usage:
Created per request
Not shared across threads
Modified during retry loop
Each request gets own context
Why Not Thread-Safe?
# Each request has its own context
def handle_request(request):
context = RequestContext(request) # Local to this call
# Multiple retries in same thread
while context.attempt < context.max_retries:
try:
response = execute(request)
return response
except Exception:
context.increment_attempt() # Safe: single thread
FetchContext
Thread Safety: ⚠️ Not thread-safe (by design)
Reason: Mutable, designed for single pipeline execution.
@dataclass
class FetchContext:
tokens: dict[str, TokenData] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
def add_token(self, name, token):
self.tokens[name] = token # Mutable
Usage:
Created per pipeline execution
Not shared across threads
Modified as steps execute
Common Patterns
Django Integration
Pattern: Application-Wide Client
# settings.py
from requestforge import HttpClient, HttpClientConfigBuilder, TokenManager
# Single client instance
API_CLIENT = HttpClient(
HttpClientConfigBuilder()
.with_base_url('https://api.example.com')
.with_token_auth(token_manager=TOKEN_MANAGER)
.build()
)
# views.py
from django.conf import settings
def my_view(request):
# Safe: Each request uses same client
response = settings.API_CLIENT.get('/users')
# Each request thread gets own session
Why Safe:
Django request threads are independent
Thread-local sessions
Shared token manager with locking
Potential Issues
Blocking in Hooks
⚠️ Caution:
# This blocks all threads using this hook
class SlowHook(RequestHookInterface):
def before_request(self, request, context):
time.sleep(5) # Blocks for 5 seconds!
return request
Impact:
Slows down all requests
Reduces throughput
May cause timeouts
Solution:
Keep hooks fast
Avoid I/O in hooks
Use async operations if needed
Testing Thread Safety
Example Test
import pytest
import threading
from requestforge import HttpClient, TokenManager
def test_concurrent_requests():
"""Test concurrent requests are thread-safe."""
client = HttpClient(config)
results = []
errors = []
def worker(i):
try:
response = client.get(f'/users/{i}')
results.append(response.status_code)
except Exception as e:
errors.append(e)
# 100 concurrent requests
threads = [threading.Thread(target=worker, args=(i,)) for i in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
# All should succeed
assert len(errors) == 0
assert len(results) == 100
def test_concurrent_token_fetch():
"""Test only one thread fetches token."""
fetch_count = 0
lock = threading.Lock()
class CountingProvider(TokenProviderInterface):
def fetch_token(self):
nonlocal fetch_count
with lock:
fetch_count += 1
return TokenData(access_token='token')
provider = CountingProvider()
token_manager = TokenManager(provider)
# 100 threads request token simultaneously
def worker():
token_manager.get_token()
threads = [threading.Thread(target=worker) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
# Only one fetch should occur
assert fetch_count == 1
Best Practices
Share HttpClient Instances
# Good ✅ - Share client client = HttpClient(config) def worker(): response = client.get('/data')
Share TokenManager Instances
# Good ✅ - Share token manager token_manager = TokenManager(provider, storage) config = builder.with_token_auth(token_manager).build()
Use Thread-Safe Storage in Production
# Good ✅ - For multi-process storage = DjangoCacheTokenStorage() # Avoid ❌ - For multi-process storage = InMemoryTokenStorage() # Not shared across processes
Keep Hooks Stateless
# Good ✅ - Stateless hook class StatelessHook(RequestHookInterface): def before_request(self, request, context): # No shared mutable state return request.with_headers({'X-ID': uuid.uuid4()})
Don’t Share RequestContext
# Good ✅ - Each request gets own context def handle_request(request): context = RequestContext(request) # Use context
Test Concurrent Access
# Good ✅ - Test with multiple threads def test_thread_safety(): threads = [threading.Thread(target=worker) for _ in range(100)] # ... test concurrent access
Summary
Thread-Safe ✅:
HttpClient(with thread-local sessions)TokenManager(with locking)InMemoryTokenStorage(with locking)DjangoCacheTokenStorage(delegates to Django)TokenFetchPipeline(with thread-safe storage)All immutable models (HttpRequest, HttpResponse, etc.)
Not Thread-Safe ⚠️ (by design):
RequestContext(per-request, not shared)FetchContext(per-execution, not shared)
Key Takeaways:
Share client and token manager instances safely
Use thread-safe storage (Django cache, Redis)
Keep hooks stateless
Don’t share mutable context objects
Test concurrent access
See Also
Design Principles - Design principles
Components Overview - Components overview
Concurrent Requests - Concurrent requests guide