Router - Load Balancing, Fallbacks
LiteLLM manages:
- Load balancing across multiple deployments (e.g. Azure/OpenAI)
- Prioritizing important requests to ensure they don't fail (i.e. Queueing)
- Basic reliability logic - cooldowns, fallbacks, timeouts and retries (fixed + exponential backoff) across multiple deployments/providers.
 
In production, litellm supports using Redis to track cooldowns and usage (managing tpm/rpm limits).
If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server
Load Balancing
(s/o @paulpierre and sweep proxy for their contributions to this implementation) See Code
Quick Start
import os
import asyncio
from litellm import Router
model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]
router = Router(model_list=model_list)
# openai.ChatCompletion.create replacement
async def main():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)

asyncio.run(main())
Available Endpoints
- router.completion() - chat completions endpoint to call 100+ LLMs
- router.acompletion() - async chat completion calls
- router.embeddings() - embedding endpoint for Azure, OpenAI, Huggingface endpoints
- router.aembeddings() - async embeddings calls
- router.text_completion() - completion calls in the old OpenAI /v1/completions endpoint format
- router.atext_completion() - async text completion calls
- router.image_generation() - image generation calls in the OpenAI /v1/images/generations endpoint format
- router.aimage_generation() - async image generation calls
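For example, the async chat and text-completion endpoints can be called like so (a minimal sketch: it reuses the router from the Quick Start above, and assumes atext_completion accepts model and prompt in the old OpenAI format):
import asyncio

# assumes `router` was created as in the Quick Start above
async def call_endpoints():
    # async chat completion
    chat_response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}],
    )
    print(chat_response)

    # async text completion, in the old /v1/completions format
    # (assumes `model` + `prompt` arguments, mirroring the OpenAI format)
    text_response = await router.atext_completion(
        model="gpt-3.5-turbo",
        prompt="Hey, how's it going?",
    )
    print(text_response)

asyncio.run(call_endpoints())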
Advanced
Routing Strategies - Weighted Pick, Rate Limit Aware, Least Busy, Latency Based
Router provides 4 strategies for routing your calls across multiple deployments:
- Latency-Based
- (Default) Weighted Pick
- Rate-Limit Aware
- Least-Busy

Latency-Based
Picks the deployment with the lowest response time.
It caches and updates the response times for deployments, based on when a request was sent to and received back from a deployment.
import asyncio
from litellm import Router

model_list = [{ ... }] # list of model deployments (see Quick Start above)

# init router
router = Router(model_list=model_list, routing_strategy="latency-based-routing") # 👈 set routing strategy

model = "gpt-3.5-turbo"
messages = [{"role": "user", "content": "Hey, how's it going?"}]

async def main():
    ## CALL 1+2 - make two calls so the router can record response times
    tasks = []
    for _ in range(2):
        tasks.append(router.acompletion(model=model, messages=messages))
    response = await asyncio.gather(*tasks)

    if response is not None:
        ## CALL 3
        await asyncio.sleep(1)  # let the cache update happen
        picked_deployment = router.lowestlatency_logger.get_available_deployments(
            model_group=model, healthy_deployments=router.healthy_deployments
        )
        final_response = await router.acompletion(model=model, messages=messages)
        print(f"min deployment id: {picked_deployment}")
        print(f"model id: {final_response._hidden_params['model_id']}")
        assert (
            final_response._hidden_params["model_id"]
            == picked_deployment["model_info"]["id"]
        )

asyncio.run(main())
Set Time Window
Set the time window for how far back to look when averaging latency for a deployment.
In Router
router = Router(..., routing_strategy_args={"ttl": 10})
In Proxy
router_settings:
    routing_strategy_args: {"ttl": 10}
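For example, a router using latency-based routing with a wider averaging window might look like this (a minimal sketch; model_list is a placeholder, and treating the ttl value as seconds is an assumption):
from litellm import Router

model_list = [{ ... }]  # your deployments

# latency-based routing, averaging latency over a wider window
# (assumes the ttl value is in seconds)
router = Router(model_list=model_list,
                routing_strategy="latency-based-routing",
                routing_strategy_args={"ttl": 60})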
(Default) Weighted Pick
Picks a deployment based on the provided Requests per minute (rpm) or Tokens per minute (tpm).
If rpm or tpm is not provided, it randomly picks a deployment.
import os
import asyncio
from litellm import Router
model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 900,         # requests per minute for this API
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 10,
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
        "rpm": 10,
    }
}]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo", 
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response
asyncio.run(router_acompletion())
Rate-Limit Aware
This routes to the deployment with the lowest TPM usage for that minute.
In production, we use Redis to track usage (TPM/RPM) across multiple deployments.
If you pass in the deployment's tpm/rpm limits, this will also check against those, and filter out any deployment whose limits would be exceeded.
For Azure, your RPM = TPM/6.
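For example, under that rule an Azure deployment provisioned with a 240,000 TPM limit has an implied cap of 40,000 RPM (a quick arithmetic check):
azure_tpm = 240000
azure_rpm = azure_tpm / 6   # Azure: RPM = TPM / 6 -> 40,000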
import os
import asyncio
from litellm import Router
model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }, 
    "tpm": 100000,
    "rpm": 10000,
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    },
    "tpm": 100000,
    "rpm": 1000,
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
    },
    "tpm": 100000,
    "rpm": 1000,
}]
router = Router(model_list=model_list, 
                redis_host=os.environ["REDIS_HOST"], 
                redis_password=os.environ["REDIS_PASSWORD"], 
                redis_port=os.environ["REDIS_PORT"], 
                routing_strategy="usage-based-routing")

async def main():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)

asyncio.run(main())
Least-Busy
Picks the deployment handling the least number of ongoing calls.
import os
import asyncio
from litellm import Router
model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]
# init router
router = Router(model_list=model_list, routing_strategy="least-busy")
async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo", 
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response
asyncio.run(router_acompletion())
Basic Reliability
Timeouts
The timeout set on the Router applies to the entire length of the call, and is also passed down to the underlying completion() call.
from litellm import Router 

model_list = [{...}]

router = Router(model_list=model_list, 
                timeout=30) # raise timeout error if call takes > 30s 

response = router.completion(model="gpt-3.5-turbo",
                             messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
Cooldowns
Set the limit for how many calls a model is allowed to fail in a minute, before being cooled down for a minute.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list, 
                allowed_fails=1) # cooldown model if it fails > 1 call in a minute. 
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call 
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Retries
For both async + sync functions, we support retrying failed requests.
For RateLimitError we implement exponential backoff.
For generic errors, we retry immediately.
Here's a quick look at how we can set num_retries = 3: 
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,  
                num_retries=3)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call 
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
We also support setting a minimum time to wait before retrying a failed request, via the retry_after param.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,  
                num_retries=3, retry_after=5) # waits min 5s before retrying request
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call 
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Fallbacks
If a call fails after num_retries, fall back to another model group.
If the error is a context window exceeded error, fall back to a larger model group (if given).
import os
from litellm import Router
model_list = [
    { # list of model deployments 
        "model_name": "azure/gpt-3.5-turbo", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "azure/chatgpt-v-2", 
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    }, 
    { # list of model deployments 
        "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "azure/chatgpt-v-2", 
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    }, 
    {
        "model_name": "azure/gpt-3.5-turbo", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "azure/chatgpt-functioncalling", 
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    }, 
    {
        "model_name": "gpt-3.5-turbo", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "gpt-3.5-turbo", 
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000
    },
    {
        "model_name": "gpt-3.5-turbo-16k", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "gpt-3.5-turbo-16k", 
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000
    }
]
router = Router(model_list=model_list, 
                fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], 
                context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
                set_verbose=True)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal fallback call 
response = router.completion(model="azure/gpt-3.5-turbo", messages=messages)
# context window fallback call
response = router.completion(model="azure/gpt-3.5-turbo-context-fallback", messages=messages)
print(f"response: {response}")
Caching
In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.
In-memory Cache
router = Router(model_list=model_list, 
                cache_responses=True)
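To sanity-check that caching is working, you can send the same request twice and compare response ids - cached responses return the same id, as the caching-groups test below also asserts (a minimal sketch, reusing the router above; the prompt is a placeholder and the cache write may need a moment to complete):
messages = [{"role": "user", "content": "What's the capital of France?"}]

response1 = router.completion(model="gpt-3.5-turbo", messages=messages)
response2 = router.completion(model="gpt-3.5-turbo", messages=messages)

# if the second call was served from cache, the response ids match
print(response1.id == response2.id)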
Redis Cache
router = Router(model_list=model_list, 
                redis_host=os.getenv("REDIS_HOST"), 
                redis_password=os.getenv("REDIS_PASSWORD"), 
                redis_port=os.getenv("REDIS_PORT"),
                cache_responses=True)
Pass in Redis URL, additional kwargs
router = Router(model_list=model_list,
                ## CACHING ## 
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={}, # additional kwargs to pass to RedisCache (see caching.py)
                cache_responses=True)
Caching across model groups
If you want to cache across 2 different model groups (e.g. azure deployments and openai), use caching groups.
import litellm, asyncio, time, os, traceback
from litellm import Router 
# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""
async def test_acompletion_caching_on_router_caching_groups(): 
    # tests acompletion + caching on router 
    try:
        litellm.set_verbose = True
        model_list = [
            {
                "model_name": "openai-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo-0613",
                    "api_key": os.getenv("OPENAI_API_KEY"),
                },
            },
            {
                "model_name": "azure-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "azure/chatgpt-v-2",
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": os.getenv("AZURE_API_BASE"),
                    "api_version": os.getenv("AZURE_API_VERSION")
                },
            }
        ]
        messages = [
            {"role": "user", "content": f"write a one sentence poem {time.time()}?"}
        ]
        start_time = time.time()
        router = Router(model_list=model_list, 
                cache_responses=True, 
                caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
        response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
        print(f"response1: {response1}")
        await asyncio.sleep(1) # the cache write is async - sleep so it has time to complete
        response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
        assert response1.id == response2.id
        assert len(response1.choices[0].message.content) > 0
        assert response1.choices[0].message.content == response2.choices[0].message.content
    except Exception as e:
        traceback.print_exc()
asyncio.run(test_acompletion_caching_on_router_caching_groups())
Default litellm.completion/embedding params
You can also set default params for litellm completion/embedding calls. Here's how to do that:
from litellm import Router

model_list = [{...}]
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}

router = Router(model_list=model_list, 
                default_litellm_params={"context_window_fallback_dict": fallback_dict})
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call 
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Deploy Router
If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server
Init Params for the litellm.Router
def __init__(
    model_list: Optional[list] = None,
    
    ## CACHING ##
    redis_url: Optional[str] = None,
    redis_host: Optional[str] = None,
    redis_port: Optional[int] = None,
    redis_password: Optional[str] = None,
    cache_responses: Optional[bool] = False,
    cache_kwargs: dict = {},  # additional kwargs to pass to RedisCache (see caching.py)
    caching_groups: Optional[
        List[tuple]
    ] = None,  # if you want to cache across model groups
    client_ttl: int = 3600,  # ttl for cached clients - will re-initialize after this time in seconds
    ## RELIABILITY ##
    num_retries: int = 0,
    timeout: Optional[float] = None,
    default_litellm_params={},  # default params for Router.chat.completion.create
    set_verbose: bool = False,
    fallbacks: List = [],
    allowed_fails: Optional[int] = None,
    context_window_fallbacks: List = [],
    model_group_alias: Optional[dict] = {},
    retry_after: int = 0,  # min time to wait before retrying a failed request
    routing_strategy: Literal[
        "simple-shuffle",
        "least-busy",
        "usage-based-routing",
        "latency-based-routing",
    ] = "simple-shuffle",
):
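Putting several of these parameters together, a production-style setup might look like the sketch below (values are illustrative; every parameter name comes from the signature above):
import os
from litellm import Router

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
        },
        "tpm": 240000,
        "rpm": 1800,
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "gpt-3.5-turbo",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000,
    },
    {
        "model_name": "gpt-3.5-turbo-16k",
        "litellm_params": {
            "model": "gpt-3.5-turbo-16k",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000,
    },
]

router = Router(
    model_list=model_list,
    ## CACHING ##
    redis_host=os.getenv("REDIS_HOST"),
    redis_port=os.getenv("REDIS_PORT"),
    redis_password=os.getenv("REDIS_PASSWORD"),
    cache_responses=True,
    ## RELIABILITY ##
    num_retries=3,
    retry_after=5,    # wait at least 5s before retrying a failed request
    timeout=30,       # fail calls that take longer than 30s
    allowed_fails=1,  # cooldown a deployment if it fails > 1 call in a minute
    fallbacks=[{"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
    context_window_fallbacks=[{"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
    routing_strategy="usage-based-routing",
)

messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(response)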