Pipelines API Reference

This page documents the multi-step token fetch pipeline system.

TokenFetchPipeline

class TokenFetchPipeline(steps, storage, cache_key_prefix='', validate_dependencies=True)

Pipeline for multi-step token fetching with caching.

Parameters:
  • steps (list) – List of TokenFetcherInterface instances

  • storage (TokenStorageInterface) – Token storage for caching

  • cache_key_prefix (str) – Prefix for all cache keys

  • validate_dependencies (bool) – If True (the default), check on initialization that every step's dependencies appear earlier in steps; a ValueError is raised otherwise

Features:

  • Per-step caching with configurable TTL

  • Automatic dependency resolution

  • Selective cache invalidation

  • Cascading invalidation (invalidating a step also clears its dependents)

  • Thread-safe execution

Example:

from requestforge import TokenFetchPipeline
from requestforge.fetcher import BodyTokenFetcher
from requestforge.token_manager import InMemoryTokenStorage
from datetime import timedelta

# Step 1: App token
app_token = BodyTokenFetcher(
    name='app_token',
    base_url='https://auth.example.com',
    endpoint='/v1/app/token',
    method='POST',
    request_data={'client_id': 'app-id', 'client_secret': 'app-secret'},
    token_field='access_token',
    ttl=timedelta(hours=1)
)

# Step 2: User token (depends on app_token)
user_token = BodyTokenFetcher(
    name='user_token',
    base_url='https://auth.example.com',
    endpoint='/v1/user/token',
    method='POST',
    request_data={'username': 'user', 'password': 'pass'},
    token_field='access_token',
    ttl=timedelta(minutes=30),
    depends_on=['app_token']
)

# Create pipeline
pipeline = TokenFetchPipeline(
    steps=[app_token, user_token],
    storage=InMemoryTokenStorage(),
    cache_key_prefix='myapp'
)

Methods

TokenFetchPipeline.execute(force_refresh=False)

Execute pipeline and return final token.

Parameters:

force_refresh (bool) – If True, ignores cache and fetches all steps

Returns:

TokenData from the last step

Return type:

TokenData

Raises:

AuthenticationException – If a step fails to obtain its token (see Error Handling below)

Behavior:

  1. Iterate through steps in order

  2. For each step:

     a. Check cache (unless force_refresh)
     b. If cached and valid, use cached token
     c. If not cached, verify dependencies available
     d. Fetch new token
     e. Cache token with step’s TTL
     f. Add to context for dependent steps

  3. Return token from last step
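
The loop described above can be modelled in a few lines. The following is a minimal sketch, not the library's implementation: `SketchStep`, the `execute` function, the plain-dict cache, and the explicit `now` argument are all hypothetical stand-ins chosen so the example runs without requestforge or network access.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class SketchStep:
    """Hypothetical stand-in for a fetcher; not a requestforge class."""
    name: str
    ttl: timedelta
    depends_on: list = field(default_factory=list)
    calls: int = 0

    def fetch(self, context):
        # A real fetcher would make an HTTP request here.
        self.calls += 1
        return f"{self.name}-v{self.calls}"


def execute(steps, cache, now, force_refresh=False):
    """Walk the steps in order, reusing cache entries that have not expired."""
    context = {}
    token = None
    for step in steps:
        entry = None if force_refresh else cache.get(step.name)
        if entry is not None and entry[1] > now:
            token = entry[0]  # cached and still valid
        else:
            missing = [d for d in step.depends_on if d not in context]
            if missing:
                raise RuntimeError(f"{step.name} is missing {missing}")
            token = step.fetch(context)  # fetch a new token
            cache[step.name] = (token, now + step.ttl)  # store with expiry
        context[step.name] = token  # make it available to later steps
    return token


app = SketchStep("app_token", timedelta(hours=1))
user = SketchStep("user_token", timedelta(minutes=30), depends_on=["app_token"])
cache = {}
start = datetime(2024, 1, 1, 12, 0)

first = execute([app, user], cache, start)
second = execute([app, user], cache, start + timedelta(minutes=10))
third = execute([app, user], cache, start + timedelta(minutes=45))
```

In this model the second call is served entirely from the cache, while the third call refetches only `user_token`, because its 30-minute TTL has elapsed and the 1-hour `app_token` entry is still valid.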

Example:

# Execute with cache
token = pipeline.execute()

# Force refresh all steps
token = pipeline.execute(force_refresh=True)

TokenFetchPipeline.invalidate_step(step_name)

Invalidate cached token for a specific step.

Parameters:

step_name (str) – Name of step to invalidate

Also invalidates all dependent steps (cascading invalidation).
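
One way to picture cascading invalidation is as a walk over the dependency graph. The sketch below is illustrative only: `steps_to_invalidate` is a hypothetical helper, and it assumes the cascade is transitive (dependents of dependents are also cleared), which this page does not state explicitly.

```python
def steps_to_invalidate(step_name, depends_on):
    """Return step_name plus every step that depends on it, directly or not.

    depends_on maps each step name to the list of names it depends on.
    """
    affected = [step_name]
    changed = True
    while changed:  # repeat until no new dependents are found
        changed = False
        for name, deps in depends_on.items():
            if name not in affected and any(d in affected for d in deps):
                affected.append(name)
                changed = True
    return affected


graph = {
    "device_token": [],
    "app_token": ["device_token"],
    "user_token": ["app_token"],
}
from_app = steps_to_invalidate("app_token", graph)
from_device = steps_to_invalidate("device_token", graph)
from_user = steps_to_invalidate("user_token", graph)
```

Under that assumption, invalidating `device_token` clears all three steps, while invalidating `user_token` clears only itself.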

Example:

# Invalidate app_token (also invalidates user_token if it depends on it)
pipeline.invalidate_step('app_token')

TokenFetchPipeline.invalidate_all()

Invalidate all cached tokens.

Example:

pipeline.invalidate_all()

TokenFetchPipeline.get_step(name)

Get a step by name.

Parameters:

name (str) – Step name

Returns:

Step instance or None

Return type:

TokenFetcherInterface | None

Example:

app_step = pipeline.get_step('app_token')
if app_step is not None:  # get_step returns None for an unknown name
    print(app_step.ttl)

Properties

TokenFetchPipeline.step_names: list[str]

Get list of step names in execution order.

Example:

print(pipeline.step_names)  # ['app_token', 'user_token']

PipelineTokenProvider

class PipelineTokenProvider(pipeline, service_name, refresh_from_step=None)

Token provider that wraps a TokenFetchPipeline.

Implements TokenProviderInterface for use with TokenManager.

Parameters:
  • pipeline (TokenFetchPipeline) – The token fetch pipeline

  • service_name (str) – Service name for this provider

  • refresh_from_step (str | None) – Name of the step to invalidate on refresh (optional); if omitted, all steps are invalidated

Example:

from requestforge import PipelineTokenProvider, TokenFetchPipeline, TokenManager

pipeline = TokenFetchPipeline(...)
provider = PipelineTokenProvider(
    pipeline=pipeline,
    service_name='myapp',
    refresh_from_step='user_token'  # On refresh, invalidate only user_token and its dependents
)

token_manager = TokenManager(provider)

Properties

PipelineTokenProvider.service_name: str

Service name for this provider.

PipelineTokenProvider.pipeline: TokenFetchPipeline

Get the underlying pipeline.

Example:

pipeline = provider.pipeline
pipeline.invalidate_step('app_token')

Methods

PipelineTokenProvider.fetch_token()

Fetch token by executing pipeline.

Returns:

Token from last pipeline step

Return type:

TokenData

PipelineTokenProvider.refresh_token(current_token)

Refresh token by re-executing pipeline.

Parameters:

current_token (TokenData) – Current token (ignored)

Returns:

Fresh token from pipeline

Return type:

TokenData

Behavior:

  • If refresh_from_step specified, invalidates that step and dependents

  • Otherwise, invalidates all steps

  • Re-executes pipeline
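
The three behaviors above amount to a small branch followed by a re-execution. The sketch below illustrates that control flow with a hypothetical `RecordingPipeline` stand-in that only records which methods were called; the `refresh` function is likewise an assumption about the shape of the logic, not the provider's actual code.

```python
class RecordingPipeline:
    """Hypothetical stand-in that records calls instead of fetching tokens."""

    def __init__(self):
        self.calls = []

    def invalidate_step(self, step_name):
        self.calls.append(("invalidate_step", step_name))

    def invalidate_all(self):
        self.calls.append(("invalidate_all",))

    def execute(self):
        self.calls.append(("execute",))
        return "fresh-token"


def refresh(pipeline, refresh_from_step=None):
    """Mirror the documented refresh behavior."""
    if refresh_from_step is not None:
        # Selective refresh: the pipeline cascades this to dependent steps.
        pipeline.invalidate_step(refresh_from_step)
    else:
        # No step configured: clear every cached token.
        pipeline.invalidate_all()
    return pipeline.execute()


selective = RecordingPipeline()
token = refresh(selective, refresh_from_step="user_token")

full = RecordingPipeline()
refresh(full)
```

Because steps that were not invalidated remain cached, a selective refresh should only trigger network requests for the named step and the steps that depend on it.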

Example:

new_token = provider.refresh_token(current_token)

Complete Examples

Two-Step Authentication

from requestforge import (
    TokenFetchPipeline,
    PipelineTokenProvider,
    TokenManager,
    HttpClient,
    HttpClientConfigBuilder
)
from requestforge.fetcher import BodyTokenFetcher
from requestforge.token_manager import InMemoryTokenStorage
from datetime import timedelta

# Step 1: Application token
app_token_fetcher = BodyTokenFetcher(
    name='app_token',
    base_url='https://auth.example.com',
    endpoint='/v1/app/token',
    method='POST',
    request_data={
        'grant_type': 'client_credentials',
        'client_id': 'app-id',
        'client_secret': 'app-secret'
    },
    token_field='access_token',
    expires_in_field='expires_in',
    ttl=timedelta(hours=1)
)

# Step 2: User token (uses app token)
class UserTokenFetcher(BodyTokenFetcher):
    def _build_request_headers(self, context):
        headers = super()._build_request_headers(context)
        if context and 'app_token' in context:
            headers['X-App-Token'] = context['app_token'].access_token
        return headers

user_token_fetcher = UserTokenFetcher(
    name='user_token',
    base_url='https://auth.example.com',
    endpoint='/v1/user/token',
    method='POST',
    request_data={
        'username': 'user@example.com',
        'password': 'password123'
    },
    token_field='access_token',
    ttl=timedelta(minutes=30),
    depends_on=['app_token']
)

# Create pipeline
pipeline = TokenFetchPipeline(
    steps=[app_token_fetcher, user_token_fetcher],
    storage=InMemoryTokenStorage(),
    cache_key_prefix='myapp'
)

# Wrap in provider
provider = PipelineTokenProvider(
    pipeline=pipeline,
    service_name='myapp',
    refresh_from_step='user_token'  # On refresh, invalidate only the user token
)

# Use with TokenManager
token_manager = TokenManager(provider)

# Configure HTTP client
config = (
    HttpClientConfigBuilder()
    .with_base_url('https://api.example.com')
    .with_token_auth(token_manager=token_manager)
    .build()
)

client = HttpClient(config)

# Use client (pipeline executes automatically)
response = client.get('/user/profile')

Three-Step Authentication

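from requestforge import TokenFetchPipeline
from requestforge.fetcher import BodyTokenFetcher, HeaderTokenFetcher
from requestforge.token_manager import InMemoryTokenStorage
from datetime import timedelta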

# Step 1: Device token (from header)
device_token = HeaderTokenFetcher(
    name='device_token',
    base_url='https://auth.example.com',
    endpoint='/v1/device/register',
    method='POST',
    request_headers={'X-Device-ID': 'device-123'},
    token_header='X-Device-Token',
    token_type='Device',
    ttl=timedelta(days=30)
)

# Step 2: App token (using device token)
class AppTokenFetcher(BodyTokenFetcher):
    def _build_request_headers(self, context):
        headers = super()._build_request_headers(context)
        if context and 'device_token' in context:
            headers['X-Device-Token'] = context['device_token'].access_token
        return headers

app_token = AppTokenFetcher(
    name='app_token',
    base_url='https://auth.example.com',
    endpoint='/v1/app/token',
    method='POST',
    request_data={'client_id': 'app-id'},
    token_field='access_token',
    ttl=timedelta(hours=1),
    depends_on=['device_token']
)

# Step 3: User token (using app token)
class UserTokenFetcher(BodyTokenFetcher):
    def _build_request_headers(self, context):
        headers = super()._build_request_headers(context)
        if context and 'app_token' in context:
            headers['X-App-Token'] = context['app_token'].access_token
        return headers

user_token = UserTokenFetcher(
    name='user_token',
    base_url='https://auth.example.com',
    endpoint='/v1/user/token',
    method='POST',
    request_data={'username': 'user', 'password': 'pass'},
    token_field='access_token',
    ttl=timedelta(minutes=15),
    depends_on=['app_token']
)

# Create pipeline with all three steps
pipeline = TokenFetchPipeline(
    steps=[device_token, app_token, user_token],
    storage=InMemoryTokenStorage(),
    cache_key_prefix='myapp'
)

# Use pipeline
token = pipeline.execute()
print(f"Final token: {token.access_token}")

Selective Cache Invalidation

# Execute pipeline (all steps are fetched and cached)
token = pipeline.execute()

# Later, invalidate just user token
pipeline.invalidate_step('user_token')

# Next execution reuses device_token and app_token from cache
# Only fetches new user_token
token = pipeline.execute()

# Invalidate app_token (cascades to user_token)
pipeline.invalidate_step('app_token')

# Next execution reuses device_token from cache
# Fetches new app_token and user_token
token = pipeline.execute()

# Force refresh everything
token = pipeline.execute(force_refresh=True)

With Django Cache

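from requestforge import (
    DjangoCacheTokenStorage,
    PipelineTokenProvider,
    TokenFetchPipeline,
    TokenManager
)

# app_token and user_token are the fetchers defined in the earlier examples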

pipeline = TokenFetchPipeline(
    steps=[app_token, user_token],
    storage=DjangoCacheTokenStorage(
        cache_alias='default',
        key_prefix='auth_pipeline'
    ),
    cache_key_prefix='myapp'
)

# Tokens are now shared across Django instances that use the same cache backend
provider = PipelineTokenProvider(pipeline, 'myapp')
token_manager = TokenManager(provider)

Error Handling

from requestforge import AuthenticationException

try:
    token = pipeline.execute()
except AuthenticationException as e:
    print(f"Auth failed at step: {e.service_name}")
    print(f"Error: {e.message}")

    # Invalidate failed step
    if e.service_name:
        pipeline.invalidate_step(e.service_name)

Dependency Validation

# This will raise ValueError on initialization
try:
    pipeline = TokenFetchPipeline(
        steps=[
            user_token,   # Depends on 'app_token'
            app_token     # But app_token comes after!
        ],
        storage=InMemoryTokenStorage(),
        validate_dependencies=True  # Default
    )
except ValueError as e:
    print(f"Invalid dependencies: {e}")

# Correct order
pipeline = TokenFetchPipeline(
    steps=[
        app_token,    # No dependencies
        user_token    # Depends on app_token
    ],
    storage=InMemoryTokenStorage()
)
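
The ordering rule behind this check is simple: a step may only depend on names that appear before it. The sketch below shows one way such a check could work; `validate_order` and the `(name, depends_on)` pair representation are hypothetical and are not taken from the library.

```python
def validate_order(steps):
    """Raise ValueError if a step depends on a name not defined before it.

    steps is a list of (name, depends_on) pairs in execution order.
    """
    seen = set()
    for name, depends_on in steps:
        missing = [dep for dep in depends_on if dep not in seen]
        if missing:
            raise ValueError(
                f"Step '{name}' depends on {missing}, "
                "which must appear earlier in the pipeline"
            )
        seen.add(name)


# Correct order: passes silently.
validate_order([("app_token", []), ("user_token", ["app_token"])])

# Wrong order: user_token is listed before the step it depends on.
try:
    validate_order([("user_token", ["app_token"]), ("app_token", [])])
    error = None
except ValueError as exc:
    error = str(exc)
```

A single left-to-right pass is enough here because the pipeline executes steps in list order, so there is no need for a full topological sort.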

Pipeline Execution Flow

Execute Pipeline:

┌─────────────────────────────────────┐
│  Step 1: device_token               │
│  ├─ Check cache                     │
│  ├─ Cache miss → Fetch              │
│  └─ Cache token (TTL: 30 days)      │
└──────────────┬──────────────────────┘
               ↓
┌─────────────────────────────────────┐
│  Step 2: app_token                  │
│  ├─ Check cache                     │
│  ├─ Cache miss → Fetch              │
│  ├─ Use device_token from context   │
│  └─ Cache token (TTL: 1 hour)       │
└──────────────┬──────────────────────┘
               ↓
┌─────────────────────────────────────┐
│  Step 3: user_token                 │
│  ├─ Check cache                     │
│  ├─ Cache miss → Fetch              │
│  ├─ Use app_token from context      │
│  └─ Cache token (TTL: 15 min)       │
└──────────────┬──────────────────────┘
               ↓
       Return user_token

Next Execution (within 15 min):

┌─────────────────────────────────────┐
│  Step 1: device_token               │
│  └─ Cache hit → Use cached          │
└──────────────┬──────────────────────┘
               ↓
┌─────────────────────────────────────┐
│  Step 2: app_token                  │
│  └─ Cache hit → Use cached          │
└──────────────┬──────────────────────┘
               ↓
┌─────────────────────────────────────┐
│  Step 3: user_token                 │
│  └─ Cache hit → Use cached          │
└──────────────┬──────────────────────┘
               ↓
       Return cached user_token
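
To see how the three TTLs in the diagrams interact over time, the following sketch tracks only expiry times. It assumes each step checks its own cache entry independently, as in the execute() behavior list, and it uses elapsed time since the first execution in place of a real clock; `refetched` is a hypothetical helper.

```python
from datetime import timedelta

# TTLs from the three-step example above.
TTLS = {
    "device_token": timedelta(days=30),
    "app_token": timedelta(hours=1),
    "user_token": timedelta(minutes=15),
}


def refetched(expiry, now):
    """Return the steps whose cache entry is missing or expired at `now`,
    recording a new expiry time for each of them."""
    fetched = []
    for name, ttl in TTLS.items():
        if expiry.get(name) is None or expiry[name] <= now:
            fetched.append(name)
            expiry[name] = now + ttl
    return fetched


expiry = {}
t0 = timedelta(0)  # elapsed time since the first execution
run_1 = refetched(expiry, t0)                          # cold cache
run_2 = refetched(expiry, t0 + timedelta(minutes=10))  # within 15 min
run_3 = refetched(expiry, t0 + timedelta(minutes=20))  # user_token expired
run_4 = refetched(expiry, t0 + timedelta(minutes=70))  # app_token expired too
```

The first two runs correspond to the two diagrams: everything is fetched on a cold cache, and nothing is fetched within the 15-minute window. The later runs show that short-lived steps are refetched without touching the 30-day device token.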

Best Practices

  1. Order Steps by Dependencies

    # Good ✅
    pipeline = TokenFetchPipeline(
        steps=[step_a, step_b, step_c],  # a → b → c
        storage=storage
    )
    
  2. Set Appropriate TTL

    # Good ✅ - Longer TTL for stable tokens
    device_token = HeaderTokenFetcher(..., ttl=timedelta(days=30))
    app_token = BodyTokenFetcher(..., ttl=timedelta(hours=1))
    user_token = BodyTokenFetcher(..., ttl=timedelta(minutes=15))
    
  3. Use Selective Refresh

    # Good ✅ - Only refresh what's needed
    provider = PipelineTokenProvider(
        pipeline=pipeline,
        service_name='myapp',
        refresh_from_step='user_token'  # Don't refresh device/app
    )
    
  4. Handle Dependencies in Custom Fetchers

    # Good ✅ - Access dependencies from context
    class CustomFetcher(BodyTokenFetcher):
        def _build_request_headers(self, context):
            headers = super()._build_request_headers(context)
            if context and 'previous_step' in context:
                headers['X-Token'] = context['previous_step'].access_token
            return headers
    
