**还在为LLM应用调试头疼?Opik让你5分钟搞定全链路追踪**

**还在为LLM应用调试头疼?Opik让你5分钟搞定全链路追踪**

还在为LLM应用调试头疼?Opik让你5分钟搞定全链路追踪


为什么LLM应用开发需要专业工具

在大型语言模型应用开发中,你是否遇到过这些令人崩溃的场景?

当你部署了一个看似完美的RAG系统,却在生产环境中发现返回的结果与预期相差甚远。你开始疯狂打印日志,试图定位问题所在,但面对海量的API调用记录和数据流转,却无从下手。这正是大多数开发者在构建LLM应用时面临的真实困境。

传统的调试方法在LLM应用面前显得力不从心。语言模型的黑盒特性、不确定性的输出、以及复杂的提示词链路,让问题排查变成了一场噩梦。更糟糕的是,当用户反馈某个功能表现异常时,你甚至无法复现当时的完整调用上下文。

Opik 正是为解决这些痛点而生的。它是Comet ML团队开源的LLM应用全链路追踪平台,让开发者能够像调试传统应用一样,轻松追踪、监控和优化自己的AI应用。

Opik的核心价值主张

这个开源项目解决了三个关键问题:

追踪可视化 —— 每一次LLM调用都被完整记录,包括输入提示词、模型参数、输出结果、执行时间,以及调用链路中的每一个环节。你可以像观看电影回放一样,重现任意一次请求的完整过程。

评估自动化 —— 内置丰富的评估指标和自定义评估器,让你可以系统性地测试LLM应用的输出质量,无需人工逐条检查。

问题定位精准 —— 当应用表现异常时,你可以快速定位到具体是哪一次调用、哪个环节出了问题,大幅缩短调试时间。


环境搭建:10分钟快速入门

系统要求与前置条件

在开始之前,确保你的开发环境满足以下要求:

  • Python 3.8 或更高版本
  • pip 或 conda 包管理器
  • 基本的Python编程经验

Opik支持本地部署和云端使用两种模式。对于个人开发者和小团队,本地部署是完全免费的;如果你需要团队协作功能,可以考虑使用Comet提供的云端服务。

安装Opik

打开终端,执行以下命令安装Opik的核心包:

pip install opik

如果你需要使用OpenAI的模型,还需要安装对应的集成包:

pip install opik[openai]

对于Anthropic Claude、Google Gemini等其他模型,Opik同样提供了相应的集成支持:

# 支持的集成包括
pip install opik[anthropic]   # Claude
pip install opik[gcp_bedrock] # Google Gemini
pip install opik[langchain]   # LangChain集成
pip install opik[llamaindex]  # LlamaIndex集成

配置Opik

安装完成后,需要进行简单的配置。你可以选择使用本地服务器或连接Comet云端服务。

方式一:使用本地服务器(推荐个人开发)

启动本地Opik服务器,只需一行命令:

opik dev

这个命令会自动下载并运行Docker容器,Opik的UI界面将在本地的5173端口提供服务。

方式二:连接Comet云端

如果你有Comet账号,可以直接配置API密钥:

export OPIK_API_KEY=your_api_key_here
export OPIK_WORKSPACE=your_workspace_name

配置完成后,Opik会自动将追踪数据同步到云端,你可以在任何设备上通过网页访问这些数据。

验证安装

创建一个简单的测试脚本,验证安装是否成功:

import opik

# 初始化客户端
client = opik.Client()

# 创建第一个追踪
with client.trace("test_trace") as trace:
    trace.log("message", "Opik安装成功!")

print("验证完成,请访问 http://localhost:5173 查看追踪结果")

运行这个脚本后,访问本地服务器地址,你应该能看到刚创建的测试追踪记录。


核心功能详解

追踪系统架构

Opik的追踪系统采用了分层架构设计,理解这个架构能帮助你更好地使用工具。

Trace(追踪) 是最顶层的概念,代表一个完整的请求生命周期。比如用户的一次对话请求,从开始到结束的所有处理过程就是一个Trace。

Span(跨度) 是Trace的子单元,代表一个具体的操作步骤。一次Trace可能包含多个Span,比如“解析用户输入”、“检索相关文档”、“构建提示词”、“调用LLM”、“后处理输出”等。

