别再为模型切换头疼了!litellm 一行代码搞定100+大模型调用指南

别再为模型切换头疼了!litellm 一行代码搞定100+大模型调用指南

别再为模型切换头疼了!litellm 一行代码搞定100+大模型调用指南

在AI应用开发中,你是否遇到过这些痛点:项目需要切换到不同的大模型时,代码要重写;调试不同API的认证方式让你抓狂;想对比不同模型的效果,却要维护多套接口?

今天要介绍的这个开源项目——litellm,正是为解决这些难题而生。它用一个统一的接口,让你用完全相同的方式调用OpenAI、Anthropic、Google、Azure、Hugging Face等100多个大模型服务。无论你是要快速原型开发,还是构建生产级AI应用,litellm都能大幅简化你的工作。


为什么值得关注 / 项目价值分析

当前大模型集成的困境

在大模型应用开发中,开发者面临的最大挑战之一就是模型选择和切换。随着AI领域快速发展,新的模型和服务层出不穷:

  • OpenAI 的 GPT-4 系列持续更新迭代
  • Anthropic 推出 Claude 系列,以长上下文和安全著称
  • Google 的 Gemini 开始挑战市场格局
  • 开源社区 贡献了 Llama、Mistral、Falcon 等优秀模型
  • 各大云厂商 都推出了自己的大模型服务

当你想要切换模型时,传统做法需要重写大量代码,因为每个服务商的API格式、认证方式、错误处理都各不相同。这种紧耦合的设计不仅增加了开发成本,也让后续维护变得困难重重。

litellm 的核心价值

litellm 正是为解决这个痛点而设计的。它的核心理念是:提供一个统一的、接口一致的调用方式,让任何支持的大模型都能像调用本地函数一样简单

这个项目的亮点非常突出:

第一,统一接口设计。无论你调用的是OpenAI还是Anthropic,代码几乎不需要任何改动。这种设计让你可以轻松地在不同模型之间切换,进行A/B测试,或者根据不同场景选择最适合的模型。

第二,开箱即用的企业级功能。litellm 内置了重试机制、熔断器、负载均衡、请求日志、成本追踪等生产环境必备的功能。你不需要自己实现这些基础设施,可以专注于业务逻辑开发。

第三,灵活的部署方式。litellm 支持直接API调用,也支持本地部署开源模型。这种灵活性让你可以根据成本、安全性、性能等因素灵活选择。

第四,活跃的社区和持续的更新。作为BerriAI维护的开源项目,litellm拥有活跃的贡献者社区,持续跟进各大模型服务商的新功能。这个项目的GitHub Star数已经超过3万,足以说明其受欢迎程度。

典型应用场景

litellm 特别适合以下场景:

  • AI应用原型开发:需要快速尝试不同模型,找到最适合项目需求的那个
  • 多模型编排:在同一个应用中集成多个模型,处理不同类型的任务
  • 成本优化:通过对比不同模型的价格和性能,找到最优解
  • 负载均衡和容错:在高并发场景下保证服务稳定性
  • 企业内部AI平台:构建统一的大模型调用服务,规范团队的开发方式

环境搭建 / Getting Started

前置要求

在开始使用 litellm 之前,你需要准备以下环境:

Python 版本:litellm 要求 Python 3.8 或更高版本。建议使用 Python 3.10 或更新版本,以获得最佳性能和兼容性。

包管理工具:建议使用 pip 或 conda 来管理 Python 依赖。

API 密钥:你需要准备想要使用的大模型服务的 API 密钥。不同的服务商有不同的获取方式:

  • OpenAI:访问 platform.openai.com 注册并获取 API Key
  • Anthropic:访问 console.anthropic.com 注册并获取 API Key
  • Azure OpenAI:通过 Azure 门户创建资源后获取
  • Google Vertex AI:在 Google Cloud Platform 创建项目并配置服务账号

安装 litellm

安装 litellm 非常简单,只需要一条 pip 命令:

pip install litellm

如果你需要使用某些特定功能,可能需要安装额外的依赖包。例如:

# 完整安装,包含所有可选依赖
pip install litellm[all]

# 只安装常用依赖
pip install litellm[proxy]

# 安装流式输出支持
pip install litellm[streaming]

安装完成后,可以通过以下命令验证安装是否成功:

import litellm
print(litellm.__version__)

基础配置

litellm 支持多种配置方式,从简单到复杂,可以根据你的需求选择。

