别再为模型切换头疼了!litellm 一行代码搞定100+大模型调用指南
在AI应用开发中,你是否遇到过这些痛点:项目需要切换到不同的大模型时,代码要重写;调试不同API的认证方式让你抓狂;想对比不同模型的效果,却要维护多套接口?
今天要介绍的这个开源项目——litellm,正是为解决这些难题而生。它用一个统一的接口,让你用完全相同的方式调用OpenAI、Anthropic、Google、Azure、Hugging Face等100多个大模型服务。无论你是要快速原型开发,还是构建生产级AI应用,litellm都能大幅简化你的工作。
为什么值得关注 / 项目价值分析
当前大模型集成的困境
在大模型应用开发中,开发者面临的最大挑战之一就是模型选择和切换。随着AI领域快速发展,新的模型和服务层出不穷:
- OpenAI 的 GPT-4 系列持续更新迭代
- Anthropic 推出 Claude 系列,以长上下文和安全著称
- Google 的 Gemini 开始挑战市场格局
- 开源社区 贡献了 Llama、Mistral、Falcon 等优秀模型
- 各大云厂商 都推出了自己的大模型服务
当你想要切换模型时,传统做法需要重写大量代码,因为每个服务商的API格式、认证方式、错误处理都各不相同。这种紧耦合的设计不仅增加了开发成本,也让后续维护变得困难重重。
litellm 的核心价值
litellm 正是为解决这个痛点而设计的。它的核心理念是:提供一个统一的、接口一致的调用方式,让任何支持的大模型都能像调用本地函数一样简单。
这个项目的亮点非常突出:
第一,统一接口设计。无论你调用的是OpenAI还是Anthropic,代码几乎不需要任何改动。这种设计让你可以轻松地在不同模型之间切换,进行A/B测试,或者根据不同场景选择最适合的模型。
第二,开箱即用的企业级功能。litellm 内置了重试机制、熔断器、负载均衡、请求日志、成本追踪等生产环境必备的功能。你不需要自己实现这些基础设施,可以专注于业务逻辑开发。
第三,灵活的部署方式。litellm 支持直接API调用,也支持本地部署开源模型。这种灵活性让你可以根据成本、安全性、性能等因素灵活选择。
第四,活跃的社区和持续的更新。作为BerriAI维护的开源项目,litellm拥有活跃的贡献者社区,持续跟进各大模型服务商的新功能。这个项目的GitHub Star数已经超过3万,足以说明其受欢迎程度。
典型应用场景
litellm 特别适合以下场景:
- AI应用原型开发:需要快速尝试不同模型,找到最适合项目需求的那个
- 多模型编排:在同一个应用中集成多个模型,处理不同类型的任务
- 成本优化:通过对比不同模型的价格和性能,找到最优解
- 负载均衡和容错:在高并发场景下保证服务稳定性
- 企业内部AI平台:构建统一的大模型调用服务,规范团队的开发方式
环境搭建 / Getting Started
前置要求
在开始使用 litellm 之前,你需要准备以下环境:
Python 版本:litellm 要求 Python 3.8 或更高版本。建议使用 Python 3.10 或更新版本,以获得最佳性能和兼容性。
包管理工具:建议使用 pip 或 conda 来管理 Python 依赖。
API 密钥:你需要准备想要使用的大模型服务的 API 密钥。不同的服务商有不同的获取方式:
- OpenAI:访问 platform.openai.com 注册并获取 API Key
- Anthropic:访问 console.anthropic.com 注册并获取 API Key
- Azure OpenAI:通过 Azure 门户创建资源后获取
- Google Vertex AI:在 Google Cloud Platform 创建项目并配置服务账号
安装 litellm
安装 litellm 非常简单,只需要一条 pip 命令:
pip install litellm
如果你需要使用某些特定功能,可能需要安装额外的依赖包。例如:
# 完整安装,包含所有可选依赖
pip install litellm[all]
# 只安装常用依赖
pip install litellm[proxy]
# 安装流式输出支持
pip install litellm[streaming]
安装完成后,可以通过以下命令验证安装是否成功:
import litellm
print(litellm.__version__)
基础配置
litellm 支持多种配置方式,从简单到复杂,可以根据你的需求选择。
方式一:环境变量配置
这是最简单的方式,只需要设置环境变量即可。创建一个 .env 文件:
# OpenAI 配置
OPENAI_API_KEY=sk-xxxxx
# Anthropic 配置
ANTHROPIC_API_KEY=sk-ant-xxxxx
# Azure OpenAI 配置
AZURE_API_KEY=xxxxx
AZURE_API_BASE=https://xxxxx.openai.azure.com
AZURE_API_VERSION=2024-02-01
# 模型降级配置(当主模型不可用时自动切换)
MODEL_BATCH_WRITE_PRICE=0.00003
MODEL_BATCH_WRITE_MAX_TOKENS=2048
MODEL_BATCH_WRITE_MODEL=gpt-3.5-turbo
然后在代码中加载这些环境变量:
from dotenv import load_dotenv
load_dotenv() # 加载 .env 文件中的环境变量
import litellm
# 现在可以直接使用模型调用
response = litellm.completion(
model="gpt-4",
messages=[{"role": "user", "content": "你好"}]
)
方式二:配置文件方式
对于更复杂的场景,可以使用 YAML 配置文件:
# litellm_config.yaml
model_list:
- model_name: gpt-4-turbo
litellm_params:
model: gpt-4-turbo
api_key: os.environ/GPT4_API_KEY
temperature: 0.7
max_tokens: 2048
- model_name: claude-3-opus
litellm_params:
model: claude-3-opus-20240229
api_key: os.environ/ANTHROPIC_API_KEY
- model_name: azure-gpt-4
litellm_params:
model: gpt-4
api_base: os.environ/AZURE_API_BASE
api_key: os.environ/AZURE_API_KEY
api_version: "2024-02-01"
在代码中加载这个配置:
import litellm
import yaml
# 读取配置
with open('litellm_config.yaml', 'r') as f:
config = yaml.safe_load(f)
# 设置配置
litellm.set_verbose = True # 开启详细日志
方式三:使用 Proxy Server
litellm 还提供了一个强大的代理服务器功能,特别适合团队协作和生产环境部署:
pip install 'litellm[proxy]'
# 启动代理服务器
litellm --config your_config.yaml
# 或者使用 Docker
docker run \
-e AZURE_API_KEY=your-azure-key \
-e OPENAI_API_KEY=your-openai-key \
-v $(pwd)/config.yaml:/app/config.yaml \
-p 4000:4000 \
ghcr.io/berriai/litellm:main
代理服务器启动后,所有支持的模型都可以通过统一的 API 端点访问:
# 通过代理服务器调用
import openai
client = openai.OpenAI(
api_key="sk-1234", # 代理服务器的访问密钥
base_url="http://localhost:4000"
)
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "你好"}]
)
核心功能详解 / Core Features
统一的 Completion API
litellm 的核心是一个统一的 completion 接口。无论你使用哪个模型服务商,调用方式完全一致:
import litellm
# 调用 OpenAI GPT-4
response = litellm.completion(
model="gpt-4",
messages=[{"role": "user", "content": "用Python写一个快速排序"}]
)
print(response.choices[0].message.content)
如果想要切换到 Claude,只需要改一个参数:
# 切换到 Anthropic Claude,代码几乎不需要改动
response = litellm.completion(
model="claude-3-opus-20240229",
messages=[{"role": "user", "content": "用Python写一个快速排序"}]
)
print(response.choices[0].message.content)
这个设计让你可以在完全不修改业务逻辑的情况下,测试和切换不同的模型。
支持的模型列表
litellm 支持的模型非常广泛,截至目前已经超过100个。以下是主要分类:
OpenAI 系列
- gpt-4-turbo、gpt-4、gpt-4-32k
- gpt-3.5-turbo、gpt-3.5-turbo-16k
- gpt-4-vision-preview(支持图片理解)
Anthropic 系列
- claude-3-opus、claude-3-sonnet、claude-3-haiku
- claude-2.1、claude-2.0
- claude-instant
Google 系列
- gemini-pro、gemini-pro-vision
- text-bison-002、chat-bison-002
- 通过 Vertex AI 访问的 PaLM2、Gemini 等
开源模型
- Llama 2 系列(meta-llama/Llama-2-70b-chat)
- Mistral 系列(mistral/Mistral-7B-Instruct)
- Vicuna、Alpaca、Falcon 等
Azure OpenAI
- 所有 Azure 托管的 OpenAI 模型
其他服务商
- AWS Bedrock(Claude、Llama、Titan 等)
- Cohere(command-r 系列)
- AI21(Jurassic-2 系列)
- Replicate(各种开源模型)
- Hugging Face(Inference Endpoints)
高级参数配置
litellm 不仅支持基本的调用,还支持丰富的参数配置:
response = litellm.completion(
model="gpt-4-turbo",
messages=[
{"role": "system", "content": "你是一位专业的Python教练"},
{"role": "user", "content": "解释什么是装饰器"}
],
temperature=0.7, # 控制随机性,0-2之间
max_tokens=1000, # 最大生成token数
top_p=0.9, # 核采样参数
frequency_penalty=0.5, # 频率惩罚
presence_penalty=0.3, # 存在惩罚
stop=["END"], # 停止词
response_format={"type": "json_object"}, # JSON输出格式
seed=42, # 随机种子,用于可复现结果
user="user_123" # 用户标识
)
流式输出支持
对于需要实时展示生成内容的应用,流式输出非常有用:
response = litellm.completion(
model="gpt-4",
messages=[{"role": "user", "content": "给我讲一个关于AI的科幻故事"}],
stream=True
)
# 逐块获取响应
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
Embeddings 嵌入向量
litellm 同样支持生成文本嵌入向量,这在 RAG(检索增强生成)等场景中非常有用:
# 使用 OpenAI 的 text-embedding-3-small
response = litellm.embedding(
model="text-embedding-3-small",
input="这是一个用于生成嵌入向量的示例文本"
)
print(response.data[0].embedding)
# 切换到其他服务商只需要改模型名
response = litellm.embedding(
model="cohere/embed-v3-light",
input="这是一个用于生成嵌入向量的示例文本"
)
多模态支持
现代大模型已经支持图像理解,litellm 也提供了完善的图片处理能力:
response = litellm.completion(
model="gpt-4-vision-preview",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "这张图片里有什么?"},
{
"type": "image_url",
"image_url": {
"url": "https://example.com/image.jpg"
}
}
]
}
],
max_tokens=300
)
你也可以使用 base64 编码的图片:
import base64
# 读取本地图片并转为base64
with open("image.jpg", "rb") as f:
image_base64 = base64.b64encode(f.read()).decode()
response = litellm.completion(
model="gpt-4-vision-preview",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "分析这张图片"},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
}
]
}
]
)
模型降级与容错机制
生产环境中,某个模型可能会临时不可用。litellm 提供了智能的模型降级功能:
# 定义一个模型组,支持自动降级
litellm.fallbacks = [
{"model": "gpt-4-turbo"},
{"model": "gpt-3.5-turbo"},
{"model": "claude-3-haiku-20240307"}
]
response = litellm.completion(
model="gpt-4-turbo",
messages=[{"role": "user", "content": "复杂的推理任务"}],
fallbacks=litellm.fallbacks
)
当 GPT-4-Turbo 不可用时,系统会自动尝试 GPT-3.5-Turbo,再不行就尝试 Claude Haiku。
成本追踪
litellm 内置了成本追踪功能,让你清楚知道每次调用的花费:
# 启用成本追踪
import litellm
litellm.success_callback = ["liteLLMCallback"]
# 或者创建自定义回调
class CostTracker(litellm.Callback):
def __init__(self):
self.total_cost = 0
self.request_count = 0
def on_successful_response(self, response, kwargs, start_time, end_time):
cost = litellm.completion_cost(completion_response=response)
self.total_cost += cost
self.request_count += 1
print(f"请求 #{self.request_count}: 花费 ${cost:.6f}, 累计 ${self.total_cost:.6f}")
tracker = CostTracker()
litellm.callbacks = [tracker]
# 多次调用后查看总成本
response = litellm.completion(model="gpt-4", messages=[{"role": "user", "content": "Hello"}])
你也可以手动计算成本:
# 创建请求前估算成本
cost = litellm.completion_cost(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
completion_response=None # 传None会估算
)
print(f"预计成本: ${cost:.6f}")
# 或者计算已返回响应的成本
response = litellm.completion(model="gpt-4", messages=[{"role": "user", "content": "Hello"}])
actual_cost = litellm.completion_cost(completion_response=response)
print(f"实际成本: ${actual_cost:.6f}")
速率限制与重试机制
litellm 自动处理 API 的速率限制和临时错误:
response = litellm.completion(
model="gpt-4",
messages=[{"role": "user", "content": "分析这个代码"}],
max_retries=3, # 最大重试次数
request_timeout=60 # 请求超时时间(秒)
)
你也可以自定义重试策略:
from litellm import RetryConfig
retry_config = RetryConfig(
max_retries=5,
retry_on=["rate_limit_error", "timeout", "api_error"],
backoff_factor=0.5,
jitter=True
)
response = litellm.completion(
model="gpt-4",
messages=[{"role": "user", "content": "请求内容"}],
retry_config=retry_config
)
实战教程 / Step-by-Step Tutorial
实战一:构建一个智能问答机器人
让我们从头开始,构建一个基于litellm的智能问答系统。
第一步:创建基础项目结构
# qa_bot.py
import litellm
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
class QABot:
"""智能问答机器人类"""
def __init__(self, model="gpt-4"):
self.model = model
self.conversation_history = []
def ask(self, question, system_prompt=None):
"""
提问并获取回答
Args:
question: 用户问题
system_prompt: 系统提示词
Returns:
模型的回答
"""
messages = []
# 添加系统提示
if system_prompt:
messages.append({
"role": "system",
"content": system_prompt
})
else:
messages.append({
"role": "system",
"content": "你是一个有帮助的AI助手,用简洁清晰的语言回答问题。"
})
# 添加对话历史
messages.extend(self.conversation_history)
# 添加当前问题
messages.append({
"role": "user",
"content": question
})
# 调用模型
response = litellm.completion(
model=self.model,
messages=messages,
temperature=0.7,
max_tokens=500
)
# 提取回答
answer = response.choices[0].message.content
# 更新对话历史
self.conversation_history.append({
"role": "user",
"content": question
})
self.conversation_history.append({
"role": "assistant",
"content": answer
})
return answer
def clear_history(self):
"""清除对话历史"""
self.conversation_history = []
def switch_model(self, new_model):
"""切换模型"""
self.model = new_model
self.clear_history() # 切换模型时清除历史
# 使用示例
if __name__ == "__main__":
bot = QABot(model="gpt-4")
# 第一次对话
response1 = bot.ask("Python中的列表和元组有什么区别?")
print("Bot回答:", response1)
# 继续对话
response2 = bot.ask("能给我一些代码示例吗?")
print("Bot回答:", response2)
print("\n" + "="*50)
print("切换到Claude模型...")
bot.switch_model("claude-3-opus-20240229")
response3 = bot.ask("同样的问题,用Claude回答一下")
print("Claude回答:", response3)
运行效果
Bot回答: 列表(List)和元组(Tuple)主要有以下区别:
1. 可变性:列表是可变的,可以添加、删除、修改元素;元组是不可变的,创建后不能修改。
2. 语法:列表用方括号[],元组用圆括号()
3. 性能:元组通常比列表更快,因为不可变性让它可以做更多优化。
4. 用途:列表适合需要经常修改的数据;元组适合表示固定的数据结构,如坐标点。
==================================================
切换到Claude模型...
Claude回答: ...
第二步:添加错误处理和重试机制
import litellm
from litellm import RateLimitError, TimeoutError, APIError
import time
class RobustQABot:
"""带错误处理的健壮问答机器人"""
def __init__(self, model="gpt-4", max_retries=3):
self.model = model
self.max_retries = max_retries
self.conversation_history = []
def ask(self, question, system_prompt=None):
"""带重试机制的提问方法"""
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
else:
messages.append({
"role": "system",
"content": "你是一个有帮助的AI助手。"
})
messages.extend(self.conversation_history)
messages.append({"role": "user", "content": question})
last_error = None
# 重试循环
for attempt in range(self.max_retries):
try:
response = litellm.completion(
model=self.model,
messages=messages,
temperature=0.7,
max_tokens=500,
request_timeout=30
)
answer = response.choices[0].message.content
# 更新历史
self.conversation_history.append({
"role": "user", "content": question
})
self.conversation_history.append({
"role": "assistant", "content": answer
})
return answer
except RateLimitError as e:
# 遇到速率限制,等待后重试
last_error = e
wait_time = 2 ** attempt # 指数退避
print(f"遇到速率限制,等待 {wait_time} 秒...")
time.sleep(wait_time)
except TimeoutError as e:
# 超时错误
last_error = e
print(f"请求超时,重试中... ({attempt + 1}/{self.max_retries})")
except APIError as e:
# 其他API错误
last_error = e
print(f"API错误: {e},重试中...")
except Exception as e:
# 未知错误
print(f"发生错误: {type(e).__name__}: {e}")
raise
# 所有重试都失败
raise Exception(f"重试 {self.max_retries} 次后仍然失败: {last_error}")
实战二:构建 RAG 知识库问答系统
Retrieval-Augmented Generation(RAG)是当前最流行的知识库问答架构。让我们使用litellm构建一个简化版的RAG系统。
完整的RAG系统代码
# rag_system.py
import litellm
import os
from dotenv import load_dotenv
from typing import List, Dict
import json
load_dotenv()
class SimpleVectorStore:
"""
简单的向量存储(用于演示)
生产环境建议使用 Chroma、Pinecone、Weaviate 等专业向量数据库
"""
def __init__(self):
self.documents = []
self.embeddings = []
def add_documents(self, texts: List[str], embeddings: List[List[float]]):
"""添加文档和对应的嵌入向量"""
for text, embedding in zip(texts, embeddings):
self.documents.append(text)
self.embeddings.append(embedding)
def similarity_search(self, query_embedding: List[float], top_k: int = 3) -> List[str]:
"""简单的余弦相似度搜索"""
if not self.documents:
return []
similarities = []
for doc_embedding in self.embeddings:
# 计算余弦相似度
dot_product = sum(a * b for a, b in zip(query_embedding, doc_embedding))
norm_a = sum(a * a for a in query_embedding) ** 0.5
norm_b = sum(b * b for b in doc_embedding) ** 0.5
similarity = dot_product / (norm_a * norm_b)
similarities.append(similarity)
# 获取top_k个最相似的文档
top_indices = sorted(range(len(similarities)),
key=lambda i: similarities[i],
reverse=True)[:top_k]
return [self.documents[i] for i in top_indices]
class RAGSystem:
"""检索增强生成系统"""
def __init__(self, embedding_model="text-embedding-3-small",
llm_model="gpt-4"):
self.embedding_model = embedding_model
self.llm_model = llm_model
self.vector_store = SimpleVectorStore()
self.knowledge_base = {} # 存储原始文档
def ingest_documents(self, documents: List[Dict[str, str]]):
"""
摄入文档到知识库
Args:
documents: 文档列表,每个文档包含 id, title, content
"""
texts = [doc["content"] for doc in documents]
# 为每个文档生成嵌入向量
print("正在生成嵌入向量...")
embeddings = []
for i, text in enumerate(texts):
response = litellm.embedding(
model=self.embedding_model,
input=text
)
embeddings.append(response.data[0].embedding)
print(f" 处理文档 {i+1}/{len(texts)}")
# 存入向量数据库
self.vector_store.add_documents(texts, embeddings)
# 保存原始文档
for doc in documents:
self.knowledge_base[doc["id"]] = doc
print(f"成功摄入 {len(documents)} 个文档")
def retrieve_context(self, query: str, top_k: int = 3) -> List[str]:
"""检索相关上下文"""
# 生成查询的嵌入向量
response = litellm.embedding(
model=self.embedding_model,
input=query
)
query_embedding = response.data[0].embedding
# 搜索最相似的文档
relevant_docs = self.vector_store.similarity_search(
query_embedding,
top_k=top_k
)
return relevant_docs
def ask(self, question: str) -> str:
"""
基于知识库回答问题
"""
# 1. 检索相关上下文
context_docs = self.retrieve_context(question, top_k=3)
if not context_docs:
return "抱歉,知识库中没有找到相关信息。"
# 2. 构建提示词
context_text = "\n\n".join([
f"文档 {i+1}:\n{doc}"
for i, doc in enumerate(context_docs)
])
prompt = f"""你是一个基于知识库的回答助手。请根据以下参考资料回答用户的问题。
参考资料:
{context_text}
用户问题:{question}
回答要求:
1. 只基于提供的参考资料回答
2. 如果参考资料中没有相关信息,请明确指出
3. 用清晰、专业的语言回答
"""
# 3. 调用语言模型
response = litellm.completion(
model=self.llm_model,
messages=[
{"role": "system", "content": "你是一个专业的知识库问答助手。"},
{"role": "user", "content": prompt}
],
temperature=0.3, # 较低温度保证准确性
max_tokens=800
)
return response.choices[0].message.content
# 使用示例
if __name__ == "__main__":
# 创建RAG系统
rag = RAGSystem(
embedding_model="text-embedding-3-small",
llm_model="gpt-4"
)
# 准备示例文档
documents = [
{
"id": "doc1",
"title": "Python基础",
"content": "Python是一种高级编程语言,由Guido van Rossum于1991年创建。它以简洁易读的语法著称,适合初学者学习编程。"
},
{
"id": "doc2",
"title": "Python特点",
"content": "Python的主要特点包括:1) 语法简洁易懂;2) 跨平台支持;3) 丰富的标准库;4) 强大的社区支持;5) 支持多种编程范式。"
},
{
"id": "doc3",
"title": "Python应用领域",
"content": "Python广泛应用于Web开发、数据分析、人工智能、科学计算、自动化运维等领域。在AI领域,TensorFlow、PyTorch等框架都支持Python。"
},
{
"id": "doc4",
"title": "JavaScript简介",
"content": "JavaScript是一种脚本语言,主要用于网页前端开发。它可以创建动态内容和交互效果。现代JavaScript也可用于后端开发(Node.js)和移动应用开发。"
}
]
# 摄入文档
print("=" * 50)
print("初始化知识库...")
rag.ingest_documents(documents)
# 问答测试
print("\n" + "=" * 50)
print("问答测试")
print("=" * 50)
questions = [
"Python是谁创建的?",
"Python有哪些特点?",
"Python在哪些领域有应用?",
"JavaScript主要用于什么?"
]
for q in questions:
print(f"\n问题: {q}")
print("-" * 30)
answer = rag.ask(q)
print(f"回答: {answer}")
实战三:批量处理与并发调用
当需要处理大量请求时,高效的批量处理至关重要:
import litellm
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import time
class BatchProcessor:
"""批量处理工具"""
def __init__(self, model="gpt-3.5-turbo", max_workers=5):
self.model = model
self.max_workers = max_workers
def process_sequential(self, prompts: List[str]) -> List[str]:
"""顺序处理"""
results = []
for i, prompt in enumerate(prompts):
print(f"处理 {i+1}/{len(prompts)}...")
response = litellm.completion(
model=self.model,
messages=[{"role": "user", "content": prompt}]
)
results.append(response.choices[0].message.content)
return results
def process_parallel(self, prompts: List[str]) -> List[Dict]:
"""并行处理(使用线程池)"""
results = []
def call_model(prompt, idx):
response = litellm.completion(
model=self.model,
messages=[{"role": "user", "content": prompt}]
)
return {
"index": idx,
"prompt": prompt,
"response": response.choices[0].message.content
}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(call_model, prompt, i): i
for i, prompt in enumerate(prompts)
}
for future in as_completed(futures):
result = future.result()
results.append(result)
print(f"完成 {result['index']+1}/{len(prompts)}")
# 按原始顺序返回
results.sort(key=lambda x: x["index"])
return results
async def process_async(self, prompts: List[str]) -> List[str]:
"""异步处理"""
results = [None] * len(prompts)
async def call_model(prompt: str, idx: int):
# 使用 asyncio.to_thread 在异步中运行同步的 litellm 调用
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: litellm.completion(
model=self.model,
messages=[{"role": "user", "content": prompt}]
)
)
results[idx] = response.choices[0].message.content
print(f"完成 {idx+1}/{len(prompts)}")
# 创建所有任务
tasks = [call_model(prompt, i) for i, prompt in enumerate(prompts)]
# 并发执行
await asyncio.gather(*tasks)
return results
# 使用示例
if __name__ == "__main__":
processor = BatchProcessor(model="gpt-3.5-turbo", max_workers=5)
# 测试数据
prompts = [
"解释什么是机器学习",
"Python的装饰器是什么",
"如何优化SQL查询",
"什么是微服务架构",
"解释RESTful API设计原则",
"什么是容器化技术",
"数据库索引的原理",
"分布式系统的一致性问题"
]
print("=" * 50)
print("顺序处理测试")
print("=" * 50)
start = time.time()
results = processor.process_sequential(prompts[:3]) # 只测试3个
print(f"耗时: {time.time() - start:.2f}秒")
print("\n" + "=" * 50)
print("并行处理测试")
print("=" * 50)
start = time.time()
results = processor.process_parallel(prompts)
print(f"耗时: {time.time() - start:.2f}秒")
实战四:构建多模型对比评估工具
有时候需要对比不同模型在同一个任务上的表现:
import litellm
from dataclasses import dataclass
from typing import List, Dict, Any
import json
@dataclass
class ModelResult:
"""模型评估结果"""
model: str
prompt: str
response: str
success: bool
error: str = None
latency: float = 0
tokens_used: int = 0
cost: float = 0
class ModelEvaluator:
"""多模型评估工具"""
def __init__(self):
self.models = [
"gpt-4",
"gpt-4-turbo-preview",
"gpt-3.5-turbo",
"claude-3-opus-20240229",
"claude-3-sonnet-20240229"
]
self.results = {}
def evaluate_task(self, task_name: str, prompt: str) -> Dict[str, ModelResult]:
"""
对单个任务评估所有模型
"""
print(f"\n{'='*60}")
print(f"评估任务: {task_name}")
print(f"{'='*60}")
task_results = {}
for model in self.models:
print(f"\n测试模型: {model}")
print("-" * 40)
try:
import time
start_time = time.time()
response = litellm.completion(
model=model,
messages=[{"role": "user", "content": prompt}],
timeout=60
)
end_time = time.time()
# 提取响应内容
content = response.choices[0].message.content
# 计算token使用量
usage = response.usage
total_tokens = usage.prompt_tokens + usage.completion_tokens
# 计算成本
cost = litellm.completion_cost(
model=model,
messages=[{"role": "user", "content": prompt}],
completion_response=response
)
result = ModelResult(
model=model,
prompt=prompt,
response=content,
success=True,
latency=end_time - start_time,
tokens_used=total_tokens,
cost=cost
)
print(f" ✓ 成功")
print(f" 延迟: {result.latency:.2f}秒")
print(f" Token: {result.tokens_used}")
print(f" 成本: ${result.cost:.6f}")
except Exception as e:
result = ModelResult(
model=model,
prompt=prompt,
response="",
success=False,
error=str(e)
)
print(f" ✗ 失败: {e}")
task_results[model] = result
self.results[task_name] = task_results
return task_results
def generate_report(self) -> str:
"""生成评估报告"""
report = []
report.append("\n" + "=" * 70)
report.append("模型评估报告")
report.append("=" * 70)
for task_name, task_results in self.results.items():
report.append(f"\n任务: {task_name}")
report.append("-" * 50)
# 按成本排序
successful = [r for r in task_results.values() if r.success]
if not successful:
report.append(" 所有模型均失败")
continue
successful.sort(key=lambda x: x.cost)
for i, result in enumerate(successful, 1):
report.append(f"\n [{i}] {result.model}")
report.append(f" 延迟: {result.latency:.2f}秒")
report.append(f" Token: {result.tokens_used}")
report.append(f" 成本: ${result.cost:.6f}")
# 只显示前200个字符
preview = result.response[:200].replace("\n", " ")
report.append(f" 响应预览: {preview}...")
return "\n".join(report)
# 使用示例
if __name__ == "__main__":
evaluator = ModelEvaluator()
# 定义评估任务
tasks = [
{
"name": "代码审查",
"prompt": """审查以下Python代码,指出潜在问题和改进建议:
```python
def get_user_data(user_id):
data = requests.get(f"https://api.example.com/users/{user_id}")
return data.json()
```"""
},
{
"name": "技术解释",
"prompt": "用通俗易懂的语言解释什么是数据库事务,以及为什么它很重要。"
}
]
# 执行评估
for task in tasks:
evaluator.evaluate_task(task["name"], task["prompt"])
# 生成报告
print(evaluator.generate_report())
实战五:使用 Proxy 构建团队 AI 服务平台
当团队需要共享 AI 能力时,Proxy Server 是最佳选择:
第一步:编写配置文件
# litellm_config.yaml
model_list:
# OpenAI 模型
- model_name: gpt-4-turbo
litellm_params:
model: gpt-4-turbo
api_key: os.environ/OPENAI_API_KEY
# Anthropic 模型
- model_name: claude-3-opus
litellm_params:
model: claude-3-opus-20240229
api_key: os.environ/ANTHROPIC_API_KEY
# Azure 模型
- model_name: azure-gpt-4
litellm_params:
model: azure/gpt-4
api_base: os.environ/AZURE_API_BASE
api_key: os.environ/AZURE_API_KEY
api_version: "2024-02-01"
# 通用设置
general_settings:
master_key: sk-team-secret-key # 团队访问密钥
database_url: sqlite:///litellm.db # 日志数据库
# Litellm 日志设置
litellm_settings:
drop_params: true
set_verbose: true
request_timeout: 60
telemetry: false # 关闭遥测
第二步:启动服务
# 使用 Docker 启动
docker run -d \
--name litellm-proxy \
-p 4000:4000 \
-v $(pwd)/litellm_config.yaml:/app/config.yaml \
-e OPENAI_API_KEY=$OPENAI_API_KEY \
-e ANTHROPIC_API_KEY=$ANTHROPIC_API_KEY \
-e AZURE_API_KEY=$AZURE_API_KEY \
-e AZURE_API_BASE=$AZURE_API_BASE \
ghcr.io/berriai/litellm:main
第三步:团队成员使用
# team_member_a.py - 团队成员A的代码
import openai
# 连接到团队代理服务
client = openai.OpenAI(
api_key="sk-team-secret-key",
base_url="http://litellm-proxy:4000"
)
# 使用团队共享的模型
response = client.chat.completions.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": "帮我写一个用户认证模块"}]
)
print(response.choices[0].message.content)
第四步:使用管理 API
import requests
BASE_URL = "http://litellm-proxy:4000"
headers = {"Authorization": "Bearer sk-team-secret-key"}
# 查看所有可用模型
response = requests.get(f"{BASE_URL}/v1/model/info", headers=headers)
print("可用模型:", response.json())
# 查看使用统计
response = requests.get(f"{BASE_URL}/v1 Spend/all", headers=headers)
print("花费统计:", response.json())
# 查看活跃请求
response = requests.get(f"{BASE_URL}/v1/active_requests", headers=headers)
print("活跃请求:", response.json())
常见使用场景 / Common Use Cases
场景一:内容审核系统
import litellm
from enum import Enum
class ContentType(Enum):
"""内容类型枚举"""
TEXT = "text"
IMAGE = "image"
MULTIMODAL = "multimodal"
class ContentModerator:
"""内容审核器"""
def __init__(self):
self.system_prompt = """你是一个内容安全审核专家。请判断用户输入的内容是否包含以下违规类型:
1. 暴力血腥
2. 色情低俗
3. 政治敏感
4. 违法犯罪
5. 虚假信息
6. 仇恨言论
请用JSON格式返回结果,格式如下:
{
"is_safe": true/false,
"violation_types": ["违规类型1", "违规类型2"],
"risk_level": "low/medium/high",
"reason": "判断理由"
}"""
def moderate(self, content: str, content_type: ContentType = ContentType.TEXT):
"""审核内容"""
if content_type == ContentType.TEXT:
messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": content}
]
else:
# 多模态审核
messages = [
{"role": "system", "content": self.system_prompt + "\n\n请同时审核图片和文字内容。"},
{"role": "user", "content": [
{"type": "text", "text": f"审核内容: {content}"},
{"type": "image_url", "image_url": {"url": content}}
]}
]
response = litellm.completion(
model="gpt-4-turbo",
messages=messages,
response_format={"type": "json_object"}
)
import json
result = json.loads(response.choices[0].message.content)
return result
# 使用示例
moderator = ContentModerator()
test_content = "这是一段正常的技术讨论内容"
result = moderator.moderate(test_content)
print(f"内容安全: {result['is_safe']}")
print(f"风险等级: {result['risk_level']}")
场景二:智能客服系统
import litellm
from typing import Dict, List
import json
class SmartCustomerService:
"""智能客服系统"""
def __init__(self):
self.conversation_history
总结
本文详细介绍了该项目的核心功能和使用方法,从环境搭建到实战应用。希望读者能通过本教程快速上手,在实际项目中灵活运用。如果你有任何问题或建议,欢迎在评论区交流讨论。
评论区