Event(事件) 是最小的记录单元,用于记录具体的数据点,比如某次函数调用的输入参数或返回结果。

这种层级设计让你既能获得全局视角,俯瞰整个请求的处理流程,又能深入到每一个细节,查看具体的数据内容。

自动追踪与手动追踪

Opik支持两种追踪方式,可以根据场景灵活选择。

自动追踪 是最推荐的方式,Opik与主流的LLM框架做了深度集成。只需要添加一个装饰器或初始化语句,就能自动捕获所有相关调用:

import opik

# 初始化即开启全局追踪
opik.init()

# 从这里开始的所有LLM调用都会被自动追踪
# 你的代码无需任何修改

手动追踪 给你更多控制权,适合需要追踪特定逻辑的场景:

from opik import trace, span

# 方式一:使用装饰器
@trace
def my_llm_function(prompt):
    # 函数内的所有操作都会被追踪
    result = call_llm(prompt)
    return result

# 方式二:使用上下文管理器
with trace("custom_trace_name") as my_trace:
    my_trace.log("input", user_input)
    result = process_with_llm(user_input)
    my_trace.log("output", result)

# 方式三:创建子跨度
with span("sub_task") as my_span:
    my_span.log("task_detail", "具体任务描述")
    # 执行任务

评估功能

LLM应用的输出质量往往难以量化,Opik提供了系统化的评估方案。

内置评估指标 覆盖了最常见的评估需求:

  • 精确匹配(Exact Match) —— 检查输出是否与预期完全一致
  • 余弦相似度(Cosine Similarity) —— 衡量语义相似程度
  • 幻觉检测(Hallucination) —— 识别回答中与上下文不符的内容
  • 毒性检测(Toxicity) —— 评估内容是否包含有害信息
  • 答案相关性(Answer Relevancy) —— 评估回答与问题的关联程度

自定义评估器 让你可以定义符合业务需求的评估标准:

from opik.evaluation import Evaluator, EvaluateTask
from opik.evaluation.metrics import ScoreMetric

# 定义自定义评估函数
def business_metric(output, expected):
    """
    自定义业务指标评估
    根据你的具体业务逻辑来判断输出质量
    """
    # 检查输出是否包含必要的业务关键词
    required_keywords = ["产品名称", "价格信息", "联系方式"]
    score = sum(1 for kw in required_keywords if kw in output)
    return score / len(required_keywords)

# 创建评估任务
eval_task = EvaluateTask(
    input="用户的输入问题",
    expected_output="期望的输出内容",
    metadata={"task_type": "product_inquiry"}
)

# 运行评估
evaluator = Evaluator(metrics=[
    ScoreMetric(name="business_score", value_fn=business_metric)
])

results = evaluator.evaluate([eval_task])

实战教程:从零构建可追踪的LLM应用

项目背景

让我们通过一个实战项目来学习Opik的使用方法。我们将构建一个“智能客服助手”,它能够根据产品文档回答用户问题,并具备一定的对话上下文理解能力。

这个项目将涵盖以下功能点:

  • 用户意图识别
  • 文档检索与问答
  • 对话历史管理
  • 回答质量评估

第一步:基础框架搭建

首先创建项目结构:

mkdir llm-customer-service
cd llm-customer-service
mkdir src
touch src/__init__.py
touch src/chatbot.py
touch src/retriever.py
touch src/evaluator.py

核心的聊天机器人类设计如下:

# src/chatbot.py
import opik
from opik import trace, span
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime

# 初始化Opik追踪
opik.init(project_name="customer-service-bot")

@dataclass
class Message:
    """对话消息数据结构"""
    role: str  # "user" 或 "assistant"
    content: str
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class ConversationContext:
    """
    对话上下文管理器
    维护多轮对话的历史记录
    """
    messages: List[Message] = field(default_factory=list)
    max_history: int = 10  # 最多保留10轮对话

    def add_message(self, role: str, content: str):
        """添加新消息到对话历史"""
        self.messages.append(Message(role=role, content=content))

        # 保持历史长度限制
        if len(self.messages) > self.max_history:
            self.messages = self.messages[-self.max_history:]

    def get_conversation_history(self) -> List[Message]:
        """获取完整的对话历史"""
        return self.messages

    def format_for_prompt(self) -> str:
        """格式化对话历史,用于构建提示词"""
        if not self.messages:
            return ""

        history_parts = []
        for msg in self.messages[-5:]:  # 只取最近5轮
            role_label = "用户" if msg.role == "user" else "助手"
            history_parts.append(f"{role_label}{msg.content}")

        return "\n".join(history_parts)