方式一:环境变量配置

这是最简单的方式,只需要设置环境变量即可。创建一个 .env 文件:

# OpenAI 配置
OPENAI_API_KEY=sk-xxxxx

# Anthropic 配置
ANTHROPIC_API_KEY=sk-ant-xxxxx

# Azure OpenAI 配置
AZURE_API_KEY=xxxxx
AZURE_API_BASE=https://xxxxx.openai.azure.com
AZURE_API_VERSION=2024-02-01

# 模型降级配置(当主模型不可用时自动切换)
MODEL_BATCH_WRITE_PRICE=0.00003
MODEL_BATCH_WRITE_MAX_TOKENS=2048
MODEL_BATCH_WRITE_MODEL=gpt-3.5-turbo

然后在代码中加载这些环境变量:

from dotenv import load_dotenv
load_dotenv()  # 加载 .env 文件中的环境变量

import litellm
# 现在可以直接使用模型调用
response = litellm.completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "你好"}]
)

方式二:配置文件方式

对于更复杂的场景,可以使用 YAML 配置文件:

# litellm_config.yaml
model_list:
  - model_name: gpt-4-turbo
    litellm_params:
      model: gpt-4-turbo
      api_key: os.environ/GPT4_API_KEY
      temperature: 0.7
      max_tokens: 2048

  - model_name: claude-3-opus
    litellm_params:
      model: claude-3-opus-20240229
      api_key: os.environ/ANTHROPIC_API_KEY

  - model_name: azure-gpt-4
    litellm_params:
      model: gpt-4
      api_base: os.environ/AZURE_API_BASE
      api_key: os.environ/AZURE_API_KEY
      api_version: "2024-02-01"

在代码中加载这个配置:

import litellm
import yaml

# 读取配置
with open('litellm_config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# 设置配置
litellm.set_verbose = True  # 开启详细日志

方式三:使用 Proxy Server

litellm 还提供了一个强大的代理服务器功能,特别适合团队协作和生产环境部署:

pip install 'litellm[proxy]'

# 启动代理服务器
litellm --config your_config.yaml

# 或者使用 Docker
docker run \
    -e AZURE_API_KEY=your-azure-key \
    -e OPENAI_API_KEY=your-openai-key \
    -v $(pwd)/config.yaml:/app/config.yaml \
    -p 4000:4000 \
    ghcr.io/berriai/litellm:main

代理服务器启动后,所有支持的模型都可以通过统一的 API 端点访问:

# 通过代理服务器调用
import openai

client = openai.OpenAI(
    api_key="sk-1234",  # 代理服务器的访问密钥
    base_url="http://localhost:4000"
)

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "你好"}]
)

核心功能详解 / Core Features

统一的 Completion API

litellm 的核心是一个统一的 completion 接口。无论你使用哪个模型服务商,调用方式完全一致:

import litellm

# 调用 OpenAI GPT-4
response = litellm.completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "用Python写一个快速排序"}]
)
print(response.choices[0].message.content)

如果想要切换到 Claude,只需要改一个参数:

# 切换到 Anthropic Claude,代码几乎不需要改动
response = litellm.completion(
    model="claude-3-opus-20240229",
    messages=[{"role": "user", "content": "用Python写一个快速排序"}]
)
print(response.choices[0].message.content)

这个设计让你可以在完全不修改业务逻辑的情况下,测试和切换不同的模型。

支持的模型列表

litellm 支持的模型非常广泛,截至目前已经超过100个。以下是主要分类:

OpenAI 系列

  • gpt-4-turbo、gpt-4、gpt-4-32k
  • gpt-3.5-turbo、gpt-3.5-turbo-16k
  • gpt-4-vision-preview(支持图片理解)

Anthropic 系列

  • claude-3-opus、claude-3-sonnet、claude-3-haiku
  • claude-2.1、claude-2.0
  • claude-instant

Google 系列

  • gemini-pro、gemini-pro-vision
  • text-bison-002、chat-bison-002
  • 通过 Vertex AI 访问的 PaLM2、Gemini 等

开源模型

  • Llama 2 系列(meta-llama/Llama-2-70b-chat)
  • Mistral 系列(mistral/Mistral-7B-Instruct)
  • Vicuna、Alpaca、Falcon 等

Azure OpenAI

  • 所有 Azure 托管的 OpenAI 模型

