别再为实时AI应用掉头发了,TEN框架让开发效率提升10倍的实战指南
前言
当我第一次看到TEN框架的代码仓库时,我的内心其实是拒绝的——又一个声称能改变游戏规则的AI框架?但当我真正深入了解并动手实践之后,我不得不承认:这个来自TEN团队的实时AI应用开发框架,确实解决了我在过去一年里踩过的无数坑。
作为一个常年在一线写代码的开发者,我深知实时AI应用开发的痛点:延迟太高、扩展性差、调试困难、多模态集成复杂……这些问题几乎每个AI项目都会遇到。而TEN框架,正是为了解决这些问题而生的。
今天,我将用这篇实战指南,带你从零开始掌握TEN框架。无论你是想快速搭建一个AI聊天机器人,还是想构建复杂的多模态实时交互系统,这篇文章都能给你提供可操作的方案。
为什么值得深入学习TEN框架
传统开发方式的核心痛点
在深入TEN框架之前,让我们先聊聊传统实时AI应用开发的几大难题。
第一个问题是延迟与实时性的矛盾。很多AI应用需要实时响应用户的输入,但传统的请求-响应模式往往伴随着秒级甚至更长的延迟。想象一下,用户说了一句话,要等上3-5秒才能得到回复,这种体验简直让人崩溃。
第二个问题是架构扩展的噩梦。当你需要支持更多用户、更多功能时,往往需要对整个系统进行重构。微服务架构虽然能解决部分问题,但引入的复杂性也不容小觑。
第三个问题是多模态集成的地狱。现代AI应用往往需要整合语音、图像、文本等多种模态,但每种模态都有不同的API、不同的协议、不同的处理逻辑,整合起来费时费力。
第四个问题是状态管理的混乱。AI对话需要维护上下文状态,传统的无状态HTTP请求根本满足不了需求,而自己实现状态管理又容易出bug。
TEN框架的核心价值主张
TEN框架的设计理念非常清晰:让实时AI应用的开发变得像搭积木一样简单。
TEN的全称是Transport Extension Network,顾名思义,它是一个专注于数据传输与扩展的网络框架。但它又不仅仅是一个传输框架——它提供了完整的实时AI应用开发解决方案,包括但不限于:
- 统一的扩展系统:无论是LLM、语音识别、图像处理还是其他AI能力,都可以通过扩展的形式无缝集成
- 内置的状态管理:对话上下文、用户会话等状态信息由框架自动管理,开发者只需关注业务逻辑
- 高效的实时通信:基于WebRTC和其他现代通信协议,确保低延迟的用户体验
- 灵活的扩展机制:你可以使用C++编写高性能扩展,也可以使用Python进行快速原型开发
更重要的是,TEN是开源项目,这意味着你可以完全掌控自己的技术栈,不被任何封闭平台绑架。
社区与生态现状
TEN框架虽然相对年轻,但发展势头非常迅猛。项目在GitHub上已经获得了可观的关注,文档和示例也在不断完善。更重要的是,TEN团队提供了活跃的技术支持,社区中也有不少开发者分享自己的实践经验。
快速上手:环境搭建全流程
系统要求与依赖
在开始之前,让我们先确认你的开发环境是否满足要求。
TEN框架的主要组件使用C++编写,因此需要一台能够编译C++代码的机器。不过,得益于现代的构建系统和Docker支持,跨平台开发也变得非常简单。
基本要求:
- 操作系统:Linux(推荐Ubuntu 20.04或更高版本)、macOS、Windows(通过WSL2)
- 内存:至少8GB(推荐16GB或更多,用于运行AI模型)
- 磁盘空间:至少20GB可用空间
- C++编译器:GCC 9+ 或 Clang 10+
- Python 3.8+(用于编写扩展)
开发工具:
- Git
- CMake 3.16+
- Docker(可选,但强烈推荐)
- Visual Studio Code(或其他编辑器,配合C++扩展使用效果更佳)
安装步骤详解
方式一:使用Docker(推荐新手)
Docker方式是最简单快捷的安装方式,特别适合刚入门的朋友。
第一步,拉取TEN框架的官方Docker镜像:
docker pull tenAILab/ten_framework:latest
这个镜像包含了编译好的TEN框架核心库以及常用的开发工具。开箱即用,省去了手动配置的麻烦。
第二步,创建一个容器实例:
docker run -it --name ten-dev \
-v /path/to/your/project:/workspace \
-p 8080:8080 \
tenAILab/ten_framework:latest
这里我们将宿主机的项目目录挂载到容器内,方便在宿主机编辑代码,在容器内编译运行。端口8080用于暴露TEN框架的服务端口。
第三步,验证安装是否成功:
# 在容器内执行
ten-version
如果输出了TEN框架的版本号,说明安装成功。
方式二:源码编译安装
如果你需要定制化开发,或者想深入理解TEN框架的内部机制,源码编译是更好的选择。
首先,克隆TEN框架的代码仓库:
git clone https://github.com/TEN-framework/ten-framework.git
cd ten-framework
然后,创建构建目录并配置CMake:
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
CMake配置完成后,你可以看到详细的构建选项:
-- The C compiler identification is GNU 11.4.0
-- The CXX compiler identification is GNU 11.4.0
-- Check for working C/C++ compiler: /usr/bin/cc
-- Check for working C/C++ compiler: /usr/bin/cxx
-- Ten Framework version: 0.8.0
-- Found Git: /usr/bin/git
-- Found Python3: /usr/bin/python3
配置完成后,开始编译:
make -j$(nproc)
编译过程可能需要几分钟,取决于你的机器性能。编译完成后,安装到系统:
sudo make install
最后,验证安装:
ten-version
开发环境配置
安装完成后,我们需要配置一个趁手的开发环境。
IDE配置
推荐使用Visual Studio Code,配合以下扩展:
- C/C++(微软官方)
- CMake Tools
- Python
- GitLens
创建工作区配置文件.vscode/settings.json:
{
"cmake.configureOnOpen": true,
"cmake.buildDirectory": "${workspaceFolder}/build",
"C_Cpp.default.configurationProvider": "ms-vscode.cmake-tools",
"files.associations": {
"*.ten": "cpp"
}
}
Python环境配置
TEN框架支持使用Python编写扩展,我们需要配置Python开发环境:
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate
# 安装TEN Python SDK
pip install ten-sdk
# 验证安装
python -c "import ten; print(ten.__version__)"
核心概念与架构
TEN框架的整体架构
在动手写代码之前,我们需要先理解TEN框架的整体架构。这就像建房子要先看图纸一样,磨刀不误砍柴工。
TEN框架采用了扩展驱动的架构模式。核心系统非常精简,提供了基础的通信、调度和状态管理能力,而具体的功能则通过扩展来实现。
核心组件
TEN运行时(TEN Runtime)是整个框架的心脏,负责管理扩展的生命周期、处理消息路由、维护状态等核心功能。运行时采用事件驱动模型,确保高效的资源利用。
扩展(Extension)是功能实现的基本单元。每个扩展都是一个独立的模块,可以访问TEN运行时提供的API,与其他扩展进行通信。每个扩展都有以下特性:
- 独立的生命周期(初始化、运行、销毁)
- 输入输出端口(用于接收和发送消息)
- 状态存储空间
- 配置参数
图(Graph)是扩展之间的连接关系。你可以把图理解为一张施工蓝图,定义了有哪些扩展,以及它们之间如何连接、如何通信。
端口(Port)是扩展之间通信的接口。每个扩展可以定义多个输入端口和输出端口,端口之间可以建立连接,形成数据流。
数据流模型
TEN框架的数据流模型非常直观:消息从源扩展的输出端口流出,经过图中的连接,到达目标扩展的输入端口。
这种模型有几个关键优势:
解耦:扩展之间不需要知道彼此的实现细节,只需要通过端口进行交互。这大大简化了代码结构,也方便了模块的复用。
可组合:你可以轻松地创建新的扩展组合来实现新功能,而不需要修改已有的扩展代码。
可视化:由于数据流是显式定义的,整个应用的结构一目了然。这对于调试和维护都非常有帮助。
扩展的分类
TEN框架的扩展可以分为几大类:
AI模型扩展是最常用的类型,包括LLM扩展(用于接入大语言模型)、ASR扩展(语音识别)、TTS扩展(语音合成)、图像生成扩展等。这类扩展通常需要调用外部API或本地模型服务。
通信扩展负责与外部世界的数据交换。比如WebRTC扩展处理浏览器端的实时通信,WebSocket扩展提供HTTP协议的实时双向通信,gRPC扩展则适用于微服务架构。
处理扩展对数据进行转换和处理。比如格式转换扩展负责把音频从PCM转成Opus,过滤扩展根据规则丢弃某些消息,聚合扩展则把多个消息合并成一个。
工具扩展提供各种辅助功能。比如日志扩展记录运行信息,监控扩展收集性能指标,缓存扩展加速重复请求的处理。
状态管理机制
实时AI应用往往需要维护对话上下文、用户状态等信息。TEN框架提供了内置的状态管理机制,让这变得简单。
每个扩展都可以访问一个状态存储空间,这个空间是持久化的,不会因为消息处理的结束而丢失。你可以把状态空间理解为一个键值数据库,支持基本的读写操作:
# 读取状态
context = state.get("conversation_context", default="")
# 更新状态
context += f"\nUser: {user_input}"
state.set("conversation_context", context)
状态管理还支持事务操作,确保在并发场景下的数据一致性:
# 原子性更新
state.update_atomic("request_count", lambda x: x + 1)
实战教程:从零构建一个AI对话助手
现在你已经掌握了基础概念,是时候动手实践了。我们将一步步构建一个完整的AI对话助手,功能包括:接收用户文本输入、调用LLM生成回复、支持多轮对话上下文。
项目结构
首先,创建项目目录结构:
mkdir -p my-ai-assistant/{src,extensions,graphs,config}
cd my-ai-assistant
目录结构说明:
src/:存放TEN框架的C++源码(如果需要修改或扩展核心)extensions/:存放自定义扩展的代码graphs/:存放图配置文件,定义扩展之间的连接关系config/:存放各种配置参数
对于这个简单项目,我们主要使用Python编写扩展,所以src/目录可以暂时为空。
编写第一个扩展:Echo扩展
让我们从最简单的扩展开始——一个回声扩展,收到什么就返回什么。这是理解扩展开发模式的最佳起点。
创建文件extensions/echo_extension.py:
"""
Echo扩展:最简单的TEN扩展示例
收到输入后,原样返回
"""
import ten
from ten import TenEnv
class EchoExtension(TenExtension):
"""
Echo扩展类
继承自TenExtension基类
"""
def __init__(self, config):
"""
构造函数
参数:
config: 扩展配置字典
"""
super().__init__(config)
self.echo_prefix = config.get("echo_prefix", "")
def on_start(self, ten_env: TenEnv) -> None:
"""
扩展启动时调用
这里可以做一些初始化工作
"""
ten_env.log_info("Echo extension started")
ten_env.on_start_done()
def on_stop(self, ten_env: TenEnv) -> None:
"""
扩展停止时调用
这里可以清理资源
"""
ten_env.log_info("Echo extension stopped")
ten_env.on_stop_done()
def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
"""
处理接收到的命令
参数:
ten_env: TEN运行时环境
cmd_data: 命令数据字典
"""
# 获取输入文本
text = cmd_data.get("text", "")
# 添加前缀并构造回复
response_text = f"{self.echo_prefix}{text}"
# 发送回复
response_cmd = {
"text": response_text
}
ten_env.send_cmd(response_cmd)
def on_audio_frame(self, ten_env: TenEnv, frame_data: bytes) -> None:
"""
处理接收到的音频帧
参数:
ten_env: TEN运行时环境
frame_data: 音频帧原始数据
"""
# 对于echo扩展,直接原样返回音频数据
ten_env.send_audio_frame(frame_data)
# 导出扩展类
# 这是TEN框架识别扩展的入口点
ten.add_extension("echo", EchoExtension)
这个扩展虽然简单,但展示了扩展开发的核心模式:
- 继承
TenExtension基类 - 实现生命周期方法(
on_start、on_stop) - 实现消息处理方法(
on_cmd、on_audio_frame) - 调用
ten_env的API发送消息或日志
编写LLM扩展
现在我们把Echo扩展升级成真正的AI对话扩展。这需要接入LLM API来完成智能回复。
创建文件extensions/llm_extension.py:
"""
LLM扩展:接入大语言模型
使用OpenAI兼容API,支持自定义模型配置
"""
import json
import aiohttp
from typing import Optional, List, Dict
import ten
from ten import TenEnv
class Message:
"""对话消息结构"""
def __init__(self, role: str, content: str):
self.role = role
self.content = content
def to_dict(self) -> dict:
return {"role": self.role, "content": self.content}
class LLMExtension(TenExtension):
"""
LLM扩展类
负责与大语言模型API交互
"""
def __init__(self, config: dict):
super().__init__(config)
# API配置
self.api_base = config.get("api_base", "https://api.openai.com/v1")
self.api_key = config.get("api_key", "")
self.model = config.get("model", "gpt-3.5-turbo")
self.max_tokens = config.get("max_tokens", 1000)
self.temperature = config.get("temperature", 0.7)
# 对话历史
self.conversation_history: List[Message] = []
self.max_history = config.get("max_history", 10)
# 系统提示词
system_prompt = config.get("system_prompt",
"You are a helpful AI assistant.")
self.conversation_history.append(Message("system", system_prompt))
# HTTP会话
self.session: Optional[aiohttp.ClientSession] = None
def on_start(self, ten_env: TenEnv) -> None:
"""启动扩展"""
ten_env.log_info(f"LLM extension starting with model: {self.model}")
# 初始化HTTP会话
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
self.session = aiohttp.ClientSession(headers=headers)
ten_env.on_start_done()
def on_stop(self, ten_env: TenEnv) -> None:
"""停止扩展"""
if self.session:
import asyncio
asyncio.create_task(self.session.close())
ten_env.log_info("LLM extension stopped")
ten_env.on_stop_done()
def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
"""处理用户输入"""
user_input = cmd_data.get("text", "")
if not user_input:
ten_env.log_warning("Received empty input")
return
ten_env.log_info(f"Processing input: {user_input[:50]}...")
# 添加用户消息到历史
self.conversation_history.append(Message("user", user_input))
# 调用LLM
import asyncio
async def call_llm():
try:
response = await self._call_api(ten_env)
return response
except Exception as e:
ten_env.log_error(f"LLM API error: {e}")
return "抱歉,我现在无法回答这个问题。"
# 在事件循环中执行
loop = asyncio.get_event_loop()
response_text = loop.run_until_complete(call_llm())
# 添加助手回复到历史
self.conversation_history.append(Message("assistant", response_text))
# 维护历史长度
while len(self.conversation_history) > self.max_history + 1:
# 保留系统消息,删除最早的用户-助手对
self.conversation_history.pop(1)
# 发送回复
response_cmd = {"text": response_text}
ten_env.send_cmd(response_cmd)
async def _call_api(self, ten_env: TenEnv) -> str:
"""
调用LLM API
参数:
ten_env: TEN运行时环境
返回:
模型生成的回复文本
"""
if not self.session:
raise RuntimeError("HTTP session not initialized")
# 构造请求
messages = [msg.to_dict() for msg in self.conversation_history]
payload = {
"model": self.model,
"messages": messages,
"max_tokens": self.max_tokens,
"temperature": self.temperature
}
# 发送请求
url = f"{self.api_base}/chat/completions"
async with self.session.post(url, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
raise RuntimeError(f"API returned {resp.status}: {error_text}")
result = await resp.json()
choices = result.get("choices", [])
if not choices:
return "抱歉,我没有得到有效的回复。"
return choices[0]["message"]["content"]
def on_reset(self, ten_env: TenEnv, cmd_data: dict) -> None:
"""
重置对话历史
当收到reset命令时调用
"""
# 保留系统消息,清空对话历史
system_msg = self.conversation_history[0]
self.conversation_history = [system_msg]
ten_env.log_info("Conversation history reset")
# 发送确认
ten_env.send_cmd({"status": "reset_done"})
# 导出扩展
ten.add_extension("llm", LLMExtension)
这个LLM扩展实现了以下功能:
- 维护对话历史,支持多轮对话
- 调用OpenAI兼容API进行文本生成
- 自动管理历史长度,防止内存溢出
- 支持自定义系统提示词
编写输入输出扩展
除了核心处理逻辑,我们还需要输入输出扩展来处理与外部系统的交互。
WebSocket输入扩展
创建extensions/ws_input_extension.py:
"""
WebSocket输入扩展
接收来自WebSocket连接的文本输入
"""
import asyncio
import websockets
from typing import Optional
import ten
from ten import TenEnv
class WSInputExtension(TenExtension):
"""
WebSocket输入扩展
监听WebSocket连接,将收到的消息转发给下游扩展
"""
def __init__(self, config: dict):
super().__init__(config)
# 监听配置
self.host = config.get("host", "0.0.0.0")
self.port = config.get("port", 8080)
# 连接集合
self.connections: set = set()
# 服务器实例
self.server: Optional[websockets.WebSocketServer] = None
def on_start(self, ten_env: TenEnv) -> None:
"""启动WebSocket服务器"""
ten_env.log_info(f"Starting WebSocket server on {self.host}:{self.port}")
# 创建并启动服务器
loop = asyncio.get_event_loop()
loop.create_task(self._start_server())
ten_env.on_start_done()
async def _start_server(self) -> None:
"""异步启动WebSocket服务器"""
self.server = await websockets.serve(
self._handle_connection,
self.host,
self.port
)
ten_env = self.get_ten_env()
ten_env.log_info("WebSocket server started")
async def _handle_connection(self, websocket, path: str) -> None:
"""
处理新的WebSocket连接
参数:
websocket: WebSocket连接对象
path: 请求路径
"""
self.connections.add(websocket)
ten_env = self.get_ten_env()
ten_env.log_info(f"New connection established: {path}")
try:
async for message in websocket:
# 解析消息
try:
data = json.loads(message)
except json.JSONDecodeError:
# 如果不是JSON,当作文本处理
data = {"text": message}
# 发送到下游扩展
self.send_to_graph(data)
except websockets.exceptions.ConnectionClosed:
ten_env.log_info("Connection closed")
finally:
self.connections.remove(websocket)
def send_to_graph(self, data: dict) -> None:
"""发送数据到图中其他扩展"""
ten_env = self.get_ten_env()
ten_env.send_cmd(data, dest="llm")
def on_stop(self, ten_env: TenEnv) -> None:
"""停止服务器"""
if self.server:
self.server.close()
asyncio.run(self.server.wait_closed())
ten_env.log_info("WebSocket server stopped")
ten_env.on_stop_done()
# 导出扩展
ten.add_extension("ws_input", WSInputExtension)
WebSocket输出扩展
创建extensions/ws_output_extension.py:
"""
WebSocket输出扩展
接收处理结果,通过WebSocket发送给客户端
"""
import json
import asyncio
import websockets
from typing import Set
import ten
from ten import TenEnv
class WSOutputExtension(TenExtension):
"""
WebSocket输出扩展
从上游扩展接收结果,广播给所有连接的客户端
"""
def __init__(self, config: dict):
super().__init__(config)
self.clients: Set[websockets.WebSocketClientProtocol] = set()
def on_start(self, ten_env: TenEnv) -> None:
"""扩展启动"""
ten_env.log_info("WebSocket output extension started")
ten_env.on_start_done()
def on_stop(self, ten_env: TenEnv) -> None:
"""扩展停止"""
# 关闭所有客户端连接
asyncio.run(self._close_all_clients())
ten_env.on_stop_done()
async def _close_all_clients(self) -> None:
"""关闭所有客户端连接"""
for client in list(self.clients):
try:
await client.close()
except Exception:
pass
self.clients.clear()
def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
"""
接收来自上游扩展的输出
参数:
cmd_data: 包含回复文本的数据字典
"""
text = cmd_data.get("text", "")
if not text:
return
ten_env.log_info(f"Broadcasting response: {text[:50]}...")
# 广播给所有客户端
asyncio.run(self._broadcast(text))
async def _broadcast(self, text: str) -> None:
"""
向所有客户端广播消息
参数:
text: 要广播的文本内容
"""
if not self.clients:
return
message = json.dumps({"type": "response", "text": text})
# 并发发送给所有客户端
await asyncio.gather(
*[client.send(message) for client in list(self.clients)],
return_exceptions=True
)
# 注册客户端连接(由外部调用)
def register_client(websocket) -> None:
"""注册新的WebSocket客户端"""
global _ws_output_instance
if _ws_output_instance:
_ws_output_instance.clients.add(websocket)
def unregister_client(websocket) -> None:
"""取消注册WebSocket客户端"""
global _ws_output_instance
if _ws_output_instance:
_ws_output_instance.clients.discard(websocket)
# 导出扩展
ten.add_extension("ws_output", WSOutputExtension)
# 用于跨模块访问的全局变量
_ws_output_instance: WSOutputExtension = None
定义图配置
扩展编写完成后,我们需要定义扩展之间的连接关系。TEN框架使用JSON格式的图配置文件。
创建graphs/chat_graph.json:
{
"graph": {
"name": "chat_graph",
"version": "1.0.0"
},
"extensions": {
"ws_input": {
"type": "ws_input",
"config": {
"host": "0.0.0.0",
"port": 8080
}
},
"llm": {
"type": "llm",
"config": {
"api_base": "https://api.openai.com/v1",
"api_key": "${OPENAI_API_KEY}",
"model": "gpt-3.5-turbo",
"max_tokens": 1000,
"temperature": 0.7,
"system_prompt": "你是一个友好的AI助手。请用简洁、有帮助的方式回答问题。"
}
},
"ws_output": {
"type": "ws_output",
"config": {}
}
},
"connections": [
{
"from": "ws_input",
"to": "llm",
"port": {
"out": "cmd",
"in": "cmd"
}
},
{
"from": "llm",
"to": "ws_output",
"port": {
"out": "cmd",
"in": "cmd"
}
}
]
}
配置说明:
extensions部分定义了图中使用的所有扩展及其配置${OPENAI_API_KEY}是环境变量引用,会在运行时被替换为实际值connections部分定义了扩展之间的连接关系和数据流向
启动应用
创建启动脚本run_chat.py:
#!/usr/bin/env python3
"""
聊天应用启动脚本
"""
import os
import sys
import signal
import argparse
import ten
from ten import TenEnv
def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="TEN Chat Application")
parser.add_argument(
"--config",
"-c",
type=str,
default="graphs/chat_graph.json",
help="Path to graph configuration file"
)
parser.add_argument(
"--log-level",
"-l",
type=str,
default="info",
choices=["debug", "info", "warning", "error"],
help="Logging level"
)
parser.add_argument(
"--port",
"-p",
type=int,
default=8080,
help="WebSocket server port"
)
return parser.parse_args()
def main():
"""主函数"""
args = parse_args()
# 加载图配置
if not os.path.exists(args.config):
print(f"Error: Configuration file not found: {args.config}")
sys.exit(1)
print(f"Loading graph configuration from: {args.config}")
# 创建TEN运行时
ten_env = TenEnv.create(args.config)
# 设置日志级别
ten_env.set_log_level(args.log_level)
# 注册信号处理
def signal_handler(signum, frame):
print("\nShutting down...")
ten_env.stop()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print("Starting TEN Chat Application...")
print(f"WebSocket server will listen on port: {args.port}")
print("Press Ctrl+C to stop")
# 启动TEN运行时
ten_env.run()
if __name__ == "__main__":
main()
运行应用前,需要设置环境变量:
export OPENAI_API_KEY="your-api-key-here"
python run_chat.py
如果一切正常,你应该能看到类似以下的输出:
Loading graph configuration from: graphs/chat_graph.json
Starting TEN Chat Application...
WebSocket server will listen on port: 8080
Press Ctrl+C to stop
[TEN] INFO: Extension 'ws_input' started
[TEN] INFO: Extension 'llm' started
[TEN] INFO: Extension 'ws_output' started
测试应用
应用启动后,我们可以用WebSocket客户端进行测试。
创建测试脚本test_client.py:
#!/usr/bin/env python3
"""
WebSocket测试客户端
"""
import asyncio
import websockets
import json
async def test_chat():
"""测试聊天功能"""
uri = "ws://localhost:8080"
print(f"Connecting to {uri}...")
try:
async with websockets.connect(uri) as websocket:
print("Connected! Start chatting (type 'quit' to exit):\n")
# 接收消息任务
async def receive():
try:
async for message in websocket:
data = json.loads(message)
if data.get("type") == "response":
print(f"\nAI: {data['text']}\n")
print("You: ", end="", flush=True)
except websockets.exceptions.ConnectionClosed:
print("\nConnection closed by server")
# 启动接收任务
receive_task = asyncio.create_task(receive())
# 发送消息循环
while True:
user_input = input("You: ")
if user_input.lower() in ["quit", "exit", "q"]:
break
if not user_input.strip():
continue
# 发送消息
await websocket.send(json.dumps({"text": user_input}))
# 取消接收任务
receive_task.cancel()
except websockets.exceptions.ConnectionRefused:
print("Connection refused. Is the server running?")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(test_chat())
运行测试客户端:
python test_client.py
你应该能看到这样的对话:
Connecting to ws://localhost:8080...
Connected! Start chatting (type 'quit' to exit):
You: 你好,请介绍一下你自己
AI: 你好!我是...(实际输出取决于你配置的API和模型)
You: 刚才说的什么,能再说一遍吗
AI: 当然可以...(展示了多轮对话的能力)
You: quit
Connection closed by server
进阶功能:语音交互
文本对话只是基础,让我们再添加语音交互能力。这需要用到ASR(自动语音识别)和TTS(文本转语音)扩展。
ASR扩展
创建extensions/asr_extension.py:
"""
ASR扩展:自动语音识别
使用Whisper API进行语音识别
"""
import base64
import json
import aiohttp
from typing import Optional
import ten
from ten import TenEnv
class ASRExtension(TenExtension):
"""
ASR扩展
将接收到的音频转换为文本
"""
def __init__(self, config: dict):
super().__init__(config)
# API配置
self.api_base = config.get("api_base", "https://api.openai.com/v1")
self.api_key = config.get("api_key", "")
self.model = config.get("model", "whisper-1")
# HTTP会话
self.session: Optional[aiohttp.ClientSession] = None
def on_start(self, ten_env: TenEnv) -> None:
"""启动扩展"""
ten_env.log_info("ASR extension starting")
headers = {
"Authorization": f"Bearer {self.api_key}"
}
self.session = aiohttp.ClientSession(headers=headers)
ten_env.on_start_done()
def on_stop(self, ten_env: TenEnv) -> None:
"""停止扩展"""
if self.session:
asyncio.create_task(self.session.close())
ten_env.log_info("ASR extension stopped")
ten_env.on_stop_done()
def on_audio_frame(self, ten_env: TenEnv, frame_data: bytes) -> None:
"""
接收音频帧
参数:
ten_env: TEN运行时环境
frame_data: 原始音频数据
"""
ten_env.log_info(f"Received audio frame: {len(frame_data)} bytes")
# 累积音频数据直到完整
# 简化实现:每次收到都发送进行识别
# 生产环境应该做VAD(语音活动检测)
asyncio.run(self._process_audio(ten_env, frame_data))
async def _process_audio(self, ten_env: TenEnv, audio_data: bytes) -> None:
"""
处理音频数据
参数:
ten_env: TEN运行时环境
audio_data: 音频数据
"""
if not self.session:
ten_env.log_error("HTTP session not initialized")
return
try:
# 发送识别请求
url = f"{self.api_base}/audio/transcriptions"
form_data = aiohttp.FormData()
form_data.add_field(
"model",
self.model,
)
form_data.add_field(
"file",
audio_data,
filename="audio.wav",
content_type="audio/wav"
)
async with self.session.post(url, data=form_data) as resp:
if resp.status != 200:
error = await resp.text()
ten_env.log_error(f"ASR API error: {error}")
return
result = await resp.json()
text = result.get("text", "")
if text:
ten_env.log_info(f"Recognized: {text}")
# 发送识别结果给LLM
ten_env.send_cmd({"text": text}, dest="llm")
else:
ten_env.log_warning("No text recognized")
except Exception as e:
ten_env.log_error(f"ASR processing error: {e}")
# 导出扩展
ten.add_extension("asr", ASRExtension)
TTS扩展
创建extensions/tts_extension.py:
"""
TTS扩展:文本转语音
使用OpenAI TTS API进行语音合成
"""
import base64
import aiohttp
from typing import Optional
import ten
from ten import TenEnv
class TTSExtension(TenExtension):
"""
TTS扩展
将文本转换为语音输出
"""
def __init__(self, config: dict):
super().__init__(config)
# API配置
self.api_base = config.get("api_base", "https://api.openai.com/v1")
self.api_key = config.get("api_key", "")
self.model = config.get("model", "tts-1")
self.voice = config.get("voice", "alloy")
self.output_format = config.get("output_format", "mp3")
# HTTP会话
self.session: Optional[aiohttp.ClientSession] = None
def on_start(self, ten_env: TenEnv) -> None:
"""启动扩展"""
ten_env.log_info("TTS extension starting")
headers = {
"Authorization": f"Bearer {self.api_key}"
}
self.session = aiohttp.ClientSession(headers=headers)
ten_env.on_start_done()
def on_stop(self, ten_env: TenEnv) -> None:
"""停止扩展"""
if self.session:
asyncio.create_task(self.session.close())
ten_env.log_info("TTS extension stopped")
ten_env.on_stop_done()
def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
"""
接收需要转换为语音的文本
参数:
cmd_data: 包含text字段的数据字典
"""
text = cmd_data.get("text", "")
if not text:
return
ten_env.log_info(f"Synthesizing: {text[:50]}...")
# 异步合成
asyncio.run(self._synthesize(ten_env, text))
async def _synthesize(self, ten_env: TenEnv, text: str) -> None:
"""
调用TTS API合成语音
参数:
ten_env: TEN运行时环境
text: 要合成的文本
"""
if not self.session:
ten_env.log_error("HTTP session not initialized")
return
try:
url = f"{self.api_base}/audio/speech"
payload = {
"model": self.model,
"input": text,
"voice": self.voice,
"response_format": self.output_format
}
async with self.session.post(url, json=payload) as resp:
if resp.status != 200:
error = await resp.text()
ten_env.log_error(f"TTS API error: {error}")
return
# 获取音频数据
audio_data = await resp.read()
ten_env.log_info(f"Synthesized audio: {len(audio_data)} bytes")
# 发送音频数据
# 这里可以添加格式转换
ten_env.send_audio_frame(audio_data)
except Exception as e:
ten_env.log_error(f"TTS synthesis error: {e}")
# 导出扩展
ten.add_extension("tts", TTSExtension)
音频图配置
创建graphs/voice_chat_graph.json:
{
"graph": {
"name": "voice_chat_graph",
"version": "1.0.0"
},
"extensions": {
"audio_input": {
"type": "audio_input",
"config": {
"sample_rate": 16000,
"channels": 1,
"format": "pcm_s16le"
}
},
"asr": {
"type": "asr",
"config": {
"api_base": "https://api.openai.com/v1",
"api_key": "${OPENAI_API_KEY}",
"model": "whisper-1"
}
},
"llm": {
"type": "llm",
"config": {
"api_base": "https://api.openai.com/v1",
"api_key": "${OPENAI_API_KEY}",
"model": "gpt-3.5-turbo",
"system_prompt": "你是一个友好的语音助手。请用简洁的口语风格回答问题。"
}
},
"tts": {
"type": "tts",
"config": {
"api_base": "https://api.openai.com/v1",
"api_key": "${OPENAI_API_KEY}",
"model": "tts-1",
"voice": "alloy"
}
},
"audio_output": {
"type": "audio_output",
"config": {
"sample_rate": 24000,
"format": "mp3"
}
}
},
"connections": [
{
"from": "audio_input",
"to": "asr",
"port": {
"out": "audio",
"in": "audio_frame"
}
},
{
"from": "asr",
"to": "llm",
"port": {
"out": "cmd",
"in": "cmd"
}
},
{
"from": "llm",
"to": "tts",
"port": {
"out": "cmd",
"in": "cmd"
}
},
{
"from": "tts",
"to": "audio_output",
"port": {
"out": "audio_frame",
"in": "audio"
}
}
]
}
现在,数据流变成了:
用户音频 → audio_input → asr → llm → tts → audio_output → 用户播放
这是一个完整的语音对话pipeline!
扩展开发的最佳实践
经过前面的实战演练,你已经掌握了TEN扩展开发的基础。现在,让我们来总结一些扩展开发的最佳实践,这些都是从实际项目经验中提炼出来的。
错误处理与容错
健壮的错误处理是生产级应用的必备素质。
使用try-except包裹所有可能失败的代码:
def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
try:
# 处理逻辑
result = self.process_input(cmd_data)
ten_env.send_cmd({"result": result})
except ValueError as e:
ten_env.log_error(f"Invalid input: {e}")
ten_env.send_cmd({"error": "Invalid input format"})
except Exception as e:
ten_env.log_error(f"Unexpected error: {e}")
ten_env.send_cmd({"error": "Internal error"})
实现重试机制:
import asyncio
async def call_api_with_retry(self, payload: dict, max_retries: int = 3) -> dict:
"""
带重试的API调用
参数:
payload: 请求数据
max_retries: 最大重试次数
返回:
API响应数据
"""
for attempt in range(max_retries):
try:
response = await self._call_api_once(payload)
return response
except RateLimitError:
wait_time = 2 ** attempt # 指数退避
ten_env.log_warning(f"Rate limited, waiting {wait_time}s")
await asyncio.sleep(wait_time)
except TemporaryError as e:
if attempt < max_retries - 1:
await asyncio.sleep(1)
else:
raise
raise RuntimeError("Max retries exceeded")
实现熔断器模式:
class CircuitBreaker:
"""
熔断器实现
防止级联故障
"""
def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures = 0
self.last_failure_time = None
self.state = "closed" # closed, open, half_open
def call(self, func, *args, **kwargs):
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half_open"
else:
raise CircuitOpenError("Circuit breaker is open")
try:
result = func(*args, **kwargs)
if self.state == "half_open":
self.state = "closed"
self.failures = 0
return result
except Exception as e:
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.failure_threshold:
self.state = "open"
raise
性能优化
性能是实时应用的生命线。以下是一些优化技巧。
批量处理消息:
class BatchProcessor:
"""
批量处理器
将多条消息合并处理,减少API调用次数
"""
def __init__(self, batch_size: int = 10, batch_timeout: float = 0.5):
self.batch_size = batch_size
self.batch_timeout = batch_timeout
self.buffer: List[dict] = []
self.last_process_time = time.time()
self.processing = False
def add(self, item: dict) -> None:
"""添加消息到缓冲区"""
self.buffer.append(item)
# 检查是否需要触发处理
if len(self.buffer) >= self.batch_size:
self.process()
elif time.time() - self.last_process_time > self.batch_timeout:
self.process()
def process(self) -> None:
"""处理缓冲区中的所有消息"""
if not self.buffer or self.processing:
return
self.processing = True
items = self.buffer
self.buffer = []
self.last_process_time = time.time()
try:
# 批量处理
self.batch_process(items)
finally:
self.processing = False
def batch_process(self, items: List[dict]) -> None:
"""子类实现的批量处理逻辑"""
raise NotImplementedError
使用异步I/O:
class AsyncExtension(TenExtension):
"""
异步扩展基类
所有I/O操作都应该异步化
"""
def __init__(self, config: dict):
super().__init__(config)
self._loop: Optional[asyncio.AbstractEventLoop] = None
def on_start(self, ten_env: TenEnv) -> None:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
ten_env.on_start_done()
def on_stop(self, ten_env: TenEnv) -> None:
if self._loop:
pending = asyncio.all_tasks(self._loop)
for task in pending:
task.cancel()
self._loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True)
)
self._loop.close()
ten_env.on_stop_done()
def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
# 使用create_task异步处理,不要阻塞
asyncio.create_task(self._handle_cmd_async(ten_env, cmd_data))
async def _handle_cmd_async(self, ten_env: TenEnv, cmd_data: dict) -> None:
"""异步处理命令"""
try:
result = await self._process_async(cmd_data)
ten_env.send_cmd({"result": result})
except Exception as e:
ten_env.log_error(f"Error: {e}")
日志与调试
好的日志是调试的利器。
结构化日志:
def log_interaction(self, ten_env: TenEnv, user_id: str,
input_type: str, output_type: str,
duration_ms: float) -> None:
"""
记录交互日志
参数:
ten_env: TEN运行时环境
user_id: 用户标识
input_type: 输入类型(text/audio/image)
output_type: 输出类型
duration_ms: 处理耗时(毫秒)
"""
log_data = {
"event": "interaction",
"user_id": user_id,
"input_type": input_type,
"output_type": output_type,
"duration_ms": duration_ms,
"timestamp": datetime.now().isoformat()
}
ten_env.log_info(json.dumps(log_data))
分级日志:
class LoggedExtension(TenExtension):
"""
带日志功能的扩展基类
根据配置控制日志级别
"""
def __init__(self, config: dict):
super().__init__(config)
self.debug_mode = config.get("debug_mode", False)
def debug(self, ten_env: TenEnv, message: str) -> None:
"""调试日志"""
if self.debug_mode:
ten_env.log_info(f"[DEBUG] {message}")
def trace(self, ten_env: TenEnv, message: str, data: dict) -> None:
"""追踪日志,包含详细数据"""
if self.debug_mode:
ten_env.log_info(f"[TRACE] {message}: {json.dumps(data)}")
常见使用场景
TEN框架的设计非常灵活,可以适应多种使用场景。让我介绍几个典型的应用场景以及对应的架构设计。
场景一:智能客服机器人
智能客服是最常见的使用场景之一。用户可以通过文本或语音与企业客服系统对话,获得24/7的服务。
架构设计:
客户端 → WebSocket网关 → 意图识别LLM → 知识库检索 → 回复生成LLM → 客户端
↓
人工客服转接(当需要时)
关键扩展配置:
{
"extensions": {
"intent_classifier": {
"type": "llm",
"config": {
"system_prompt": "你是一个客服意图分类器。根据用户输入,分类其意图:\
1-产品咨询 2-投诉建议 3-订单查询 4-技术支持 5-其他。请只输出数字。"
}
},
"knowledge_base": {
"type": "knowledge_retrieval",
"config": {
"vector_db": "milvus",
"collection": "product_faq",
"top_k": 3
}
},
"response_generator": {
"type": "llm",
"config": {
"system_prompt": "你是一个专业的客服助手。根据检索到的知识库内容,\
用友好、专业的语气回答用户问题。"
}
}
}
}
场景二:实时翻译助手
实时翻译是另一个实用场景。结合ASR、MT(机器翻译)和TTS,可以实现语音实时翻译。
架构设计:
源语音 → ASR → 语言检测 → 机器翻译 → TTS → 目标语音
↓
文字显示
关键代码:
class TranslationPipeline(TenExtension):
"""
翻译处理管道
组合多个服务完成翻译任务
"""
def __init__(self, config: dict):
super().__init__(config)
# 支持的语言对
self.source_lang = config.get("source_lang", "auto")
self.target_lang = config.get("target_lang", "zh")
# 各环节的API配置
self.asr_config = config.get("asr", {})
self.mt_config = config.get("mt", {})
self.tts_config = config.get("tts", {})
async def translate(self, audio_data: bytes) -> bytes:
"""
完整翻译流程
参数:
audio_data: 源语言音频
返回:
目标语言音频
"""
# 1. 语音识别
source_text = await self.asr.recognize(audio_data)
self.debug(f"Recognized: {source_text}")
# 2. 语言检测(如果源语言是auto)
if self.source_lang == "auto":
detected_lang = await self.detect_language(source_text)
self.debug(f"Detected language: {detected_lang}")
# 3. 机器翻译
target_text = await self.mt.translate(
source_text,
source_lang=self.source_lang,
target_lang=self.target_lang
)
self.debug(f"Translated: {target_text}")
# 4. 语音合成
target_audio = await self.tts.synthesize(
target_text,
language=self.target_lang
)
return target_audio
场景三:AI陪伴应用
AI陪伴是一种新兴的应用形态,AI角色需要保持长期记忆、情感交互、个性化回复等能力。
架构设计:
用户输入 → 情感分析 → 记忆检索 → LLM生成 → 记忆存储 → 回复输出
↓
性格调整
记忆系统设计:
class MemorySystem:
"""
AI陪伴的记忆系统
实现短期记忆、长期记忆和人格记忆
"""
def __init__(self, user_id: str):
self.user_id = user_id
# 短期记忆:最近N轮对话
self.short_term: List[dict] = []
self.short_term_limit = 10
# 长期记忆:重要事件摘要
self.long_term: List[dict] = []
# 人格记忆:用户偏好、习惯等
self.persona: dict = {}
# 存储后端(这里用简单的内存存储,生产环境应该用数据库)
self.storage = MemoryStorage(user_id)
# 加载已有记忆
self._load()
def add_interaction(self, user_input: str, ai_response: str) -> None:
"""
添加一轮对话到记忆
参数:
user_input: 用户输入
ai_response: AI回复
"""
interaction = {
"user": user_input,
"ai": ai_response,
"timestamp": time.time()
}
# 添加到短期记忆
self.short_term.append(interaction)
# 超过限制,提取摘要后移入长期记忆
if len(self.short_term) > self.short_term_limit:
summary = self._summarize(self.short_term)
self.long_term.append(summary)
self.short_term = self.short_term[-self.short_term_limit:]
# 保存到持久化存储
self._save()
def get_context(self) -> str:
"""
获取用于LLM的上下文信息
返回:
格式化的上下文字符串
"""
parts = []
# 添加长期记忆
if self.long_term:
parts.append("=== 重要记忆 ===")
for memory in self.long_term[-3:]:
parts.append(f"- {memory['summary']}")
# 添加近期对话
if self.short_term:
parts.append("\n=== 最近对话 ===")
for interaction in self.short_term[-5:]:
parts.append(f"用户: {interaction['user']}")
parts.append(f"助手: {interaction['ai']}")
return "\n".join(parts)
def update_persona(self, key: str, value: any) -> None:
"""更新用户画像"""
self.persona[key] = value
self._save()
def _load(self) -> None:
"""从存储加载记忆"""
data = self.storage.load()
if data:
self.long_term = data.get("long_term", [])
self.persona = data.get("persona", {})
def _save(self) -> None:
"""保存记忆到存储"""
self.storage.save({
"long_term": self.long_term,
"persona": self.persona
})
def _summarize(self, interactions: List[dict]) -> dict:
"""
总结多轮对话,提炼关键信息
简化实现:提取关键词
生产环境应该调用LLM进行总结
"""
all_text = " ".join([
i["user"] + " " + i["ai"]
for i in interactions
])
# 简单的关键词提取
keywords = self._extract_keywords(all_text)
return {
"summary": f"讨论了: {', '.join(keywords)}",
"timestamp": time.time()
}
def _extract_keywords(self, text: str) -> List[str]:
"""提取关键词(简化实现)"""
# 这里应该使用NLP库进行关键词提取
# 简化版本只是按空格分词
words = text.split()
# 过滤停用词
stopwords = {"的", "了", "是", "在", "我", "有", "和", "就", "不", "人"}
keywords = [w for w in words if len(w) > 2 and w not in stopwords]
return list(set(keywords))[:5]
class MemoryStorage:
"""记忆存储基类"""
def __init__(self, user_id: str):
self.user_id = user_id
self.path = f"data/memory_{user_id}.json"
def load(self) -> dict:
"""加载记忆"""
if os.path.exists(self.path):
with open(self.path, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def save(self, data: dict) -> None:
"""保存记忆"""
os.makedirs(os.path.dirname(self.path), exist_ok=True)
with open(self.path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
部署与运维
代码开发完成后,如何把它部署到生产环境呢?这一部分我们将讨论部署方案和运维实践。
Docker容器化部署
容器化是现代应用部署的标准做法。让我们为TEN应用创建一个Dockerfile。
# 基础镜像
FROM python:3.11-slim
# 安装系统依赖
RUN apt-get update && apt-get install -y \
ffmpeg \
libsndfile1 \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建数据目录
RUN mkdir -p /app/data /app/logs
# 设置环境变量
ENV PYTHONUNBUFFERED=1
ENV LOG_LEVEL=info
# 暴露端口
EXPOSE 8080
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8080/health')"
# 启动命令
CMD ["python", "run_chat.py"]
创建docker-compose.yml来管理服务:
version: "3.8"
services:
ten-app:
build:
context: .
dockerfile: Dockerfile
ports:
- "8080:8080"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- LOG_LEVEL=info
volumes:
- ./data:/app/data
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
# 可选:Redis缓存
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
restart: unless-stopped
volumes:
redis-data:
Kubernetes部署
对于大规模部署,Kubernetes是更好的选择。
创建k8s/deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: ten-chat
labels:
app: ten-chat
spec:
replicas: 3
selector:
matchLabels:
app: ten-chat
template:
metadata:
labels:
app: ten-chat
spec:
containers:
- name: ten-app
image: your-registry/ten-chat:latest
ports:
- containerPort: 8080
name: websocket
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-keys
key: openai-api-key
- name: LOG_LEVEL
value: "info"
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- ten-chat
topologyKey: "kubernetes.io/hostname"
---
apiVersion: v1
kind: Service
metadata:
name: ten-chat-service
spec:
type: LoadBalancer
selector:
app: ten-chat
ports:
- port: 80
targetPort: 8080
name: websocket
监控与告警
生产环境离不开监控。让我们为TEN应用添加监控能力。
Prometheus指标:
from prometheus_client import Counter, Histogram, Gauge, start_http_server
class MetricsExtension(TenExtension):
"""
监控指标扩展
暴露Prometheus格式的指标
"""
# 定义指标
request_count = Counter(
"ten_requests_total",
"Total number of requests",
["extension", "type"]
)
request_duration = Histogram(
"ten_request_duration_seconds",
"Request processing duration",
["extension", "type"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
active_connections = Gauge(
"ten_active_connections",
"Number of active connections"
)
error_count = Counter(
"ten_errors_total",
"Total number of errors",
["extension", "error_type"]
)
def __init__(self, config: dict):
super().__init__(config)
self.port = config.get("metrics_port", 9090)
def on_start(self, ten_env: TenEnv) -> None:
# 启动Prometheus HTTP服务器
start_http_server(self.port)
ten_env.log_info(f"Metrics server started on port {self.port}")
ten_env.on_start_done()
def record_request(self, extension: str, request_type: str,
duration: float) -> None:
"""记录请求"""
self.request_count.labels(extension, request_type).inc()
self.request_duration.labels(extension, request_type).observe(duration)
def record_error(self, extension: str, error_type: str) -> None:
"""记录错误"""
self.error_count.labels(extension, error_type).inc()
# 导出扩展
ten.add_extension("metrics", MetricsExtension)
Grafana仪表盘配置:
# grafana/dashboards/ten-overview.json
{
"dashboard": {
"title": "TEN Framework Overview",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"targets": [
{
"expr": "rate(ten_requests_total[5m])",
"legendFormat": "{{extension}} - {{type}}"
}
]
},
{
"title": "Request Latency (p95)",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(ten_request_duration_seconds_bucket[5m]))",
"legendFormat": "{{extension}}"
}
]
},
{
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "rate(ten_errors_total[5m]) / rate(ten_requests_total[5m])",
"legendFormat": "{{extension}} - {{error_type}}"
}
]
},
{
"title": "Active Connections",
"type": "stat",
"targets": [
{
"expr": "ten_active_connections"
}
]
}
]
}
}
故障排查指南
在实际使用中,难免会遇到各种问题。以下是常见问题及其解决方案。
连接问题
WebSocket连接失败:
如果客户端无法连接到TEN服务器,首先检查以下几点:
# 1. 确认服务是否正在运行
ps aux | grep python
netstat -tlnp | grep 8080
# 2. 检查防火墙规则
sudo iptables -L -n | grep 8080
# 3. 查看服务日志
tail -f logs/ten.log | grep -i error
连接被重置:
这通常是由于超时设置引起的。调整WebSocket配置:
# 在ws_input扩展中添加超时配置
async def _handle_connection(self, websocket, path: str) -> None:
try:
async for message in websocket:
# 处理消息,保持连接活跃
await self.process_message(websocket, message)
except websockets.exceptions.ConnectionClosed:
logger.info("Client disconnected")
except Exception as e:
logger.error(f"Connection error: {e}")
性能问题
延迟过高:
延迟过高可能由多个因素导致:
- 网络延迟:检查客户端与服务器之间的网络状况
- API响应时间:监控LLM API的响应时间,考虑使用更快的模型
- 处理瓶颈:使用profiling工具定位瓶颈
# 添加性能监控
import time
def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
start_time = time.time()
try:
result = self.process(cmd_data)
duration = time.time() - start_time
ten_env.log_info(f"Processed in {duration*1000:.2f}ms")
# 发送监控指标
ten_env.send_cmd({"metric": "latency", "value": duration}, dest="metrics")
except Exception as e:
ten_env.log_error(f"Processing error: {e}")
内存占用过高:
长期运行的应用可能出现内存泄漏。使用以下方法排查:
# 使用memory_profiler监控内存
pip install memory_profiler
python -m memory_profiler run_chat.py
# 生成内存快照
import tracemalloc
tracemalloc.start()
# 在应用运行中
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
print(stat)
扩展加载问题
扩展找不到:
确保扩展文件在正确的位置,并且名称正确:
# 检查扩展目录结构
ls -la extensions/
# 扩展文件应该有正确的命名
# extensions/my_extension.py 应该是 class MyExtension
# 在配置文件中引用正确的名称
{
"extensions": {
"my_extension": {
"type": "my_extension" # 这里的名字要与 ten.add_extension() 一致
}
}
}
扩展初始化失败:
检查扩展的__init__方法和生命周期方法:
class MyExtension(TenExtension):
def __init__(self, config: dict):
super().__init__(config) # 一定要调用父类的__init__
# 检查必要配置
required_fields = ["api_key", "model"]
for field in required_fields:
if field not in config:
raise ValueError(f"Missing required config: {field}")
def on_start(self, ten_env: TenEnv) -> None:
# 不要忘记调用 on_start_done()
try:
self.initialize()
ten_env.on_start_done()
except Exception as e:
ten_env.log_error(f"Initialization failed: {e}")
# 初始化失败也要通知TEN运行时
ten_env.on_start_done(error=str(e))
总结与资源推荐
经过这篇详尽的实战指南,你应该已经掌握了TEN框架的核心概念和开发技能。让我们来回顾一下学到的内容,然后探索更多学习资源。
核心要点回顾
架构理解:TEN框架采用扩展驱动的设计,核心系统精简,功能通过扩展实现。理解扩展、端口、图的概念是掌握TEN的关键。
开发模式:扩展开发遵循统一的模式——继承基类、实现生命周期方法、处理消息、发送输出。Python扩展开发门槛低,适合快速原型。
扩展类型:AI模型扩展(LLM、ASR、TTS)、通信扩展(WebSocket、WebRTC)、处理扩展(格式转换、过滤)是常见的扩展类型。
最佳实践:健壮的错误处理、性能优化、结构化日志是生产级应用的必备要素。
部署运维:Docker容器化、Kubernetes编排、监控告警构成了完整的运维体系。
相关资源
官方资源:
- TEN框架GitHub仓库:https://github.com/TEN-framework/ten-framework
- 官方文档:https://ten-framework.readthedocs.io
- 示例代码库:https://github.com/TEN-framework/ten-examples
社区资源:
- TEN Framework Discord社区
- GitHub Discussions
- 知乎专栏「TEN框架实战」
扩展生态:
- ten-extensions:社区维护的扩展集合
- ten-vector-store:向量数据库集成
- ten-speech:更多语音处理扩展
下一步学习建议
如果你想继续深入学习,以下是一些建议的学习方向:
C++扩展开发:虽然Python扩展上手快,但C++扩展能提供更好的性能。学习TEN框架的C++ SDK,编写高性能的原生扩展。
自定义通信协议:除了WebSocket,了解WebRTC、RTSP等实时通信协议的实现原理,为音视频应用打基础。
分布式架构:学习如何在Kubernetes环境中部署和扩展TEN应用,处理大规模并发连接。
AI模型集成:深入了解各类AI模型(LLM、ASR、TTS、Embedding)的特性和最佳实践,优化AI应用的效果。
监控与可观测性:学习OpenTelemetry、Jaeger等可观测性工具,构建完善的分布式追踪体系。
结语
TEN框架为实时AI应用开发提供了一个优雅、高效的解决方案。它的设计理念——扩展驱动、灵活组合——非常符合现代软件开发的原则。无论你是想快速原型验证,还是构建生产级系统,TEN框架都值得一试。
当然,没有任何框架是银弹。TEN框架也有其适用场景和局限性。在实际项目中,需要根据具体需求权衡选择。但我相信,掌握了TEN框架的技能,你在实时AI应用开发领域会更有底气。
最后,动手实践是最好的学习方式。建议你按照这篇指南的步骤,亲手搭建一个AI对话应用,在实践中加深理解。遇到问题时,积极参与社区讨论,你会有意想不到的收获。
祝你在TEN框架的学习之旅中有所收获!如果这篇文章对你有帮助,欢迎分享给需要的朋友。
相关AI项目推荐:
- LangChain:LLM应用开发框架,与TEN可以互补使用
- Vectorizer:向量数据库客户端集合,支持多种向量存储
- ** Whisper**:OpenAI开源的语音识别模型
- Coqui TTS:开源文本转语音引擎
- FastAPI:现代化的Python Web框架,适合构建API服务
- Celery:分布式任务队列,适合异步处理重负载任务
这些项目都与TEN框架有潜在的集成价值,可以帮助你构建更完整的AI应用。
评论区