class CustomerServiceBot:
    """
    智能客服机器人主类
    整合意图识别、文档检索和回答生成
    """

    def __init__(self, retriever):
        self.retriever = retriever
        self.conversation_context = ConversationContext()

    @trace
    def chat(self, user_input: str) -> str:
        """
        处理用户输入并生成回复
        这是主要的入口方法,所有追踪都从这里开始
        """
        # 记录用户消息
        self.conversation_context.add_message("user", user_input)

        # 追踪各处理阶段
        intent = self._identify_intent(user_input)
        context = self._retrieve_relevant_docs(user_input)
        response = self._generate_response(user_input, intent, context)

        # 记录助手回复
        self.conversation_context.add_message("assistant", response)

        return response

    @span(name="intent_recognition")
    def _identify_intent(self, user_input: str) -> str:
        """
        识别用户意图
        将用户问题分类到不同的处理类别
        """
        # 简化的意图识别逻辑
        intent_keywords = {
            "price": ["价格", "多少钱", "收费", "费用"],
            "feature": ["功能", "能做什么", "如何使用", "特点"],
            "complaint": ["投诉", "问题", "故障", "不行"],
            "refund": ["退款", "退货", "取消", "退款"]
        }

        detected_intent = "general"  # 默认意图
        for intent, keywords in intent_keywords.items():
            if any(kw in user_input for kw in keywords):
                detected_intent = intent
                break

        return detected_intent

    @span(name="document_retrieval")
    def _retrieve_relevant_docs(self, query: str) -> List[str]:
        """
        从文档库检索相关内容
        """
        # 调用检索器获取相关文档
        relevant_docs = self.retriever.search(query, top_k=3)
        return relevant_docs

    @span(name="response_generation")
    def _generate_response(
        self, 
        user_input: str, 
        intent: str,
        context_docs: List[str]
    ) -> str:
        """
        基于意图和上下文生成最终回复
        """
        # 构建提示词
        conversation_history = self.conversation_context.format_for_prompt()

        prompt = f"""
你是专业的客服助手,请根据以下信息回答用户问题。

对话历史:
{conversation_history}

相关文档:
{chr(10).join(context_docs)}

当前问题:{user_input}

请用友好、专业的语气回答,如果文档中没有相关信息,请如实告知用户。
"""

        # 这里简化处理,实际项目中应该调用真实的LLM
        response = f"[模拟回复] 根据您的{intent}相关问题,我为您找到以下信息:\n"
        response += "\n".join([f"- {doc[:50]}..." for doc in context_docs[:2]])

        return response

第二步:集成检索系统

文档检索是RAG系统的核心组件,让我们创建一个支持追踪的检索器:

# src/retriever.py
from opik import span
from typing import List
import numpy as np

class SimpleRetriever:
    """
    简化版文档检索器
    实际项目中应该接入向量数据库(如Chroma、Pinecone等)
    """

    def __init__(self):
        # 模拟文档库
        self.documents = [
            "产品价格说明:基础版免费,高级版每月99元,企业版每月299元。",
            "产品功能介绍:支持文本分析、情感识别、关键词提取等AI能力。",
            "退款政策:购买后7天内可申请全额退款,30天内可申请部分退款。",
            "客服工作时间:工作日9:00-18:00,节假日休息。",
            "技术支持:可通过邮件 support@example.com 获取帮助。",
            "数据安全:我们采用256位加密标准,确保用户数据安全。",
            "产品更新日志:v2.0版本新增了多语言支持和自定义训练功能。",
            "常见问题FAQ:包含账户注册、密码重置、订阅管理等常见问题解答。"
        ]

        # 简单的关键词索引
        self._build_index()

    def _build_index(self):
        """
        构建简单的关键词索引
        实际项目中应该使用向量嵌入进行语义检索
        """
        self.keyword_index = {}
        for idx, doc in enumerate(self.documents):
            words = set(doc)  # 简化处理,实际应该分词
            for word in words:
                if word not in self.keyword_index:
                    self.keyword_index[word] = []
                self.keyword_index[word].append(idx)

    @span(name="search_documents")
    def search(self, query: str, top_k: int = 3) -> List[str]:
        """
        检索与查询最相关的文档
        """
        # 计算每个文档与查询的匹配度
        query_words = set(query)
        scores = []

        for idx, doc in enumerate(self.documents):
            doc_words = set(doc)
            # 简化的相似度计算
            overlap = len(query_words & doc_words)
            scores.append((idx, overlap))

        # 按分数排序,取最高的top_k个
        scores.sort(key=lambda x: x[1], reverse=True)
        top_indices = [idx for idx, score in scores[:top_k] if score > 0]

        return [self.documents[idx] for idx in top_indices]