其他服务商

  • AWS Bedrock(Claude、Llama、Titan 等)
  • Cohere(command-r 系列)
  • AI21(Jurassic-2 系列)
  • Replicate(各种开源模型)
  • Hugging Face(Inference Endpoints)

高级参数配置

litellm 不仅支持基本的调用,还支持丰富的参数配置:

response = litellm.completion(
    model="gpt-4-turbo",
    messages=[
        {"role": "system", "content": "你是一位专业的Python教练"},
        {"role": "user", "content": "解释什么是装饰器"}
    ],
    temperature=0.7,           # 控制随机性,0-2之间
    max_tokens=1000,           # 最大生成token数
    top_p=0.9,                 # 核采样参数
    frequency_penalty=0.5,    # 频率惩罚
    presence_penalty=0.3,      # 存在惩罚
    stop=["END"],             # 停止词
    response_format={"type": "json_object"},  # JSON输出格式
    seed=42,                  # 随机种子,用于可复现结果
    user="user_123"           # 用户标识
)

流式输出支持

对于需要实时展示生成内容的应用,流式输出非常有用:

response = litellm.completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "给我讲一个关于AI的科幻故事"}],
    stream=True
)

# 逐块获取响应
for chunk in response:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Embeddings 嵌入向量

litellm 同样支持生成文本嵌入向量,这在 RAG(检索增强生成)等场景中非常有用:

# 使用 OpenAI 的 text-embedding-3-small
response = litellm.embedding(
    model="text-embedding-3-small",
    input="这是一个用于生成嵌入向量的示例文本"
)
print(response.data[0].embedding)

# 切换到其他服务商只需要改模型名
response = litellm.embedding(
    model="cohere/embed-v3-light",
    input="这是一个用于生成嵌入向量的示例文本"
)

多模态支持

现代大模型已经支持图像理解,litellm 也提供了完善的图片处理能力:

response = litellm.completion(
    model="gpt-4-vision-preview",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "这张图片里有什么?"},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": "https://example.com/image.jpg"
                    }
                }
            ]
        }
    ],
    max_tokens=300
)

你也可以使用 base64 编码的图片:

import base64

# 读取本地图片并转为base64
with open("image.jpg", "rb") as f:
    image_base64 = base64.b64encode(f.read()).decode()

response = litellm.completion(
    model="gpt-4-vision-preview",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "分析这张图片"},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{image_base64}"
                    }
                }
            ]
        }
    ]
)

模型降级与容错机制

生产环境中,某个模型可能会临时不可用。litellm 提供了智能的模型降级功能:

# 定义一个模型组,支持自动降级
litellm.fallbacks = [
    {"model": "gpt-4-turbo"},
    {"model": "gpt-3.5-turbo"},
    {"model": "claude-3-haiku-20240307"}
]

response = litellm.completion(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "复杂的推理任务"}],
    fallbacks=litellm.fallbacks
)

当 GPT-4-Turbo 不可用时,系统会自动尝试 GPT-3.5-Turbo,再不行就尝试 Claude Haiku。

成本追踪

litellm 内置了成本追踪功能,让你清楚知道每次调用的花费:

# 启用成本追踪
import litellm
litellm.success_callback = ["liteLLMCallback"]

# 或者创建自定义回调
class CostTracker(litellm.Callback):
    def __init__(self):
        self.total_cost = 0
        self.request_count = 0

    def on_successful_response(self, response, kwargs, start_time, end_time):
        cost = litellm.completion_cost(completion_response=response)
        self.total_cost += cost
        self.request_count += 1
        print(f"请求 #{self.request_count}: 花费 ${cost:.6f}, 累计 ${self.total_cost:.6f}")

tracker = CostTracker()
litellm.callbacks = [tracker]

# 多次调用后查看总成本
response = litellm.completion(model="gpt-4", messages=[{"role": "user", "content": "Hello"}])

你也可以手动计算成本:

# 创建请求前估算成本
cost = litellm.completion_cost(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    completion_response=None  # 传None会估算
)
print(f"预计成本: ${cost:.6f}")

# 或者计算已返回响应的成本
response = litellm.completion(model="gpt-4", messages=[{"role": "user", "content": "Hello"}])
actual_cost = litellm.completion_cost(completion_response=response)
print(f"实际成本: ${actual_cost:.6f}")

速率限制与重试机制

litellm 自动处理 API 的速率限制和临时错误:

response = litellm.completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "分析这个代码"}],
    max_retries=3,           # 最大重试次数
    request_timeout=60       # 请求超时时间(秒)
)

