Multi-provider failover

Multi-provider failover adds redundancy across multiple LLM providers to keep a service available. It detects failures automatically, switches between OpenAI, Anthropic, and other providers, and routes requests based on health checks, latency, and cost. This matters most for production systems that can't tolerate downtime.

Why Multi-Provider Matters

Failover Patterns

Simple Fallback Chain:

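import logging

logger = logging.getLogger(__name__)

class AllProvidersFailedError(Exception):
    """Raised when every provider in the chain has failed."""

async def generate_with_fallback(prompt: str) -> str:
    # Ordered by preference: primary first, then fallbacks.
    # Model names are shorthand; use each provider's exact model ID.
    providers = [
        ("openai", "gpt-4o"),
        ("anthropic", "claude-3-5-sonnet"),
        ("together", "llama-3.1-70b"),
    ]

    last_error = None
    for provider, model in providers:
        try:
            # call_provider is an application-specific helper (not shown here)
            return await call_provider(provider, model, prompt)
        except Exception as e:
            logger.warning(f"{provider}/{model} failed: {e}")
            last_error = e

    # Chain the last failure so the root cause shows up in the traceback
    raise AllProvidersFailedError("No providers available") from last_error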

Health-Check Based Routing:

class ProviderPool:
    def __init__(self, providers):
        self.providers = providers
        # Assume every provider is healthy until a check says otherwise
        self.health_status = {p: True for p in providers}

    async def check_health(self):
        """Refresh health status; call this periodically."""
        for provider in self.providers:
            try:
                # Unhealthy if health_check() raises or returns False
                healthy = await provider.health_check()
                self.health_status[provider] = healthy is not False
            except Exception:
                self.health_status[provider] = False

    def get_healthy_provider(self):
        """Return the first healthy provider in priority order, or None."""
        for provider in self.providers:
            if self.health_status[provider]:
                return provider
        return None
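
The pool only refreshes its status when check_health is called, so something has to call it on a schedule. A minimal sketch, assuming an asyncio application: `run_health_checks`, `interval_seconds`, and the `FakePool` stand-in are hypothetical names introduced for illustration, and the 30-second default is an arbitrary choice.

```python
import asyncio


async def run_health_checks(pool, interval_seconds: float = 30.0) -> None:
    """Call pool.check_health() in a loop, pausing between rounds.

    `pool` is any object with an async check_health() method,
    such as the ProviderPool above.
    """
    while True:
        await pool.check_health()
        await asyncio.sleep(interval_seconds)


class FakePool:
    """Hypothetical stand-in that only counts health-check rounds."""

    def __init__(self):
        self.rounds = 0

    async def check_health(self):
        self.rounds += 1


async def demo() -> int:
    pool = FakePool()
    # Run the checker in the background while the app serves requests.
    task = asyncio.create_task(run_health_checks(pool, interval_seconds=0.01))
    await asyncio.sleep(0.05)
    task.cancel()  # stop the loop on shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return pool.rounds
```

Running the checker as a background task keeps get_healthy_provider synchronous and cheap: request handlers read the cached status instead of probing providers themselves.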

Circuit Breaker Pattern:

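import time

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failures = 0
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = None
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.reset_timeout = reset_timeout  # seconds to wait before a trial call

    async def call(self, func):
        if self.state == "open":
            # monotonic() is unaffected by system clock adjustments
            if time.monotonic() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"  # let one trial call through
            else:
                raise CircuitOpenError()

        try:
            result = await func()
        except Exception:
            self.failures += 1
            self.last_failure_time = time.monotonic()
            # A failed trial call re-opens at once; otherwise open at the threshold
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
            raise

        # Any success closes the circuit and resets the failure count
        self.state = "closed"
        self.failures = 0
        return result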
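
A breaker is most useful when each provider has its own and the fallback loop goes through them, so a provider whose circuit is open is skipped without waiting on a timeout. The sketch below assumes only the `call(func)` interface shown above. `call_with_breakers`, `StubBreaker`, and the three toy provider functions are hypothetical names for illustration; in real use each route would hold a CircuitBreaker instance.

```python
import asyncio
from functools import partial


class CircuitOpenError(Exception):
    """Raised by a breaker that is currently rejecting calls."""


class StubBreaker:
    """Hypothetical stand-in exposing the same call(func) interface."""

    def __init__(self, is_open: bool = False):
        self.is_open = is_open

    async def call(self, func):
        if self.is_open:
            raise CircuitOpenError()
        return await func()


async def call_with_breakers(routes, prompt: str) -> str:
    """Try each (name, breaker, generate) route in priority order."""
    errors = {}
    for name, breaker, generate in routes:
        try:
            # The breaker either rejects the call immediately or runs it.
            return await breaker.call(partial(generate, prompt))
        except Exception as exc:  # includes CircuitOpenError
            errors[name] = exc
    raise RuntimeError(f"All providers failed: {errors!r}")


async def primary(prompt: str) -> str:
    return f"primary: {prompt}"


async def flaky(prompt: str) -> str:
    raise ConnectionError("provider down")


async def backup(prompt: str) -> str:
    return f"backup: {prompt}"


def demo() -> str:
    routes = [
        ("primary", StubBreaker(is_open=True), primary),  # skipped: circuit open
        ("flaky", StubBreaker(), flaky),                  # fails at call time
        ("backup", StubBreaker(), backup),                # serves the request
    ]
    return asyncio.run(call_with_breakers(routes, "hello"))
```

In this demo the first route is rejected by its open breaker, the second raises, and the third answers, so `demo()` returns the backup provider's reply.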

Provider Abstraction

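from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    async def generate(self, messages: list, **kwargs) -> str:
        """Return the model's text reply to a list of chat messages."""

    @abstractmethod
    async def health_check(self) -> bool:
        """Return True if the provider is currently usable."""

class OpenAIProvider(LLMProvider):
    def __init__(self, client):
        self.client = client  # an async client, e.g. openai.AsyncOpenAI()

    async def generate(self, messages, **kwargs):
        response = await self.client.chat.completions.create(
            model=kwargs.get("model", "gpt-4o"),
            messages=messages
        )
        return response.choices[0].message.content

    async def health_check(self):
        # Sends a real (billable) request, so keep the probe tiny
        try:
            await self.generate([{"role": "user", "content": "hi"}])
            return True
        except Exception:
            return False

class AnthropicProvider(LLMProvider):
    def __init__(self, client):
        self.client = client  # an async client, e.g. anthropic.AsyncAnthropic()

    async def generate(self, messages, **kwargs):
        # Model name is shorthand; substitute the exact model ID from the
        # provider's docs. System prompts go in a separate `system` parameter,
        # not in `messages`.
        response = await self.client.messages.create(
            model=kwargs.get("model", "claude-3-5-sonnet"),
            messages=messages,
            max_tokens=1024
        )
        return response.content[0].text

    async def health_check(self):
        # Required by the abstract base; same tiny probe as OpenAIProvider
        try:
            await self.generate([{"role": "user", "content": "hi"}])
            return True
        except Exception:
            return False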

Smart Routing

Cost-Based Routing:

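# Illustrative prices in $ per 1K tokens. Real pricing differs for input
# and output tokens and changes over time, so load current rates from config.
COSTS = {
    "gpt-4o": 0.01,
    "gpt-4o-mini": 0.00015,
    "claude-3-5-sonnet": 0.003,
    "llama-3.1-70b": 0.001,
}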

def route_by_cost(task_complexity: str) -> str:
    if task_complexity == "simple":
        return "gpt-4o-mini"  # Cheapest capable
    elif task_complexity == "complex":
        return "gpt-4o"       # Best quality
    else:
        return "claude-3-5-sonnet"  # Balance
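
The router above picks a model by task type without consulting the price table. A small sketch of how the table could feed a per-request estimate follows. The figures are copied from the COSTS table and should be treated as illustrative, and `COSTS_PER_1K`, `estimate_cost`, and `cheapest` are hypothetical names introduced here.

```python
# Illustrative $ per 1K tokens, copied from the COSTS table above.
COSTS_PER_1K = {
    "gpt-4o": 0.01,
    "gpt-4o-mini": 0.00015,
    "claude-3-5-sonnet": 0.003,
    "llama-3.1-70b": 0.001,
}


def estimate_cost(model: str, total_tokens: int) -> float:
    """Estimated dollars for a request, using one flat per-token rate."""
    return total_tokens / 1000 * COSTS_PER_1K[model]


def cheapest(models: list[str]) -> str:
    """Return the lowest-priced model among the given candidates."""
    return min(models, key=COSTS_PER_1K.__getitem__)
```

With these numbers, a 2,000-token request is estimated at $0.02 on gpt-4o and $0.0003 on gpt-4o-mini. A flat rate is a simplification: a production tracker would price input and output tokens separately.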

Latency-Based Routing:

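import asyncio

class AllProvidersFailedError(Exception):
    """Raised when no provider returns a usable response."""

async def route_by_latency(providers, prompt, timeout=5.0):
    """Race all providers and return the first successful response.

    Every provider receives the request, so this trades cost for latency.
    """
    messages = [{"role": "user", "content": prompt}]

    async def try_provider(provider):
        try:
            return await asyncio.wait_for(
                provider.generate(messages),
                timeout=timeout
            )
        except Exception:
            # Timeouts and provider errors both count as "no result"
            return None

    tasks = [asyncio.create_task(try_provider(p)) for p in providers]
    try:
        # as_completed yields in finish order, so the first good result is the fastest
        for finished in asyncio.as_completed(tasks):
            result = await finished
            if result is not None:
                return result
    finally:
        # Cancel slower requests that are still in flight
        for task in tasks:
            task.cancel()

    raise AllProvidersFailedError("No provider responded in time")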

Implementation Checklist

□ Abstract provider interface
□ Health check endpoints
□ Circuit breakers per provider
□ Fallback chain configured
□ Monitoring per provider
□ Alert on primary failure
□ Cost tracking per provider
□ Latency tracking per provider
□ Regular failover testing
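
The monitoring, cost-tracking, and latency-tracking items can start as a small in-process recorder before moving to a real metrics system. This is a sketch under that assumption: `ProviderStats` and `ProviderMetrics` are hypothetical names, and the counters live in memory only.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class ProviderStats:
    calls: int = 0
    failures: int = 0
    total_latency: float = 0.0  # seconds, across all calls
    total_cost: float = 0.0     # dollars

    @property
    def error_rate(self) -> float:
        return self.failures / self.calls if self.calls else 0.0

    @property
    def avg_latency(self) -> float:
        return self.total_latency / self.calls if self.calls else 0.0


class ProviderMetrics:
    """Accumulates per-provider call counts, latency, and cost."""

    def __init__(self):
        self.stats = defaultdict(ProviderStats)

    def record(self, provider: str, latency: float, ok: bool, cost: float = 0.0) -> None:
        s = self.stats[provider]
        s.calls += 1
        s.total_latency += latency
        s.total_cost += cost
        if not ok:
            s.failures += 1


metrics = ProviderMetrics()
metrics.record("openai", latency=0.5, ok=True, cost=0.01)
metrics.record("openai", latency=1.5, ok=False)
```

An alert on primary failure can then be a threshold on `metrics.stats["openai"].error_rate`, checked on whatever schedule the application already uses for health checks.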

Multi-provider failover is essential for production AI reliability: the most capable model means nothing if it's unavailable. Robust fallback mechanisms turn fragile AI features into dependable product capabilities.