第三步:构建评估系统

现在让我们添加评估功能,用于测试和监控机器人的表现:

# src/evaluator.py
from opik.evaluation import Evaluator, EvaluateTask
from opik.evaluation.metrics import (
    Faithfulness,
    AnswerRelevancy,
    ContextPrecision
)
from typing import List, Dict, Any

class BotEvaluator:
    """
    机器人性能评估器
    使用Opik的内置指标评估回答质量
    """

    def __init__(self, bot):
        self.bot = bot

    def evaluate_single(
        self, 
        test_input: str, 
        expected_aspects: List[str]
    ) -> Dict[str, Any]:
        """
        评估单个测试用例

        Args:
            test_input: 测试输入
            expected_aspects: 期望回答应该包含的关键方面
        """
        # 生成回答
        response = self.bot.chat(test_input)

        # 构建评估任务
        eval_task = EvaluateTask(
            input=test_input,
            output=response,
            expected_output="\n".join(expected_aspects),
            metadata={"expected_aspects": expected_aspects}
        )

        # 定义评估指标
        evaluator = Evaluator(metrics=[
            Faithfulness(name="faithfulness"),
            AnswerRelevancy(name="relevancy"),
        ])

        # 执行评估
        results = evaluator.evaluate([eval_task])

        return {
            "input": test_input,
            "response": response,
            "metrics": results
        }

    def run_test_suite(self, test_cases: List[Dict]) -> Dict[str, Any]:
        """
        运行完整的测试套件

        Args:
            test_cases: 测试用例列表,每个用例包含:
                - input: 用户输入
                - expected_aspects: 期望包含的方面
        """
        all_results = []

        for case in test_cases:
            result = self.evaluate_single(
                case["input"], 
                case["expected_aspects"]
            )
            all_results.append(result)

        # 计算整体统计
        avg_faithfulness = np.mean([
            r["metrics"]["faithfulness"] 
            for r in all_results
        ])
        avg_relevancy = np.mean([
            r["metrics"]["relevancy"] 
            for r in all_results
        ])

        return {
            "total_tests": len(test_cases),
            "results": all_results,
            "summary": {
                "avg_faithfulness": avg_faithfulness,
                "avg_relevancy": avg_relevancy
            }
        }

第四步:整合运行与追踪

创建主程序,整合所有组件并展示追踪效果:

# main.py
from src.chatbot import CustomerServiceBot
from src.retriever import SimpleRetriever
from src.evaluator import BotEvaluator
import opik

def main():
    """
    主程序入口
    演示完整的客服机器人使用流程
    """
    print("=" * 60)
    print("智能客服机器人演示")
    print("=" * 60)

    # 初始化组件
    print("\n[1] 初始化组件...")
    retriever = SimpleRetriever()
    bot = CustomerServiceBot(retriever)
    evaluator = BotEvaluator(bot)

    # 测试对话
    print("\n[2] 开始测试对话...")

    test_queries = [
        "你们的收费是多少?",
        "产品有什么功能?",
        "如果我不满意可以退款吗?",
        "怎么联系客服?"
    ]

    for query in test_queries:
        print(f"\n用户: {query}")
        response = bot.chat(query)
        print(f"助手: {response}")
        print("-" * 40)

    # 运行评估测试
    print("\n[3] 运行性能评估...")

    test_cases = [
        {
            "input": "你们的高级版多少钱?",
            "expected_aspects": ["价格", "高级版", "99"]
        },
        {
            "input": "产品支持哪些功能?",
            "expected_aspects": ["功能", "文本分析", "AI"]
        },
        {
            "input": "退款政策是怎样的?",
            "expected_aspects": ["退款", "7天", "全额"]
        }
    ]

    eval_results = evaluator.run_test_suite(test_cases)

    print(f"\n评估完成,共测试 {eval_results['total_tests']} 个用例")
    print(f"平均忠诚度得分: {eval_results['summary']['avg_faithfulness']:.2f}")
    print(f"平均相关性得分: {eval_results['summary']['avg_relevancy']:.2f}")

    print("\n" + "=" * 60)
    print("演示结束!请访问 http://localhost:5173 查看追踪详情")
    print("=" * 60)