你也可以自定义重试策略:

from litellm import RetryConfig

retry_config = RetryConfig(
    max_retries=5,
    retry_on=["rate_limit_error", "timeout", "api_error"],
    backoff_factor=0.5,
    jitter=True
)

response = litellm.completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "请求内容"}],
    retry_config=retry_config
)

实战教程 / Step-by-Step Tutorial

实战一:构建一个智能问答机器人

让我们从头开始,构建一个基于litellm的智能问答系统。

第一步:创建基础项目结构

# qa_bot.py
import litellm
import os
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

class QABot:
    """智能问答机器人类"""

    def __init__(self, model="gpt-4"):
        self.model = model
        self.conversation_history = []

    def ask(self, question, system_prompt=None):
        """
        提问并获取回答

        Args:
            question: 用户问题
            system_prompt: 系统提示词

        Returns:
            模型的回答
        """
        messages = []

        # 添加系统提示
        if system_prompt:
            messages.append({
                "role": "system",
                "content": system_prompt
            })
        else:
            messages.append({
                "role": "system",
                "content": "你是一个有帮助的AI助手,用简洁清晰的语言回答问题。"
            })

        # 添加对话历史
        messages.extend(self.conversation_history)

        # 添加当前问题
        messages.append({
            "role": "user",
            "content": question
        })

        # 调用模型
        response = litellm.completion(
            model=self.model,
            messages=messages,
            temperature=0.7,
            max_tokens=500
        )

        # 提取回答
        answer = response.choices[0].message.content

        # 更新对话历史
        self.conversation_history.append({
            "role": "user",
            "content": question
        })
        self.conversation_history.append({
            "role": "assistant",
            "content": answer
        })

        return answer

    def clear_history(self):
        """清除对话历史"""
        self.conversation_history = []

    def switch_model(self, new_model):
        """切换模型"""
        self.model = new_model
        self.clear_history()  # 切换模型时清除历史

# 使用示例
if __name__ == "__main__":
    bot = QABot(model="gpt-4")

    # 第一次对话
    response1 = bot.ask("Python中的列表和元组有什么区别?")
    print("Bot回答:", response1)

    # 继续对话
    response2 = bot.ask("能给我一些代码示例吗?")
    print("Bot回答:", response2)

    print("\n" + "="*50)
    print("切换到Claude模型...")
    bot.switch_model("claude-3-opus-20240229")

    response3 = bot.ask("同样的问题,用Claude回答一下")
    print("Claude回答:", response3)

运行效果

Bot回答: 列表(List)和元组(Tuple)主要有以下区别:

1. 可变性:列表是可变的,可以添加、删除、修改元素;元组是不可变的,创建后不能修改。

2. 语法:列表用方括号[],元组用圆括号()

3. 性能:元组通常比列表更快,因为不可变性让它可以做更多优化。

4. 用途:列表适合需要经常修改的数据;元组适合表示固定的数据结构,如坐标点。

==================================================
切换到Claude模型...
Claude回答: ...

第二步:添加错误处理和重试机制

import litellm
from litellm import RateLimitError, TimeoutError, APIError
import time

class RobustQABot:
    """带错误处理的健壮问答机器人"""

    def __init__(self, model="gpt-4", max_retries=3):
        self.model = model
        self.max_retries = max_retries
        self.conversation_history = []

    def ask(self, question, system_prompt=None):
        """带重试机制的提问方法"""
        messages = []

        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        else:
            messages.append({
                "role": "system",
                "content": "你是一个有帮助的AI助手。"
            })

        messages.extend(self.conversation_history)
        messages.append({"role": "user", "content": question})

        last_error = None

        # 重试循环
        for attempt in range(self.max_retries):
            try:
                response = litellm.completion(
                    model=self.model,
                    messages=messages,
                    temperature=0.7,
                    max_tokens=500,
                    request_timeout=30
                )

                answer = response.choices[0].message.content

                # 更新历史
                self.conversation_history.append({
                    "role": "user", "content": question
                })
                self.conversation_history.append({
                    "role": "assistant", "content": answer
                })

                return answer

            except RateLimitError as e:
                # 遇到速率限制,等待后重试
                last_error = e
                wait_time = 2 ** attempt  # 指数退避
                print(f"遇到速率限制,等待 {wait_time} 秒...")
                time.sleep(wait_time)

            except TimeoutError as e:
                # 超时错误
                last_error = e
                print(f"请求超时,重试中... ({attempt + 1}/{self.max_retries})")

            except APIError as e:
                # 其他API错误
                last_error = e
                print(f"API错误: {e},重试中...")

            except Exception as e:
                # 未知错误
                print(f"发生错误: {type(e).__name__}: {e}")
                raise

        # 所有重试都失败
        raise Exception(f"重试 {self.max_retries} 次后仍然失败: {last_error}")

实战二:构建 RAG 知识库问答系统

Retrieval-Augmented Generation(RAG)是当前最流行的知识库问答架构。让我们使用litellm构建一个简化版的RAG系统。

完整的RAG系统代码

# rag_system.py
import litellm
import os
from dotenv import load_dotenv
from typing import List, Dict
import json

load_dotenv()

class SimpleVectorStore:
    """
    简单的向量存储(用于演示)
    生产环境建议使用 Chroma、Pinecone、Weaviate 等专业向量数据库
    """

    def __init__(self):
        self.documents = []
        self.embeddings = []

    def add_documents(self, texts: List[str], embeddings: List[List[float]]):
        """添加文档和对应的嵌入向量"""
        for text, embedding in zip(texts, embeddings):
            self.documents.append(text)
            self.embeddings.append(embedding)

    def similarity_search(self, query_embedding: List[float], top_k: int = 3) -> List[str]:
        """简单的余弦相似度搜索"""
        if not self.documents:
            return []

        similarities = []
        for doc_embedding in self.embeddings:
            # 计算余弦相似度
            dot_product = sum(a * b for a, b in zip(query_embedding, doc_embedding))
            norm_a = sum(a * a for a in query_embedding) ** 0.5
            norm_b = sum(b * b for b in doc_embedding) ** 0.5
            similarity = dot_product / (norm_a * norm_b)
            similarities.append(similarity)

        # 获取top_k个最相似的文档
        top_indices = sorted(range(len(similarities)), 
                            key=lambda i: similarities[i], 
                            reverse=True)[:top_k]

        return [self.documents[i] for i in top_indices]


class RAGSystem:
    """检索增强生成系统"""

    def __init__(self, embedding_model="text-embedding-3-small", 
                 llm_model="gpt-4"):
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.vector_store = SimpleVectorStore()
        self.knowledge_base = {}  # 存储原始文档

    def ingest_documents(self, documents: List[Dict[str, str]]):
        """
        摄入文档到知识库

        Args:
            documents: 文档列表,每个文档包含 id, title, content
        """
        texts = [doc["content"] for doc in documents]

        # 为每个文档生成嵌入向量
        print("正在生成嵌入向量...")
        embeddings = []
        for i, text in enumerate(texts):
            response = litellm.embedding(
                model=self.embedding_model,
                input=text
            )
            embeddings.append(response.data[0].embedding)
            print(f"  处理文档 {i+1}/{len(texts)}")

        # 存入向量数据库
        self.vector_store.add_documents(texts, embeddings)

        # 保存原始文档
        for doc in documents:
            self.knowledge_base[doc["id"]] = doc

        print(f"成功摄入 {len(documents)} 个文档")

    def retrieve_context(self, query: str, top_k: int = 3) -> List[str]:
        """检索相关上下文"""
        # 生成查询的嵌入向量
        response = litellm.embedding(
            model=self.embedding_model,
            input=query
        )
        query_embedding = response.data[0].embedding

        # 搜索最相似的文档
        relevant_docs = self.vector_store.similarity_search(
            query_embedding, 
            top_k=top_k
        )

        return relevant_docs

    def ask(self, question: str) -> str:
        """
        基于知识库回答问题
        """
        # 1. 检索相关上下文
        context_docs = self.retrieve_context(question, top_k=3)

        if not context_docs:
            return "抱歉,知识库中没有找到相关信息。"

        # 2. 构建提示词
        context_text = "\n\n".join([
            f"文档 {i+1}:\n{doc}" 
            for i, doc in enumerate(context_docs)
        ])

        prompt = f"""你是一个基于知识库的回答助手。请根据以下参考资料回答用户的问题。

参考资料:
{context_text}

用户问题:{question}

回答要求:
1. 只基于提供的参考资料回答
2. 如果参考资料中没有相关信息,请明确指出
3. 用清晰、专业的语言回答
"""

        # 3. 调用语言模型
        response = litellm.completion(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": "你是一个专业的知识库问答助手。"},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,  # 较低温度保证准确性
            max_tokens=800
        )

        return response.choices[0].message.content