if __name__ == "__main__":
    main()

运行程序后,你可以在Opik的Web界面中看到完整的追踪记录,包括每个请求的处理链路、各个阶段的耗时、以及评估结果。


高级用法与集成

与LangChain深度集成

如果你在使用LangChain构建LLM应用,Opik提供了开箱即用的集成支持:

from opik.integrations.langchain import OpikTracer
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate

# 初始化Opik追踪器
opik_tracer = OpikTracer()

# 创建LangChain组件,添加追踪器
llm = ChatOpenAI(model="gpt-4", callbacks=[opik_tracer])

# 你的Agent和工具定义
tools = [...]  # 你的工具列表
agent = create_react_agent(llm, tools, prompt=...)

# 创建执行器,自动追踪所有Agent行为
agent_executor = AgentExecutor(
    agent=agent, 
    tools=tools,
    callbacks=[opik_tracer]
)

# 执行查询,所有LangChain操作都会被追踪
result = agent_executor.invoke({"input": "帮我分析这篇文章的主要内容"})

自定义追踪装饰器

对于更细粒度的追踪需求,可以创建自定义装饰器:

import opik
from functools import wraps
import time

def tracked_llm_call(operation_name: str):
    """
    自定义LLM调用追踪装饰器

    追踪内容:
    - 函数执行时间
    - 输入输出参数
    - 异常信息
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            client = opik.Client()

            with client.span(
                name=operation_name,
                type="llm_call"
            ) as span:
                start_time = time.time()

                try:
                    # 记录输入
                    span.log("input_args", str(args[:3]))  # 只记录前3个参数
                    span.log("input_kwargs", str(kwargs))

                    # 执行函数
                    result = func(*args, **kwargs)

                    # 记录输出
                    span.log("output", str(result)[:500])  # 限制长度
                    span.log("success", True)

                    return result

                except Exception as e:
                    # 记录异常
                    span.log("error", str(e))
                    span.log("success", False)
                    raise

                finally:
                    # 记录执行时间
                    elapsed = time.time() - start_time
                    span.log("execution_time_ms", elapsed * 1000)

        return wrapper
    return decorator

# 使用装饰器
@tracked_llm_call("text_generation")
def generate_text(prompt, max_tokens=100, temperature=0.7):
    """
    封装的文本生成函数
    所有调用都会被自动追踪
    """
    # 实际的LLM调用逻辑
    return "生成的文本内容"

分布式追踪

在微服务架构中,Opik支持跨服务的分布式追踪:

# 服务A(API网关)
import opik

def call_downstream_service(user_request):
    """调用下游服务"""
    # 创建分布式追踪上下文
    trace_context = opik.get_current_trace_context()

    # 将追踪ID传递给下游服务
    headers = {
        "X-Opik-Trace-Id": trace_context.trace_id,
        "X-Opik-Span-Id": trace_context.span_id
    }

    response = requests.post(
        "http://downstream-service/api",
        json=user_request,
        headers=headers
    )

    return response.json()
# 服务B(下游处理服务)
import opik

def receive_request(request_data, trace_id, span_id):
    """接收上游追踪上下文"""
    # 从请求头恢复追踪上下文
    opik.continue_trace(trace_id=trace_id, parent_span_id=span_id)

    with opik.trace("downstream_processing") as trace:
        # 继续在同一追踪中添加新的跨度
        result = process_data(request_data)
        trace.log("processed_items", len(result))

    return result

常见应用场景

场景一:RAG系统开发与优化

Retrieval-Augmented Generation(检索增强生成)是当前最流行的LLM应用架构之一。Opik能帮助你:

  • 监控检索质量:追踪每次检索返回的文档及其相关性得分
  • 分析生成效果:评估基于不同检索结果生成的答案质量
  • 定位问题案例:快速找到检索或生成出现问题的具体案例

典型用法示例:

@trace
def rag_pipeline(query: str):
    """
    RAG管道的完整追踪
    """
    # 1. 查询理解
    with span("query_understanding") as span:
        expanded_queries = query_expander.expand(query)
        span.log("expanded_queries", expanded_queries)

    # 2. 向量检索
    with span("vector_search") as span:
        retrieved_docs = vector_db.search(
            expanded_queries,
            top_k=10
        )
        span.log("retrieved_count", len(retrieved_docs))
        span.log("doc_scores", [d.score for d in retrieved_docs])

    # 3. 重排序
    with span("reranking") as span:
        reranked_docs = reranker.rerank(query, retrieved_docs, top_k=5)
        span.log("final_doc_count", len(reranked_docs))

    # 4. 生成答案
    with span("answer_generation") as span:
        context = format_documents(reranked_docs)
        answer = llm.generate(f"Context: {context}\n\nQuery: {query}")
        span.log("answer_length", len(answer))

    return answer

场景二:Agent系统调试

自主Agent系统通常涉及多轮工具调用和复杂的决策流程,Opik能清晰呈现整个执行过程:

@trace
class TrackedAgent:
    """带追踪的Agent基类"""

    def __init__(self):
        self.action_history = []

    def execute_task(self, task_description: str):
        """执行任务并追踪每一步"""
        current_state = {"task": task_description, "steps": []}

        while not self.is_complete(current_state):
            # 选择下一步行动
            with span("action_selection") as span:
                action = self.select_next_action(current_state)
                span.log("selected_action", action.name)
                span.log("action_reasoning", action.reasoning)

            # 执行行动
            with span("action_execution") as span:
                result = action.execute()
                span.log("action_result", str(result)[:200])

            # 更新状态
            current_state["steps"].append({
                "action": action.name,
                "result": result
            })

            # 安全检查,防止无限循环
            if len(current_state["steps"]) > 20:
                raise RuntimeError("Agent执行超过最大步数限制")

        return current_state["steps"]

场景三:提示词工程实验

在开发LLM应用时,往往需要尝试大量不同的提示词。Opik帮你系统化管理这些实验:

@trace
def compare_prompts(test_input: str, prompt_variants: List[str]):
    """
    对比不同提示词变体的效果
    """
    results = {}

    for idx, prompt_template in enumerate(prompt_variants):
        with span(f"prompt_variant_{idx}") as span:
            # 填充提示词
            prompt = prompt_template.format(user_input=test_input)
            span.log("filled_prompt", prompt)

            # 生成回答
            start = time.time()
            response = llm.generate(prompt)
            elapsed = time.time() - start

            span.log("response", response)
            span.log("generation_time", elapsed)

            # 评估回答
            evaluation = evaluate_response(response, test_input)
            span.log("evaluation", evaluation)

            results[f"variant_{idx}"] = {
                "prompt": prompt_template,
                "response": response,
                "evaluation": evaluation,
                "latency": elapsed
            }

    return results

最佳实践与性能优化

追踪数据管理

在大规模应用中,追踪数据可能非常庞大,需要合理管理:

采样策略 —— 不需要追踪每一次请求:

# 配置采样,只追踪10%的请求
opik.init(
    project_name="production-app",
    sampling_rate=0.1  # 10%的采样率
)

# 或者基于条件采样
@trace(sampling_fn=lambda ctx: ctx.user_id in premium_users)
def handle_request(request):
    # 只追踪高级用户的请求
    pass

数据保留策略 —— 配置合理的保留期限:

# 在Opik配置文件中设置
# opik.yaml
retention:
  keep_all: false
  keep_last_days: 30
  archive_old: true
  archive_backend: s3  # 或本地文件系统

性能影响最小化

追踪本身不应该显著影响应用性能:

异步日志记录 —— 使用异步方式记录追踪数据:

# Opik默认使用异步记录
# 但可以通过配置调整
opik.init(
    project_name="performance-critical",
    async_logging=True,  # 默认开启
    batch_size=100,      # 批量记录
    flush_interval_ms=1000  # 每秒刷新一次
)

选择性追踪 —— 只追踪关键路径:

# 细粒度控制追踪范围
with trace("outer_operation") as outer:
    # 这个会被追踪
    for item in items:
        # 这些操作可以选择性追踪
        if should_trace(item):
            with span(f"process_{item.id}") as span:
                process_item(item, span)
        else:
            # 不追踪的快速路径
            process_item_fast(item)

代码组织建议

保持追踪代码与业务逻辑分离:

# 推荐的目录结构
llm_application/
├── src/
   ├── __init__.py
   ├── core/
      ├── __init__.py
      ├── llm_client.py      # LLM调用封装
      ├── prompts.py         # 提示词模板
      └── evaluation.py      # 评估逻辑
   ├── tracing/
      ├── __init__.py
      ├── decorators.py      # 追踪装饰器
      ├── config.py          # 追踪配置
      └── exporters.py       # 数据导出
   └── services/
       ├── __init__.py
       └── chatbot.py         # 业务逻辑
├── tests/
├── configs/
   └── opik.yaml
└── main.py

与团队协作

团队共享与协作

Opik支持团队成员共享追踪数据和分析结果:

添加团队成员注释 —— 在追踪记录中添加分析注释:

with span("investigation") as span:
    # 添加分析注释
    span.add_annotation(
        author="zhang_san",
        content="这个案例的问题出在第三步检索,关键词提取有误",
        tags=["bug", "priority-high"]
    )

创建实验对比 —— 方便团队成员比较不同实验的结果:

# 标记实验
with opik.Experiment(
    name="prompt-v2-comparison",
    description="对比新旧提示词的效果"
) as experiment:
    for test_case in test_suite:
        result = run_experiment(test_case)
        experiment.log(result)

与CI/CD集成

将LLM应用测试集成到持续集成流程中:

# .github/workflows/llm-tests.yml
name: LLM Application Tests

on:
  push:
    branches: [main, develop]
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install opik pytest pytest-asyncio

      - name: Run evaluation tests
        env:
          OPIK_API_KEY: ${{ secrets.OPIK_API_KEY }}
        run: |
          pytest tests/evaluation/ -v --opik-report

      - name: Check performance regression
        run: |
          python scripts/check_metrics.py
        # 如果指标下降超过阈值,会失败CI构建

总结与资源链接

通过本教程,你应该已经掌握了以下核心技能:

环境搭建 —— 能够快速安装和配置Opik,无论是本地开发还是连接云端服务。

追踪系统 —— 学会了使用自动追踪和手动追踪,记录LLM应用的完整执行链路。

评估方法 —— 掌握了使用内置指标和自定义评估器,系统性评估LLM应用的输出质量。

高级集成 —— 了解了与LangChain等框架的集成方式,以及分布式追踪的实现方法。

最佳实践 —— 学会了管理大规模追踪数据,优化性能,以及与团队协作。

Opik作为Comet ML团队的开源项目,持续活跃地维护和更新。如果你遇到问题或有新功能需求,可以在GitHub仓库中提交Issue或Pull Request。

延伸学习资源

项目官方资源帮助你进一步深入:

  • 官方文档 —— https://www.comet.com/docs/opik
  • GitHub仓库 —— https://github.com/comet-ml/opik
  • 示例项目 —— https://github.com/comet-ml/opik/tree/main/examples
  • Discord社区 —— 加入与其他开发者的讨论

相关开源项目推荐

与Opik互补的优秀开源项目,可以组合使用:

  • LangSmith —— LangChain官方推出的LLM追踪平台
  • Phoenix —— Arize出品的LLM可观测性工具
  • Weave —— Weights & Biases的LLM追踪方案
  • PromptLayer —— 专注于提示词管理的平台
  • Auto评估 —— 开源的LLM评估框架

每个项目都有其独特的定位和优势,建议根据具体需求选择合适的工具组合。在LLM应用开发日益复杂的今天,拥有完善的追踪和评估能力,将成为保证应用质量的关键竞争力。

现在就开始在你的项目中集成Opik吧!十分钟后,你就能拥有一个完整的LLM应用可视化追踪系统,让调试和问题定位变得前所未有的简单。祝你开发愉快!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注