# 使用示例
if __name__ == "__main__":
    # 创建RAG系统
    rag = RAGSystem(
        embedding_model="text-embedding-3-small",
        llm_model="gpt-4"
    )

    # 准备示例文档
    documents = [
        {
            "id": "doc1",
            "title": "Python基础",
            "content": "Python是一种高级编程语言,由Guido van Rossum于1991年创建。它以简洁易读的语法著称,适合初学者学习编程。"
        },
        {
            "id": "doc2", 
            "title": "Python特点",
            "content": "Python的主要特点包括:1) 语法简洁易懂;2) 跨平台支持;3) 丰富的标准库;4) 强大的社区支持;5) 支持多种编程范式。"
        },
        {
            "id": "doc3",
            "title": "Python应用领域",
            "content": "Python广泛应用于Web开发、数据分析、人工智能、科学计算、自动化运维等领域。在AI领域,TensorFlow、PyTorch等框架都支持Python。"
        },
        {
            "id": "doc4",
            "title": "JavaScript简介",
            "content": "JavaScript是一种脚本语言,主要用于网页前端开发。它可以创建动态内容和交互效果。现代JavaScript也可用于后端开发(Node.js)和移动应用开发。"
        }
    ]

    # 摄入文档
    print("=" * 50)
    print("初始化知识库...")
    rag.ingest_documents(documents)

    # 问答测试
    print("\n" + "=" * 50)
    print("问答测试")
    print("=" * 50)

    questions = [
        "Python是谁创建的?",
        "Python有哪些特点?",
        "Python在哪些领域有应用?",
        "JavaScript主要用于什么?"
    ]

    for q in questions:
        print(f"\n问题: {q}")
        print("-" * 30)
        answer = rag.ask(q)
        print(f"回答: {answer}")

实战三:批量处理与并发调用

当需要处理大量请求时,高效的批量处理至关重要:

import litellm
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import time

class BatchProcessor:
    """批量处理工具"""

    def __init__(self, model="gpt-3.5-turbo", max_workers=5):
        self.model = model
        self.max_workers = max_workers

    def process_sequential(self, prompts: List[str]) -> List[str]:
        """顺序处理"""
        results = []
        for i, prompt in enumerate(prompts):
            print(f"处理 {i+1}/{len(prompts)}...")
            response = litellm.completion(
                model=self.model,
                messages=[{"role": "user", "content": prompt}]
            )
            results.append(response.choices[0].message.content)
        return results

    def process_parallel(self, prompts: List[str]) -> List[Dict]:
        """并行处理(使用线程池)"""
        results = []

        def call_model(prompt, idx):
            response = litellm.completion(
                model=self.model,
                messages=[{"role": "user", "content": prompt}]
            )
            return {
                "index": idx,
                "prompt": prompt,
                "response": response.choices[0].message.content
            }

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(call_model, prompt, i): i 
                for i, prompt in enumerate(prompts)
            }

            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                print(f"完成 {result['index']+1}/{len(prompts)}")

        # 按原始顺序返回
        results.sort(key=lambda x: x["index"])
        return results

    async def process_async(self, prompts: List[str]) -> List[str]:
        """异步处理"""
        results = [None] * len(prompts)

        async def call_model(prompt: str, idx: int):
            # 使用 asyncio.to_thread 在异步中运行同步的 litellm 调用
            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None,
                lambda: litellm.completion(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}]
                )
            )
            results[idx] = response.choices[0].message.content
            print(f"完成 {idx+1}/{len(prompts)}")

        # 创建所有任务
        tasks = [call_model(prompt, i) for i, prompt in enumerate(prompts)]

        # 并发执行
        await asyncio.gather(*tasks)

        return results


# 使用示例
if __name__ == "__main__":
    processor = BatchProcessor(model="gpt-3.5-turbo", max_workers=5)

    # 测试数据
    prompts = [
        "解释什么是机器学习",
        "Python的装饰器是什么",
        "如何优化SQL查询",
        "什么是微服务架构",
        "解释RESTful API设计原则",
        "什么是容器化技术",
        "数据库索引的原理",
        "分布式系统的一致性问题"
    ]

    print("=" * 50)
    print("顺序处理测试")
    print("=" * 50)
    start = time.time()
    results = processor.process_sequential(prompts[:3])  # 只测试3个
    print(f"耗时: {time.time() - start:.2f}秒")

    print("\n" + "=" * 50)
    print("并行处理测试")
    print("=" * 50)
    start = time.time()
    results = processor.process_parallel(prompts)
    print(f"耗时: {time.time() - start:.2f}秒")

实战四:构建多模型对比评估工具

有时候需要对比不同模型在同一个任务上的表现:

import litellm
from dataclasses import dataclass
from typing import List, Dict, Any
import json

@dataclass
class ModelResult:
    """模型评估结果"""
    model: str
    prompt: str
    response: str
    success: bool
    error: str = None
    latency: float = 0
    tokens_used: int = 0
    cost: float = 0

class ModelEvaluator:
    """多模型评估工具"""

    def __init__(self):
        self.models = [
            "gpt-4",
            "gpt-4-turbo-preview",
            "gpt-3.5-turbo",
            "claude-3-opus-20240229",
            "claude-3-sonnet-20240229"
        ]
        self.results = {}

    def evaluate_task(self, task_name: str, prompt: str) -> Dict[str, ModelResult]:
        """
        对单个任务评估所有模型
        """
        print(f"\n{'='*60}")
        print(f"评估任务: {task_name}")
        print(f"{'='*60}")

        task_results = {}

        for model in self.models:
            print(f"\n测试模型: {model}")
            print("-" * 40)

            try:
                import time
                start_time = time.time()

                response = litellm.completion(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    timeout=60
                )

                end_time = time.time()

                # 提取响应内容
                content = response.choices[0].message.content

                # 计算token使用量
                usage = response.usage
                total_tokens = usage.prompt_tokens + usage.completion_tokens

                # 计算成本
                cost = litellm.completion_cost(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    completion_response=response
                )

                result = ModelResult(
                    model=model,
                    prompt=prompt,
                    response=content,
                    success=True,
                    latency=end_time - start_time,
                    tokens_used=total_tokens,
                    cost=cost
                )

                print(f"  ✓ 成功")
                print(f"  延迟: {result.latency:.2f}秒")
                print(f"  Token: {result.tokens_used}")
                print(f"  成本: ${result.cost:.6f}")

            except Exception as e:
                result = ModelResult(
                    model=model,
                    prompt=prompt,
                    response="",
                    success=False,
                    error=str(e)
                )
                print(f"  ✗ 失败: {e}")

            task_results[model] = result
            self.results[task_name] = task_results

        return task_results

    def generate_report(self) -> str:
        """生成评估报告"""
        report = []
        report.append("\n" + "=" * 70)
        report.append("模型评估报告")
        report.append("=" * 70)

        for task_name, task_results in self.results.items():
            report.append(f"\n任务: {task_name}")
            report.append("-" * 50)

            # 按成本排序
            successful = [r for r in task_results.values() if r.success]
            if not successful:
                report.append("  所有模型均失败")
                continue

            successful.sort(key=lambda x: x.cost)

            for i, result in enumerate(successful, 1):
                report.append(f"\n  [{i}] {result.model}")
                report.append(f"      延迟: {result.latency:.2f}秒")
                report.append(f"      Token: {result.tokens_used}")
                report.append(f"      成本: ${result.cost:.6f}")
                # 只显示前200个字符
                preview = result.response[:200].replace("\n", " ")
                report.append(f"      响应预览: {preview}...")

        return "\n".join(report)


# 使用示例
if __name__ == "__main__":
    evaluator = ModelEvaluator()

    # 定义评估任务
    tasks = [
        {
            "name": "代码审查",
            "prompt": """审查以下Python代码,指出潜在问题和改进建议:

```python
def get_user_data(user_id):
    data = requests.get(f"https://api.example.com/users/{user_id}")
    return data.json()
```"""
        },
        {
            "name": "技术解释",
            "prompt": "用通俗易懂的语言解释什么是数据库事务,以及为什么它很重要。"
        }
    ]

    # 执行评估
    for task in tasks:
        evaluator.evaluate_task(task["name"], task["prompt"])

    # 生成报告
    print(evaluator.generate_report())

实战五:使用 Proxy 构建团队 AI 服务平台

当团队需要共享 AI 能力时,Proxy Server 是最佳选择:

第一步:编写配置文件

# litellm_config.yaml
model_list:
  # OpenAI 模型
  - model_name: gpt-4-turbo
    litellm_params:
      model: gpt-4-turbo
      api_key: os.environ/OPENAI_API_KEY

  # Anthropic 模型
  - model_name: claude-3-opus
    litellm_params:
      model: claude-3-opus-20240229
      api_key: os.environ/ANTHROPIC_API_KEY

  # Azure 模型
  - model_name: azure-gpt-4
    litellm_params:
      model: azure/gpt-4
      api_base: os.environ/AZURE_API_BASE
      api_key: os.environ/AZURE_API_KEY
      api_version: "2024-02-01"

# 通用设置
general_settings:
  master_key: sk-team-secret-key  # 团队访问密钥
  database_url: sqlite:///litellm.db  # 日志数据库

# Litellm 日志设置
litellm_settings:
  drop_params: true
  set_verbose: true
  request_timeout: 60
  telemetry: false  # 关闭遥测

第二步:启动服务

# 使用 Docker 启动
docker run -d \
  --name litellm-proxy \
  -p 4000:4000 \
  -v $(pwd)/litellm_config.yaml:/app/config.yaml \
  -e OPENAI_API_KEY=$OPENAI_API_KEY \
  -e ANTHROPIC_API_KEY=$ANTHROPIC_API_KEY \
  -e AZURE_API_KEY=$AZURE_API_KEY \
  -e AZURE_API_BASE=$AZURE_API_BASE \
  ghcr.io/berriai/litellm:main

第三步:团队成员使用

# team_member_a.py - 团队成员A的代码
import openai

# 连接到团队代理服务
client = openai.OpenAI(
    api_key="sk-team-secret-key",
    base_url="http://litellm-proxy:4000"
)

# 使用团队共享的模型
response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "帮我写一个用户认证模块"}]
)
print(response.choices[0].message.content)

第四步:使用管理 API

import requests

BASE_URL = "http://litellm-proxy:4000"
headers = {"Authorization": "Bearer sk-team-secret-key"}

# 查看所有可用模型
response = requests.get(f"{BASE_URL}/v1/model/info", headers=headers)
print("可用模型:", response.json())

# 查看使用统计
response = requests.get(f"{BASE_URL}/v1 Spend/all", headers=headers)
print("花费统计:", response.json())

# 查看活跃请求
response = requests.get(f"{BASE_URL}/v1/active_requests", headers=headers)
print("活跃请求:", response.json())

常见使用场景 / Common Use Cases

场景一:内容审核系统

import litellm
from enum import Enum

class ContentType(Enum):
    """内容类型枚举"""
    TEXT = "text"
    IMAGE = "image"
    MULTIMODAL = "multimodal"

class ContentModerator:
    """内容审核器"""

    def __init__(self):
        self.system_prompt = """你是一个内容安全审核专家。请判断用户输入的内容是否包含以下违规类型:

1. 暴力血腥
2. 色情低俗
3. 政治敏感
4. 违法犯罪
5. 虚假信息
6. 仇恨言论

请用JSON格式返回结果,格式如下:
{
    "is_safe": true/false,
    "violation_types": ["违规类型1", "违规类型2"],
    "risk_level": "low/medium/high",
    "reason": "判断理由"
}"""

    def moderate(self, content: str, content_type: ContentType = ContentType.TEXT):
        """审核内容"""
        if content_type == ContentType.TEXT:
            messages = [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": content}
            ]
        else:
            # 多模态审核
            messages = [
                {"role": "system", "content": self.system_prompt + "\n\n请同时审核图片和文字内容。"},
                {"role": "user", "content": [
                    {"type": "text", "text": f"审核内容: {content}"},
                    {"type": "image_url", "image_url": {"url": content}}
                ]}
            ]

        response = litellm.completion(
            model="gpt-4-turbo",
            messages=messages,
            response_format={"type": "json_object"}
        )

        import json
        result = json.loads(response.choices[0].message.content)
        return result


# 使用示例
moderator = ContentModerator()

test_content = "这是一段正常的技术讨论内容"
result = moderator.moderate(test_content)
print(f"内容安全: {result['is_safe']}")
print(f"风险等级: {result['risk_level']}")

场景二:智能客服系统

import litellm
from typing import Dict, List
import json

class SmartCustomerService:
    """智能客服系统"""

    def __init__(self):
        self.conversation_history

总结
本文详细介绍了该项目的核心功能和使用方法,从环境搭建到实战应用。希望读者能通过本教程快速上手,在实际项目中灵活运用。如果你有任何问题或建议,欢迎在评论区交流讨论。

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注