别再为实时AI应用掉头发了,TEN框架让开发效率提升10倍的实战指南

别再为实时AI应用掉头发了,TEN框架让开发效率提升10倍的实战指南

别再为实时AI应用掉头发了,TEN框架让开发效率提升10倍的实战指南

前言

当我第一次看到TEN框架的代码仓库时,我的内心其实是拒绝的——又一个声称能改变游戏规则的AI框架?但当我真正深入了解并动手实践之后,我不得不承认:这个来自TEN团队的实时AI应用开发框架,确实解决了我在过去一年里踩过的无数坑。

作为一个常年在一线写代码的开发者,我深知实时AI应用开发的痛点:延迟太高、扩展性差、调试困难、多模态集成复杂……这些问题几乎每个AI项目都会遇到。而TEN框架,正是为了解决这些问题而生的。

今天,我将用这篇实战指南,带你从零开始掌握TEN框架。无论你是想快速搭建一个AI聊天机器人,还是想构建复杂的多模态实时交互系统,这篇文章都能给你提供可操作的方案。


为什么值得深入学习TEN框架

传统开发方式的核心痛点

在深入TEN框架之前,让我们先聊聊传统实时AI应用开发的几大难题。

第一个问题是延迟与实时性的矛盾。很多AI应用需要实时响应用户的输入,但传统的请求-响应模式往往伴随着秒级甚至更长的延迟。想象一下,用户说了一句话,要等上3-5秒才能得到回复,这种体验简直让人崩溃。

第二个问题是架构扩展的噩梦。当你需要支持更多用户、更多功能时,往往需要对整个系统进行重构。微服务架构虽然能解决部分问题,但引入的复杂性也不容小觑。

第三个问题是多模态集成的地狱。现代AI应用往往需要整合语音、图像、文本等多种模态,但每种模态都有不同的API、不同的协议、不同的处理逻辑,整合起来费时费力。

第四个问题是状态管理的混乱。AI对话需要维护上下文状态,传统的无状态HTTP请求根本满足不了需求,而自己实现状态管理又容易出bug。

TEN框架的核心价值主张

TEN框架的设计理念非常清晰:让实时AI应用的开发变得像搭积木一样简单

TEN的全称是Transport Extension Network,顾名思义,它是一个专注于数据传输与扩展的网络框架。但它又不仅仅是一个传输框架——它提供了完整的实时AI应用开发解决方案,包括但不限于:

  • 统一的扩展系统:无论是LLM、语音识别、图像处理还是其他AI能力,都可以通过扩展的形式无缝集成
  • 内置的状态管理:对话上下文、用户会话等状态信息由框架自动管理,开发者只需关注业务逻辑
  • 高效的实时通信:基于WebRTC和其他现代通信协议,确保低延迟的用户体验
  • 灵活的扩展机制:你可以使用C++编写高性能扩展,也可以使用Python进行快速原型开发

更重要的是,TEN是开源项目,这意味着你可以完全掌控自己的技术栈,不被任何封闭平台绑架。

社区与生态现状

TEN框架虽然相对年轻,但发展势头非常迅猛。项目在GitHub上已经获得了可观的关注,文档和示例也在不断完善。更重要的是,TEN团队提供了活跃的技术支持,社区中也有不少开发者分享自己的实践经验。


快速上手:环境搭建全流程

系统要求与依赖

在开始之前,让我们先确认你的开发环境是否满足要求。

TEN框架的主要组件使用C++编写,因此需要一台能够编译C++代码的机器。不过,得益于现代的构建系统和Docker支持,跨平台开发也变得非常简单。

基本要求:

  • 操作系统:Linux(推荐Ubuntu 20.04或更高版本)、macOS、Windows(通过WSL2)
  • 内存:至少8GB(推荐16GB或更多,用于运行AI模型)
  • 磁盘空间:至少20GB可用空间
  • C++编译器:GCC 9+ 或 Clang 10+
  • Python 3.8+(用于编写扩展)

开发工具:

  • Git
  • CMake 3.16+
  • Docker(可选,但强烈推荐)
  • Visual Studio Code(或其他编辑器,配合C++扩展使用效果更佳)

安装步骤详解

方式一:使用Docker(推荐新手)

Docker方式是最简单快捷的安装方式,特别适合刚入门的朋友。

第一步,拉取TEN框架的官方Docker镜像:

docker pull tenAILab/ten_framework:latest

这个镜像包含了编译好的TEN框架核心库以及常用的开发工具。开箱即用,省去了手动配置的麻烦。

第二步,创建一个容器实例:

docker run -it --name ten-dev \
    -v /path/to/your/project:/workspace \
    -p 8080:8080 \
    tenAILab/ten_framework:latest

这里我们将宿主机的项目目录挂载到容器内,方便在宿主机编辑代码,在容器内编译运行。端口8080用于暴露TEN框架的服务端口。

第三步,验证安装是否成功:

# 在容器内执行
ten-version

如果输出了TEN框架的版本号,说明安装成功。

方式二:源码编译安装

如果你需要定制化开发,或者想深入理解TEN框架的内部机制,源码编译是更好的选择。

首先,克隆TEN框架的代码仓库:

git clone https://github.com/TEN-framework/ten-framework.git
cd ten-framework

然后,创建构建目录并配置CMake:

mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release

CMake配置完成后,你可以看到详细的构建选项:

-- The C compiler identification is GNU 11.4.0
-- The CXX compiler identification is GNU 11.4.0
-- Check for working C/C++ compiler: /usr/bin/cc
-- Check for working C/C++ compiler: /usr/bin/cxx
-- Ten Framework version: 0.8.0
-- Found Git: /usr/bin/git
-- Found Python3: /usr/bin/python3

配置完成后,开始编译:

make -j$(nproc)

编译过程可能需要几分钟,取决于你的机器性能。编译完成后,安装到系统:

sudo make install

最后,验证安装:

ten-version

开发环境配置

安装完成后,我们需要配置一个趁手的开发环境。

IDE配置

推荐使用Visual Studio Code,配合以下扩展:

  • C/C++(微软官方)
  • CMake Tools
  • Python
  • GitLens

创建工作区配置文件.vscode/settings.json

{
    "cmake.configureOnOpen": true,
    "cmake.buildDirectory": "${workspaceFolder}/build",
    "C_Cpp.default.configurationProvider": "ms-vscode.cmake-tools",
    "files.associations": {
        "*.ten": "cpp"
    }
}

Python环境配置

TEN框架支持使用Python编写扩展,我们需要配置Python开发环境:

# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate

# 安装TEN Python SDK
pip install ten-sdk

# 验证安装
python -c "import ten; print(ten.__version__)"

核心概念与架构

TEN框架的整体架构

在动手写代码之前,我们需要先理解TEN框架的整体架构。这就像建房子要先看图纸一样,磨刀不误砍柴工。

TEN框架采用了扩展驱动的架构模式。核心系统非常精简,提供了基础的通信、调度和状态管理能力,而具体的功能则通过扩展来实现。

核心组件

TEN运行时(TEN Runtime)是整个框架的心脏,负责管理扩展的生命周期、处理消息路由、维护状态等核心功能。运行时采用事件驱动模型,确保高效的资源利用。

扩展(Extension)是功能实现的基本单元。每个扩展都是一个独立的模块,可以访问TEN运行时提供的API,与其他扩展进行通信。每个扩展都有以下特性:

  • 独立的生命周期(初始化、运行、销毁)
  • 输入输出端口(用于接收和发送消息)
  • 状态存储空间
  • 配置参数

图(Graph)是扩展之间的连接关系。你可以把图理解为一张施工蓝图,定义了有哪些扩展,以及它们之间如何连接、如何通信。

端口(Port)是扩展之间通信的接口。每个扩展可以定义多个输入端口和输出端口,端口之间可以建立连接,形成数据流。

数据流模型

TEN框架的数据流模型非常直观:消息从源扩展的输出端口流出,经过图中的连接,到达目标扩展的输入端口

这种模型有几个关键优势:

解耦:扩展之间不需要知道彼此的实现细节,只需要通过端口进行交互。这大大简化了代码结构,也方便了模块的复用。

可组合:你可以轻松地创建新的扩展组合来实现新功能,而不需要修改已有的扩展代码。

可视化:由于数据流是显式定义的,整个应用的结构一目了然。这对于调试和维护都非常有帮助。

扩展的分类

TEN框架的扩展可以分为几大类:

AI模型扩展是最常用的类型,包括LLM扩展(用于接入大语言模型)、ASR扩展(语音识别)、TTS扩展(语音合成)、图像生成扩展等。这类扩展通常需要调用外部API或本地模型服务。

通信扩展负责与外部世界的数据交换。比如WebRTC扩展处理浏览器端的实时通信,WebSocket扩展提供HTTP协议的实时双向通信,gRPC扩展则适用于微服务架构。

处理扩展对数据进行转换和处理。比如格式转换扩展负责把音频从PCM转成Opus,过滤扩展根据规则丢弃某些消息,聚合扩展则把多个消息合并成一个。

工具扩展提供各种辅助功能。比如日志扩展记录运行信息,监控扩展收集性能指标,缓存扩展加速重复请求的处理。

状态管理机制

实时AI应用往往需要维护对话上下文、用户状态等信息。TEN框架提供了内置的状态管理机制,让这变得简单。

每个扩展都可以访问一个状态存储空间,这个空间是持久化的,不会因为消息处理的结束而丢失。你可以把状态空间理解为一个键值数据库,支持基本的读写操作:

# 读取状态
context = state.get("conversation_context", default="")

# 更新状态
context += f"\nUser: {user_input}"
state.set("conversation_context", context)

状态管理还支持事务操作,确保在并发场景下的数据一致性:

# 原子性更新
state.update_atomic("request_count", lambda x: x + 1)

实战教程:从零构建一个AI对话助手

现在你已经掌握了基础概念,是时候动手实践了。我们将一步步构建一个完整的AI对话助手,功能包括:接收用户文本输入、调用LLM生成回复、支持多轮对话上下文。

项目结构

首先,创建项目目录结构:

mkdir -p my-ai-assistant/{src,extensions,graphs,config}
cd my-ai-assistant

目录结构说明:

  • src/:存放TEN框架的C++源码(如果需要修改或扩展核心)
  • extensions/:存放自定义扩展的代码
  • graphs/:存放图配置文件,定义扩展之间的连接关系
  • config/:存放各种配置参数

对于这个简单项目,我们主要使用Python编写扩展,所以src/目录可以暂时为空。

编写第一个扩展:Echo扩展

让我们从最简单的扩展开始——一个回声扩展,收到什么就返回什么。这是理解扩展开发模式的最佳起点。

创建文件extensions/echo_extension.py

"""
Echo扩展:最简单的TEN扩展示例
收到输入后,原样返回
"""

import ten
from ten import TenEnv


class EchoExtension(TenExtension):
    """
    Echo扩展类
    继承自TenExtension基类
    """

    def __init__(self, config):
        """
        构造函数
        参数:
            config: 扩展配置字典
        """
        super().__init__(config)
        self.echo_prefix = config.get("echo_prefix", "")

    def on_start(self, ten_env: TenEnv) -> None:
        """
        扩展启动时调用
        这里可以做一些初始化工作
        """
        ten_env.log_info("Echo extension started")
        ten_env.on_start_done()

    def on_stop(self, ten_env: TenEnv) -> None:
        """
        扩展停止时调用
        这里可以清理资源
        """
        ten_env.log_info("Echo extension stopped")
        ten_env.on_stop_done()

    def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
        """
        处理接收到的命令
        参数:
            ten_env: TEN运行时环境
            cmd_data: 命令数据字典
        """
        # 获取输入文本
        text = cmd_data.get("text", "")

        # 添加前缀并构造回复
        response_text = f"{self.echo_prefix}{text}"

        # 发送回复
        response_cmd = {
            "text": response_text
        }
        ten_env.send_cmd(response_cmd)

    def on_audio_frame(self, ten_env: TenEnv, frame_data: bytes) -> None:
        """
        处理接收到的音频帧
        参数:
            ten_env: TEN运行时环境
            frame_data: 音频帧原始数据
        """
        # 对于echo扩展,直接原样返回音频数据
        ten_env.send_audio_frame(frame_data)


# 导出扩展类
# 这是TEN框架识别扩展的入口点
ten.add_extension("echo", EchoExtension)

这个扩展虽然简单,但展示了扩展开发的核心模式:

  • 继承TenExtension基类
  • 实现生命周期方法(on_starton_stop
  • 实现消息处理方法(on_cmdon_audio_frame
  • 调用ten_env的API发送消息或日志

编写LLM扩展

现在我们把Echo扩展升级成真正的AI对话扩展。这需要接入LLM API来完成智能回复。

创建文件extensions/llm_extension.py

"""
LLM扩展:接入大语言模型
使用OpenAI兼容API,支持自定义模型配置
"""

import json
import aiohttp
from typing import Optional, List, Dict
import ten
from ten import TenEnv


class Message:
    """对话消息结构"""

    def __init__(self, role: str, content: str):
        self.role = role
        self.content = content

    def to_dict(self) -> dict:
        return {"role": self.role, "content": self.content}


class LLMExtension(TenExtension):
    """
    LLM扩展类
    负责与大语言模型API交互
    """

    def __init__(self, config: dict):
        super().__init__(config)

        # API配置
        self.api_base = config.get("api_base", "https://api.openai.com/v1")
        self.api_key = config.get("api_key", "")
        self.model = config.get("model", "gpt-3.5-turbo")
        self.max_tokens = config.get("max_tokens", 1000)
        self.temperature = config.get("temperature", 0.7)

        # 对话历史
        self.conversation_history: List[Message] = []
        self.max_history = config.get("max_history", 10)

        # 系统提示词
        system_prompt = config.get("system_prompt", 
            "You are a helpful AI assistant.")
        self.conversation_history.append(Message("system", system_prompt))

        # HTTP会话
        self.session: Optional[aiohttp.ClientSession] = None

    def on_start(self, ten_env: TenEnv) -> None:
        """启动扩展"""
        ten_env.log_info(f"LLM extension starting with model: {self.model}")

        # 初始化HTTP会话
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        self.session = aiohttp.ClientSession(headers=headers)

        ten_env.on_start_done()

    def on_stop(self, ten_env: TenEnv) -> None:
        """停止扩展"""
        if self.session:
            import asyncio
            asyncio.create_task(self.session.close())

        ten_env.log_info("LLM extension stopped")
        ten_env.on_stop_done()

    def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
        """处理用户输入"""
        user_input = cmd_data.get("text", "")

        if not user_input:
            ten_env.log_warning("Received empty input")
            return

        ten_env.log_info(f"Processing input: {user_input[:50]}...")

        # 添加用户消息到历史
        self.conversation_history.append(Message("user", user_input))

        # 调用LLM
        import asyncio

        async def call_llm():
            try:
                response = await self._call_api(ten_env)
                return response
            except Exception as e:
                ten_env.log_error(f"LLM API error: {e}")
                return "抱歉,我现在无法回答这个问题。"

        # 在事件循环中执行
        loop = asyncio.get_event_loop()
        response_text = loop.run_until_complete(call_llm())

        # 添加助手回复到历史
        self.conversation_history.append(Message("assistant", response_text))

        # 维护历史长度
        while len(self.conversation_history) > self.max_history + 1:
            # 保留系统消息,删除最早的用户-助手对
            self.conversation_history.pop(1)

        # 发送回复
        response_cmd = {"text": response_text}
        ten_env.send_cmd(response_cmd)

    async def _call_api(self, ten_env: TenEnv) -> str:
        """
        调用LLM API
        参数:
            ten_env: TEN运行时环境
        返回:
            模型生成的回复文本
        """
        if not self.session:
            raise RuntimeError("HTTP session not initialized")

        # 构造请求
        messages = [msg.to_dict() for msg in self.conversation_history]
        payload = {
            "model": self.model,
            "messages": messages,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature
        }

        # 发送请求
        url = f"{self.api_base}/chat/completions"
        async with self.session.post(url, json=payload) as resp:
            if resp.status != 200:
                error_text = await resp.text()
                raise RuntimeError(f"API returned {resp.status}: {error_text}")

            result = await resp.json()
            choices = result.get("choices", [])

            if not choices:
                return "抱歉,我没有得到有效的回复。"

            return choices[0]["message"]["content"]

    def on_reset(self, ten_env: TenEnv, cmd_data: dict) -> None:
        """
        重置对话历史
        当收到reset命令时调用
        """
        # 保留系统消息,清空对话历史
        system_msg = self.conversation_history[0]
        self.conversation_history = [system_msg]

        ten_env.log_info("Conversation history reset")

        # 发送确认
        ten_env.send_cmd({"status": "reset_done"})


# 导出扩展
ten.add_extension("llm", LLMExtension)

这个LLM扩展实现了以下功能:

  • 维护对话历史,支持多轮对话
  • 调用OpenAI兼容API进行文本生成
  • 自动管理历史长度,防止内存溢出
  • 支持自定义系统提示词

编写输入输出扩展

除了核心处理逻辑,我们还需要输入输出扩展来处理与外部系统的交互。

WebSocket输入扩展

创建extensions/ws_input_extension.py

"""
WebSocket输入扩展
接收来自WebSocket连接的文本输入
"""

import asyncio
import websockets
from typing import Optional
import ten
from ten import TenEnv


class WSInputExtension(TenExtension):
    """
    WebSocket输入扩展
    监听WebSocket连接,将收到的消息转发给下游扩展
    """

    def __init__(self, config: dict):
        super().__init__(config)

        # 监听配置
        self.host = config.get("host", "0.0.0.0")
        self.port = config.get("port", 8080)

        # 连接集合
        self.connections: set = set()

        # 服务器实例
        self.server: Optional[websockets.WebSocketServer] = None

    def on_start(self, ten_env: TenEnv) -> None:
        """启动WebSocket服务器"""
        ten_env.log_info(f"Starting WebSocket server on {self.host}:{self.port}")

        # 创建并启动服务器
        loop = asyncio.get_event_loop()
        loop.create_task(self._start_server())

        ten_env.on_start_done()

    async def _start_server(self) -> None:
        """异步启动WebSocket服务器"""
        self.server = await websockets.serve(
            self._handle_connection,
            self.host,
            self.port
        )
        ten_env = self.get_ten_env()
        ten_env.log_info("WebSocket server started")

    async def _handle_connection(self, websocket, path: str) -> None:
        """
        处理新的WebSocket连接
        参数:
            websocket: WebSocket连接对象
            path: 请求路径
        """
        self.connections.add(websocket)
        ten_env = self.get_ten_env()
        ten_env.log_info(f"New connection established: {path}")

        try:
            async for message in websocket:
                # 解析消息
                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    # 如果不是JSON,当作文本处理
                    data = {"text": message}

                # 发送到下游扩展
                self.send_to_graph(data)

        except websockets.exceptions.ConnectionClosed:
            ten_env.log_info("Connection closed")
        finally:
            self.connections.remove(websocket)

    def send_to_graph(self, data: dict) -> None:
        """发送数据到图中其他扩展"""
        ten_env = self.get_ten_env()
        ten_env.send_cmd(data, dest="llm")

    def on_stop(self, ten_env: TenEnv) -> None:
        """停止服务器"""
        if self.server:
            self.server.close()
            asyncio.run(self.server.wait_closed())

        ten_env.log_info("WebSocket server stopped")
        ten_env.on_stop_done()


# 导出扩展
ten.add_extension("ws_input", WSInputExtension)

WebSocket输出扩展

创建extensions/ws_output_extension.py

"""
WebSocket输出扩展
接收处理结果,通过WebSocket发送给客户端
"""

import json
import asyncio
import websockets
from typing import Set
import ten
from ten import TenEnv


class WSOutputExtension(TenExtension):
    """
    WebSocket输出扩展
    从上游扩展接收结果,广播给所有连接的客户端
    """

    def __init__(self, config: dict):
        super().__init__(config)
        self.clients: Set[websockets.WebSocketClientProtocol] = set()

    def on_start(self, ten_env: TenEnv) -> None:
        """扩展启动"""
        ten_env.log_info("WebSocket output extension started")
        ten_env.on_start_done()

    def on_stop(self, ten_env: TenEnv) -> None:
        """扩展停止"""
        # 关闭所有客户端连接
        asyncio.run(self._close_all_clients())
        ten_env.on_stop_done()

    async def _close_all_clients(self) -> None:
        """关闭所有客户端连接"""
        for client in list(self.clients):
            try:
                await client.close()
            except Exception:
                pass
        self.clients.clear()

    def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
        """
        接收来自上游扩展的输出
        参数:
            cmd_data: 包含回复文本的数据字典
        """
        text = cmd_data.get("text", "")

        if not text:
            return

        ten_env.log_info(f"Broadcasting response: {text[:50]}...")

        # 广播给所有客户端
        asyncio.run(self._broadcast(text))

    async def _broadcast(self, text: str) -> None:
        """
        向所有客户端广播消息
        参数:
            text: 要广播的文本内容
        """
        if not self.clients:
            return

        message = json.dumps({"type": "response", "text": text})

        # 并发发送给所有客户端
        await asyncio.gather(
            *[client.send(message) for client in list(self.clients)],
            return_exceptions=True
        )


# 注册客户端连接(由外部调用)
def register_client(websocket) -> None:
    """注册新的WebSocket客户端"""
    global _ws_output_instance
    if _ws_output_instance:
        _ws_output_instance.clients.add(websocket)


def unregister_client(websocket) -> None:
    """取消注册WebSocket客户端"""
    global _ws_output_instance
    if _ws_output_instance:
        _ws_output_instance.clients.discard(websocket)


# 导出扩展
ten.add_extension("ws_output", WSOutputExtension)

# 用于跨模块访问的全局变量
_ws_output_instance: WSOutputExtension = None

定义图配置

扩展编写完成后,我们需要定义扩展之间的连接关系。TEN框架使用JSON格式的图配置文件。

创建graphs/chat_graph.json

{
    "graph": {
        "name": "chat_graph",
        "version": "1.0.0"
    },
    "extensions": {
        "ws_input": {
            "type": "ws_input",
            "config": {
                "host": "0.0.0.0",
                "port": 8080
            }
        },
        "llm": {
            "type": "llm",
            "config": {
                "api_base": "https://api.openai.com/v1",
                "api_key": "${OPENAI_API_KEY}",
                "model": "gpt-3.5-turbo",
                "max_tokens": 1000,
                "temperature": 0.7,
                "system_prompt": "你是一个友好的AI助手。请用简洁、有帮助的方式回答问题。"
            }
        },
        "ws_output": {
            "type": "ws_output",
            "config": {}
        }
    },
    "connections": [
        {
            "from": "ws_input",
            "to": "llm",
            "port": {
                "out": "cmd",
                "in": "cmd"
            }
        },
        {
            "from": "llm",
            "to": "ws_output",
            "port": {
                "out": "cmd",
                "in": "cmd"
            }
        }
    ]
}

配置说明:

  • extensions部分定义了图中使用的所有扩展及其配置
  • ${OPENAI_API_KEY}是环境变量引用,会在运行时被替换为实际值
  • connections部分定义了扩展之间的连接关系和数据流向

启动应用

创建启动脚本run_chat.py

#!/usr/bin/env python3
"""
聊天应用启动脚本
"""

import os
import sys
import signal
import argparse

import ten
from ten import TenEnv


def parse_args():
    """解析命令行参数"""
    parser = argparse.ArgumentParser(description="TEN Chat Application")
    parser.add_argument(
        "--config",
        "-c",
        type=str,
        default="graphs/chat_graph.json",
        help="Path to graph configuration file"
    )
    parser.add_argument(
        "--log-level",
        "-l",
        type=str,
        default="info",
        choices=["debug", "info", "warning", "error"],
        help="Logging level"
    )
    parser.add_argument(
        "--port",
        "-p",
        type=int,
        default=8080,
        help="WebSocket server port"
    )
    return parser.parse_args()


def main():
    """主函数"""
    args = parse_args()

    # 加载图配置
    if not os.path.exists(args.config):
        print(f"Error: Configuration file not found: {args.config}")
        sys.exit(1)

    print(f"Loading graph configuration from: {args.config}")

    # 创建TEN运行时
    ten_env = TenEnv.create(args.config)

    # 设置日志级别
    ten_env.set_log_level(args.log_level)

    # 注册信号处理
    def signal_handler(signum, frame):
        print("\nShutting down...")
        ten_env.stop()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("Starting TEN Chat Application...")
    print(f"WebSocket server will listen on port: {args.port}")
    print("Press Ctrl+C to stop")

    # 启动TEN运行时
    ten_env.run()


if __name__ == "__main__":
    main()

运行应用前,需要设置环境变量:

export OPENAI_API_KEY="your-api-key-here"
python run_chat.py

如果一切正常,你应该能看到类似以下的输出:

Loading graph configuration from: graphs/chat_graph.json
Starting TEN Chat Application...
WebSocket server will listen on port: 8080
Press Ctrl+C to stop
[TEN] INFO: Extension 'ws_input' started
[TEN] INFO: Extension 'llm' started
[TEN] INFO: Extension 'ws_output' started

测试应用

应用启动后,我们可以用WebSocket客户端进行测试。

创建测试脚本test_client.py

#!/usr/bin/env python3
"""
WebSocket测试客户端
"""

import asyncio
import websockets
import json


async def test_chat():
    """测试聊天功能"""
    uri = "ws://localhost:8080"

    print(f"Connecting to {uri}...")

    try:
        async with websockets.connect(uri) as websocket:
            print("Connected! Start chatting (type 'quit' to exit):\n")

            # 接收消息任务
            async def receive():
                try:
                    async for message in websocket:
                        data = json.loads(message)
                        if data.get("type") == "response":
                            print(f"\nAI: {data['text']}\n")
                            print("You: ", end="", flush=True)
                except websockets.exceptions.ConnectionClosed:
                    print("\nConnection closed by server")

            # 启动接收任务
            receive_task = asyncio.create_task(receive())

            # 发送消息循环
            while True:
                user_input = input("You: ")

                if user_input.lower() in ["quit", "exit", "q"]:
                    break

                if not user_input.strip():
                    continue

                # 发送消息
                await websocket.send(json.dumps({"text": user_input}))

            # 取消接收任务
            receive_task.cancel()

    except websockets.exceptions.ConnectionRefused:
        print("Connection refused. Is the server running?")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(test_chat())

运行测试客户端:

python test_client.py

你应该能看到这样的对话:

Connecting to ws://localhost:8080...
Connected! Start chatting (type 'quit' to exit):

You: 你好请介绍一下你自己
AI: 你好我是...实际输出取决于你配置的API和模型

You: 刚才说的什么能再说一遍吗
AI: 当然可以...展示了多轮对话的能力

You: quit
Connection closed by server

进阶功能:语音交互

文本对话只是基础,让我们再添加语音交互能力。这需要用到ASR(自动语音识别)和TTS(文本转语音)扩展。

ASR扩展

创建extensions/asr_extension.py

"""
ASR扩展:自动语音识别
使用Whisper API进行语音识别
"""

import base64
import json
import aiohttp
from typing import Optional
import ten
from ten import TenEnv


class ASRExtension(TenExtension):
    """
    ASR扩展
    将接收到的音频转换为文本
    """

    def __init__(self, config: dict):
        super().__init__(config)

        # API配置
        self.api_base = config.get("api_base", "https://api.openai.com/v1")
        self.api_key = config.get("api_key", "")
        self.model = config.get("model", "whisper-1")

        # HTTP会话
        self.session: Optional[aiohttp.ClientSession] = None

    def on_start(self, ten_env: TenEnv) -> None:
        """启动扩展"""
        ten_env.log_info("ASR extension starting")

        headers = {
            "Authorization": f"Bearer {self.api_key}"
        }
        self.session = aiohttp.ClientSession(headers=headers)

        ten_env.on_start_done()

    def on_stop(self, ten_env: TenEnv) -> None:
        """停止扩展"""
        if self.session:
            asyncio.create_task(self.session.close())

        ten_env.log_info("ASR extension stopped")
        ten_env.on_stop_done()

    def on_audio_frame(self, ten_env: TenEnv, frame_data: bytes) -> None:
        """
        接收音频帧
        参数:
            ten_env: TEN运行时环境
            frame_data: 原始音频数据
        """
        ten_env.log_info(f"Received audio frame: {len(frame_data)} bytes")

        # 累积音频数据直到完整
        # 简化实现:每次收到都发送进行识别
        # 生产环境应该做VAD(语音活动检测)
        asyncio.run(self._process_audio(ten_env, frame_data))

    async def _process_audio(self, ten_env: TenEnv, audio_data: bytes) -> None:
        """
        处理音频数据
        参数:
            ten_env: TEN运行时环境
            audio_data: 音频数据
        """
        if not self.session:
            ten_env.log_error("HTTP session not initialized")
            return

        try:
            # 发送识别请求
            url = f"{self.api_base}/audio/transcriptions"

            form_data = aiohttp.FormData()
            form_data.add_field(
                "model",
                self.model,
            )
            form_data.add_field(
                "file",
                audio_data,
                filename="audio.wav",
                content_type="audio/wav"
            )

            async with self.session.post(url, data=form_data) as resp:
                if resp.status != 200:
                    error = await resp.text()
                    ten_env.log_error(f"ASR API error: {error}")
                    return

                result = await resp.json()
                text = result.get("text", "")

                if text:
                    ten_env.log_info(f"Recognized: {text}")
                    # 发送识别结果给LLM
                    ten_env.send_cmd({"text": text}, dest="llm")
                else:
                    ten_env.log_warning("No text recognized")

        except Exception as e:
            ten_env.log_error(f"ASR processing error: {e}")


# 导出扩展
ten.add_extension("asr", ASRExtension)

TTS扩展

创建extensions/tts_extension.py

"""
TTS扩展:文本转语音
使用OpenAI TTS API进行语音合成
"""

import base64
import aiohttp
from typing import Optional
import ten
from ten import TenEnv


class TTSExtension(TenExtension):
    """
    TTS扩展
    将文本转换为语音输出
    """

    def __init__(self, config: dict):
        super().__init__(config)

        # API配置
        self.api_base = config.get("api_base", "https://api.openai.com/v1")
        self.api_key = config.get("api_key", "")
        self.model = config.get("model", "tts-1")
        self.voice = config.get("voice", "alloy")
        self.output_format = config.get("output_format", "mp3")

        # HTTP会话
        self.session: Optional[aiohttp.ClientSession] = None

    def on_start(self, ten_env: TenEnv) -> None:
        """启动扩展"""
        ten_env.log_info("TTS extension starting")

        headers = {
            "Authorization": f"Bearer {self.api_key}"
        }
        self.session = aiohttp.ClientSession(headers=headers)

        ten_env.on_start_done()

    def on_stop(self, ten_env: TenEnv) -> None:
        """停止扩展"""
        if self.session:
            asyncio.create_task(self.session.close())

        ten_env.log_info("TTS extension stopped")
        ten_env.on_stop_done()

    def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
        """
        接收需要转换为语音的文本
        参数:
            cmd_data: 包含text字段的数据字典
        """
        text = cmd_data.get("text", "")

        if not text:
            return

        ten_env.log_info(f"Synthesizing: {text[:50]}...")

        # 异步合成
        asyncio.run(self._synthesize(ten_env, text))

    async def _synthesize(self, ten_env: TenEnv, text: str) -> None:
        """
        调用TTS API合成语音
        参数:
            ten_env: TEN运行时环境
            text: 要合成的文本
        """
        if not self.session:
            ten_env.log_error("HTTP session not initialized")
            return

        try:
            url = f"{self.api_base}/audio/speech"

            payload = {
                "model": self.model,
                "input": text,
                "voice": self.voice,
                "response_format": self.output_format
            }

            async with self.session.post(url, json=payload) as resp:
                if resp.status != 200:
                    error = await resp.text()
                    ten_env.log_error(f"TTS API error: {error}")
                    return

                # 获取音频数据
                audio_data = await resp.read()

                ten_env.log_info(f"Synthesized audio: {len(audio_data)} bytes")

                # 发送音频数据
                # 这里可以添加格式转换
                ten_env.send_audio_frame(audio_data)

        except Exception as e:
            ten_env.log_error(f"TTS synthesis error: {e}")


# 导出扩展
ten.add_extension("tts", TTSExtension)

音频图配置

创建graphs/voice_chat_graph.json

{
    "graph": {
        "name": "voice_chat_graph",
        "version": "1.0.0"
    },
    "extensions": {
        "audio_input": {
            "type": "audio_input",
            "config": {
                "sample_rate": 16000,
                "channels": 1,
                "format": "pcm_s16le"
            }
        },
        "asr": {
            "type": "asr",
            "config": {
                "api_base": "https://api.openai.com/v1",
                "api_key": "${OPENAI_API_KEY}",
                "model": "whisper-1"
            }
        },
        "llm": {
            "type": "llm",
            "config": {
                "api_base": "https://api.openai.com/v1",
                "api_key": "${OPENAI_API_KEY}",
                "model": "gpt-3.5-turbo",
                "system_prompt": "你是一个友好的语音助手。请用简洁的口语风格回答问题。"
            }
        },
        "tts": {
            "type": "tts",
            "config": {
                "api_base": "https://api.openai.com/v1",
                "api_key": "${OPENAI_API_KEY}",
                "model": "tts-1",
                "voice": "alloy"
            }
        },
        "audio_output": {
            "type": "audio_output",
            "config": {
                "sample_rate": 24000,
                "format": "mp3"
            }
        }
    },
    "connections": [
        {
            "from": "audio_input",
            "to": "asr",
            "port": {
                "out": "audio",
                "in": "audio_frame"
            }
        },
        {
            "from": "asr",
            "to": "llm",
            "port": {
                "out": "cmd",
                "in": "cmd"
            }
        },
        {
            "from": "llm",
            "to": "tts",
            "port": {
                "out": "cmd",
                "in": "cmd"
            }
        },
        {
            "from": "tts",
            "to": "audio_output",
            "port": {
                "out": "audio_frame",
                "in": "audio"
            }
        }
    ]
}

现在,数据流变成了:

用户音频 → audio_input → asr → llm → tts → audio_output → 用户播放

这是一个完整的语音对话pipeline!


扩展开发的最佳实践

经过前面的实战演练,你已经掌握了TEN扩展开发的基础。现在,让我们来总结一些扩展开发的最佳实践,这些都是从实际项目经验中提炼出来的。

错误处理与容错

健壮的错误处理是生产级应用的必备素质。

使用try-except包裹所有可能失败的代码

def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
    try:
        # 处理逻辑
        result = self.process_input(cmd_data)
        ten_env.send_cmd({"result": result})
    except ValueError as e:
        ten_env.log_error(f"Invalid input: {e}")
        ten_env.send_cmd({"error": "Invalid input format"})
    except Exception as e:
        ten_env.log_error(f"Unexpected error: {e}")
        ten_env.send_cmd({"error": "Internal error"})

实现重试机制

import asyncio

async def call_api_with_retry(self, payload: dict, max_retries: int = 3) -> dict:
    """
    带重试的API调用
    参数:
        payload: 请求数据
        max_retries: 最大重试次数
    返回:
        API响应数据
    """
    for attempt in range(max_retries):
        try:
            response = await self._call_api_once(payload)
            return response
        except RateLimitError:
            wait_time = 2 ** attempt  # 指数退避
            ten_env.log_warning(f"Rate limited, waiting {wait_time}s")
            await asyncio.sleep(wait_time)
        except TemporaryError as e:
            if attempt < max_retries - 1:
                await asyncio.sleep(1)
            else:
                raise

    raise RuntimeError("Max retries exceeded")

实现熔断器模式

class CircuitBreaker:
    """
    熔断器实现
    防止级联故障
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half_open"
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
            if self.state == "half_open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure_time = time.time()

            if self.failures >= self.failure_threshold:
                self.state = "open"

            raise

性能优化

性能是实时应用的生命线。以下是一些优化技巧。

批量处理消息

class BatchProcessor:
    """
    批量处理器
    将多条消息合并处理,减少API调用次数
    """

    def __init__(self, batch_size: int = 10, batch_timeout: float = 0.5):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.buffer: List[dict] = []
        self.last_process_time = time.time()
        self.processing = False

    def add(self, item: dict) -> None:
        """添加消息到缓冲区"""
        self.buffer.append(item)

        # 检查是否需要触发处理
        if len(self.buffer) >= self.batch_size:
            self.process()
        elif time.time() - self.last_process_time > self.batch_timeout:
            self.process()

    def process(self) -> None:
        """处理缓冲区中的所有消息"""
        if not self.buffer or self.processing:
            return

        self.processing = True
        items = self.buffer
        self.buffer = []
        self.last_process_time = time.time()

        try:
            # 批量处理
            self.batch_process(items)
        finally:
            self.processing = False

    def batch_process(self, items: List[dict]) -> None:
        """子类实现的批量处理逻辑"""
        raise NotImplementedError

使用异步I/O

class AsyncExtension(TenExtension):
    """
    异步扩展基类
    所有I/O操作都应该异步化
    """

    def __init__(self, config: dict):
        super().__init__(config)
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def on_start(self, ten_env: TenEnv) -> None:
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        ten_env.on_start_done()

    def on_stop(self, ten_env: TenEnv) -> None:
        if self._loop:
            pending = asyncio.all_tasks(self._loop)
            for task in pending:
                task.cancel()

            self._loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
            self._loop.close()

        ten_env.on_stop_done()

    def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
        # 使用create_task异步处理,不要阻塞
        asyncio.create_task(self._handle_cmd_async(ten_env, cmd_data))

    async def _handle_cmd_async(self, ten_env: TenEnv, cmd_data: dict) -> None:
        """异步处理命令"""
        try:
            result = await self._process_async(cmd_data)
            ten_env.send_cmd({"result": result})
        except Exception as e:
            ten_env.log_error(f"Error: {e}")

日志与调试

好的日志是调试的利器。

结构化日志

def log_interaction(self, ten_env: TenEnv, user_id: str, 
                    input_type: str, output_type: str, 
                    duration_ms: float) -> None:
    """
    记录交互日志
    参数:
        ten_env: TEN运行时环境
        user_id: 用户标识
        input_type: 输入类型(text/audio/image)
        output_type: 输出类型
        duration_ms: 处理耗时(毫秒)
    """
    log_data = {
        "event": "interaction",
        "user_id": user_id,
        "input_type": input_type,
        "output_type": output_type,
        "duration_ms": duration_ms,
        "timestamp": datetime.now().isoformat()
    }

    ten_env.log_info(json.dumps(log_data))

分级日志

class LoggedExtension(TenExtension):
    """
    带日志功能的扩展基类
    根据配置控制日志级别
    """

    def __init__(self, config: dict):
        super().__init__(config)
        self.debug_mode = config.get("debug_mode", False)

    def debug(self, ten_env: TenEnv, message: str) -> None:
        """调试日志"""
        if self.debug_mode:
            ten_env.log_info(f"[DEBUG] {message}")

    def trace(self, ten_env: TenEnv, message: str, data: dict) -> None:
        """追踪日志,包含详细数据"""
        if self.debug_mode:
            ten_env.log_info(f"[TRACE] {message}: {json.dumps(data)}")

常见使用场景

TEN框架的设计非常灵活,可以适应多种使用场景。让我介绍几个典型的应用场景以及对应的架构设计。

场景一:智能客服机器人

智能客服是最常见的使用场景之一。用户可以通过文本或语音与企业客服系统对话,获得24/7的服务。

架构设计

客户端 → WebSocket网关 → 意图识别LLM → 知识库检索 → 回复生成LLM → 客户端
                ↓
           人工客服转接(当需要时)

关键扩展配置

{
    "extensions": {
        "intent_classifier": {
            "type": "llm",
            "config": {
                "system_prompt": "你是一个客服意图分类器。根据用户输入,分类其意图:\
                    1-产品咨询 2-投诉建议 3-订单查询 4-技术支持 5-其他。请只输出数字。"
            }
        },
        "knowledge_base": {
            "type": "knowledge_retrieval",
            "config": {
                "vector_db": "milvus",
                "collection": "product_faq",
                "top_k": 3
            }
        },
        "response_generator": {
            "type": "llm",
            "config": {
                "system_prompt": "你是一个专业的客服助手。根据检索到的知识库内容,\
                    用友好、专业的语气回答用户问题。"
            }
        }
    }
}

场景二:实时翻译助手

实时翻译是另一个实用场景。结合ASR、MT(机器翻译)和TTS,可以实现语音实时翻译。

架构设计

源语音 → ASR → 语言检测 → 机器翻译 → TTS → 目标语音
         ↓
      文字显示

关键代码

class TranslationPipeline(TenExtension):
    """
    翻译处理管道
    组合多个服务完成翻译任务
    """

    def __init__(self, config: dict):
        super().__init__(config)

        # 支持的语言对
        self.source_lang = config.get("source_lang", "auto")
        self.target_lang = config.get("target_lang", "zh")

        # 各环节的API配置
        self.asr_config = config.get("asr", {})
        self.mt_config = config.get("mt", {})
        self.tts_config = config.get("tts", {})

    async def translate(self, audio_data: bytes) -> bytes:
        """
        完整翻译流程
        参数:
            audio_data: 源语言音频
        返回:
            目标语言音频
        """
        # 1. 语音识别
        source_text = await self.asr.recognize(audio_data)
        self.debug(f"Recognized: {source_text}")

        # 2. 语言检测(如果源语言是auto)
        if self.source_lang == "auto":
            detected_lang = await self.detect_language(source_text)
            self.debug(f"Detected language: {detected_lang}")

        # 3. 机器翻译
        target_text = await self.mt.translate(
            source_text,
            source_lang=self.source_lang,
            target_lang=self.target_lang
        )
        self.debug(f"Translated: {target_text}")

        # 4. 语音合成
        target_audio = await self.tts.synthesize(
            target_text,
            language=self.target_lang
        )

        return target_audio

场景三:AI陪伴应用

AI陪伴是一种新兴的应用形态,AI角色需要保持长期记忆、情感交互、个性化回复等能力。

架构设计

用户输入 → 情感分析 → 记忆检索 → LLM生成 → 记忆存储 → 回复输出
              ↓
           性格调整

记忆系统设计

class MemorySystem:
    """
    AI陪伴的记忆系统
    实现短期记忆、长期记忆和人格记忆
    """

    def __init__(self, user_id: str):
        self.user_id = user_id

        # 短期记忆:最近N轮对话
        self.short_term: List[dict] = []
        self.short_term_limit = 10

        # 长期记忆:重要事件摘要
        self.long_term: List[dict] = []

        # 人格记忆:用户偏好、习惯等
        self.persona: dict = {}

        # 存储后端(这里用简单的内存存储,生产环境应该用数据库)
        self.storage = MemoryStorage(user_id)

        # 加载已有记忆
        self._load()

    def add_interaction(self, user_input: str, ai_response: str) -> None:
        """
        添加一轮对话到记忆
        参数:
            user_input: 用户输入
            ai_response: AI回复
        """
        interaction = {
            "user": user_input,
            "ai": ai_response,
            "timestamp": time.time()
        }

        # 添加到短期记忆
        self.short_term.append(interaction)

        # 超过限制,提取摘要后移入长期记忆
        if len(self.short_term) > self.short_term_limit:
            summary = self._summarize(self.short_term)
            self.long_term.append(summary)
            self.short_term = self.short_term[-self.short_term_limit:]

            # 保存到持久化存储
            self._save()

    def get_context(self) -> str:
        """
        获取用于LLM的上下文信息
        返回:
            格式化的上下文字符串
        """
        parts = []

        # 添加长期记忆
        if self.long_term:
            parts.append("=== 重要记忆 ===")
            for memory in self.long_term[-3:]:
                parts.append(f"- {memory['summary']}")

        # 添加近期对话
        if self.short_term:
            parts.append("\n=== 最近对话 ===")
            for interaction in self.short_term[-5:]:
                parts.append(f"用户: {interaction['user']}")
                parts.append(f"助手: {interaction['ai']}")

        return "\n".join(parts)

    def update_persona(self, key: str, value: any) -> None:
        """更新用户画像"""
        self.persona[key] = value
        self._save()

    def _load(self) -> None:
        """从存储加载记忆"""
        data = self.storage.load()
        if data:
            self.long_term = data.get("long_term", [])
            self.persona = data.get("persona", {})

    def _save(self) -> None:
        """保存记忆到存储"""
        self.storage.save({
            "long_term": self.long_term,
            "persona": self.persona
        })

    def _summarize(self, interactions: List[dict]) -> dict:
        """
        总结多轮对话,提炼关键信息
        简化实现:提取关键词
        生产环境应该调用LLM进行总结
        """
        all_text = " ".join([
            i["user"] + " " + i["ai"] 
            for i in interactions
        ])

        # 简单的关键词提取
        keywords = self._extract_keywords(all_text)

        return {
            "summary": f"讨论了: {', '.join(keywords)}",
            "timestamp": time.time()
        }

    def _extract_keywords(self, text: str) -> List[str]:
        """提取关键词(简化实现)"""
        # 这里应该使用NLP库进行关键词提取
        # 简化版本只是按空格分词
        words = text.split()
        # 过滤停用词
        stopwords = {"的", "了", "是", "在", "我", "有", "和", "就", "不", "人"}
        keywords = [w for w in words if len(w) > 2 and w not in stopwords]
        return list(set(keywords))[:5]


class MemoryStorage:
    """记忆存储基类"""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.path = f"data/memory_{user_id}.json"

    def load(self) -> dict:
        """加载记忆"""
        if os.path.exists(self.path):
            with open(self.path, "r", encoding="utf-8") as f:
                return json.load(f)
        return {}

    def save(self, data: dict) -> None:
        """保存记忆"""
        os.makedirs(os.path.dirname(self.path), exist_ok=True)
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

部署与运维

代码开发完成后,如何把它部署到生产环境呢?这一部分我们将讨论部署方案和运维实践。

Docker容器化部署

容器化是现代应用部署的标准做法。让我们为TEN应用创建一个Dockerfile。

# 基础镜像
FROM python:3.11-slim

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    ffmpeg \
    libsndfile1 \
    && rm -rf /var/lib/apt/lists/*

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建数据目录
RUN mkdir -p /app/data /app/logs

# 设置环境变量
ENV PYTHONUNBUFFERED=1
ENV LOG_LEVEL=info

# 暴露端口
EXPOSE 8080

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8080/health')"

# 启动命令
CMD ["python", "run_chat.py"]

创建docker-compose.yml来管理服务:

version: "3.8"

services:
  ten-app:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8080:8080"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LOG_LEVEL=info
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # 可选:Redis缓存
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    restart: unless-stopped

volumes:
  redis-data:

Kubernetes部署

对于大规模部署,Kubernetes是更好的选择。

创建k8s/deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ten-chat
  labels:
    app: ten-chat
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ten-chat
  template:
    metadata:
      labels:
        app: ten-chat
    spec:
      containers:
      - name: ten-app
        image: your-registry/ten-chat:latest
        ports:
        - containerPort: 8080
          name: websocket
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-api-key
        - name: LOG_LEVEL
          value: "info"
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 10
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - ten-chat
              topologyKey: "kubernetes.io/hostname"

---
apiVersion: v1
kind: Service
metadata:
  name: ten-chat-service
spec:
  type: LoadBalancer
  selector:
    app: ten-chat
  ports:
  - port: 80
    targetPort: 8080
    name: websocket

监控与告警

生产环境离不开监控。让我们为TEN应用添加监控能力。

Prometheus指标

from prometheus_client import Counter, Histogram, Gauge, start_http_server


class MetricsExtension(TenExtension):
    """
    监控指标扩展
    暴露Prometheus格式的指标
    """

    # 定义指标
    request_count = Counter(
        "ten_requests_total",
        "Total number of requests",
        ["extension", "type"]
    )

    request_duration = Histogram(
        "ten_request_duration_seconds",
        "Request processing duration",
        ["extension", "type"],
        buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
    )

    active_connections = Gauge(
        "ten_active_connections",
        "Number of active connections"
    )

    error_count = Counter(
        "ten_errors_total",
        "Total number of errors",
        ["extension", "error_type"]
    )

    def __init__(self, config: dict):
        super().__init__(config)
        self.port = config.get("metrics_port", 9090)

    def on_start(self, ten_env: TenEnv) -> None:
        # 启动Prometheus HTTP服务器
        start_http_server(self.port)
        ten_env.log_info(f"Metrics server started on port {self.port}")
        ten_env.on_start_done()

    def record_request(self, extension: str, request_type: str, 
                       duration: float) -> None:
        """记录请求"""
        self.request_count.labels(extension, request_type).inc()
        self.request_duration.labels(extension, request_type).observe(duration)

    def record_error(self, extension: str, error_type: str) -> None:
        """记录错误"""
        self.error_count.labels(extension, error_type).inc()


# 导出扩展
ten.add_extension("metrics", MetricsExtension)

Grafana仪表盘配置

# grafana/dashboards/ten-overview.json
{
  "dashboard": {
    "title": "TEN Framework Overview",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(ten_requests_total[5m])",
            "legendFormat": "{{extension}} - {{type}}"
          }
        ]
      },
      {
        "title": "Request Latency (p95)",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(ten_request_duration_seconds_bucket[5m]))",
            "legendFormat": "{{extension}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(ten_errors_total[5m]) / rate(ten_requests_total[5m])",
            "legendFormat": "{{extension}} - {{error_type}}"
          }
        ]
      },
      {
        "title": "Active Connections",
        "type": "stat",
        "targets": [
          {
            "expr": "ten_active_connections"
          }
        ]
      }
    ]
  }
}

故障排查指南

在实际使用中,难免会遇到各种问题。以下是常见问题及其解决方案。

连接问题

WebSocket连接失败

如果客户端无法连接到TEN服务器,首先检查以下几点:

# 1. 确认服务是否正在运行
ps aux | grep python
netstat -tlnp | grep 8080

# 2. 检查防火墙规则
sudo iptables -L -n | grep 8080

# 3. 查看服务日志
tail -f logs/ten.log | grep -i error

连接被重置

这通常是由于超时设置引起的。调整WebSocket配置:

# 在ws_input扩展中添加超时配置
async def _handle_connection(self, websocket, path: str) -> None:
    try:
        async for message in websocket:
            # 处理消息,保持连接活跃
            await self.process_message(websocket, message)
    except websockets.exceptions.ConnectionClosed:
        logger.info("Client disconnected")
    except Exception as e:
        logger.error(f"Connection error: {e}")

性能问题

延迟过高

延迟过高可能由多个因素导致:

  1. 网络延迟:检查客户端与服务器之间的网络状况
  2. API响应时间:监控LLM API的响应时间,考虑使用更快的模型
  3. 处理瓶颈:使用profiling工具定位瓶颈
# 添加性能监控
import time

def on_cmd(self, ten_env: TenEnv, cmd_data: dict) -> None:
    start_time = time.time()

    try:
        result = self.process(cmd_data)
        duration = time.time() - start_time

        ten_env.log_info(f"Processed in {duration*1000:.2f}ms")

        # 发送监控指标
        ten_env.send_cmd({"metric": "latency", "value": duration}, dest="metrics")

    except Exception as e:
        ten_env.log_error(f"Processing error: {e}")

内存占用过高

长期运行的应用可能出现内存泄漏。使用以下方法排查:

# 使用memory_profiler监控内存
pip install memory_profiler
python -m memory_profiler run_chat.py

# 生成内存快照
import tracemalloc
tracemalloc.start()

# 在应用运行中
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
    print(stat)

扩展加载问题

扩展找不到

确保扩展文件在正确的位置,并且名称正确:

# 检查扩展目录结构
ls -la extensions/

# 扩展文件应该有正确的命名
# extensions/my_extension.py 应该是 class MyExtension

# 在配置文件中引用正确的名称
{
    "extensions": {
        "my_extension": {
            "type": "my_extension"  # 这里的名字要与 ten.add_extension() 一致
        }
    }
}

扩展初始化失败

检查扩展的__init__方法和生命周期方法:

class MyExtension(TenExtension):
    def __init__(self, config: dict):
        super().__init__(config)  # 一定要调用父类的__init__

        # 检查必要配置
        required_fields = ["api_key", "model"]
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Missing required config: {field}")

    def on_start(self, ten_env: TenEnv) -> None:
        # 不要忘记调用 on_start_done()
        try:
            self.initialize()
            ten_env.on_start_done()
        except Exception as e:
            ten_env.log_error(f"Initialization failed: {e}")
            # 初始化失败也要通知TEN运行时
            ten_env.on_start_done(error=str(e))

总结与资源推荐

经过这篇详尽的实战指南,你应该已经掌握了TEN框架的核心概念和开发技能。让我们来回顾一下学到的内容,然后探索更多学习资源。

核心要点回顾

架构理解:TEN框架采用扩展驱动的设计,核心系统精简,功能通过扩展实现。理解扩展、端口、图的概念是掌握TEN的关键。

开发模式:扩展开发遵循统一的模式——继承基类、实现生命周期方法、处理消息、发送输出。Python扩展开发门槛低,适合快速原型。

扩展类型:AI模型扩展(LLM、ASR、TTS)、通信扩展(WebSocket、WebRTC)、处理扩展(格式转换、过滤)是常见的扩展类型。

最佳实践:健壮的错误处理、性能优化、结构化日志是生产级应用的必备要素。

部署运维:Docker容器化、Kubernetes编排、监控告警构成了完整的运维体系。

相关资源

官方资源

  • TEN框架GitHub仓库:https://github.com/TEN-framework/ten-framework
  • 官方文档:https://ten-framework.readthedocs.io
  • 示例代码库:https://github.com/TEN-framework/ten-examples

社区资源

  • TEN Framework Discord社区
  • GitHub Discussions
  • 知乎专栏「TEN框架实战」

扩展生态

  • ten-extensions:社区维护的扩展集合
  • ten-vector-store:向量数据库集成
  • ten-speech:更多语音处理扩展

下一步学习建议

如果你想继续深入学习,以下是一些建议的学习方向:

C++扩展开发:虽然Python扩展上手快,但C++扩展能提供更好的性能。学习TEN框架的C++ SDK,编写高性能的原生扩展。

自定义通信协议:除了WebSocket,了解WebRTC、RTSP等实时通信协议的实现原理,为音视频应用打基础。

分布式架构:学习如何在Kubernetes环境中部署和扩展TEN应用,处理大规模并发连接。

AI模型集成:深入了解各类AI模型(LLM、ASR、TTS、Embedding)的特性和最佳实践,优化AI应用的效果。

监控与可观测性:学习OpenTelemetry、Jaeger等可观测性工具,构建完善的分布式追踪体系。

结语

TEN框架为实时AI应用开发提供了一个优雅、高效的解决方案。它的设计理念——扩展驱动、灵活组合——非常符合现代软件开发的原则。无论你是想快速原型验证,还是构建生产级系统,TEN框架都值得一试。

当然,没有任何框架是银弹。TEN框架也有其适用场景和局限性。在实际项目中,需要根据具体需求权衡选择。但我相信,掌握了TEN框架的技能,你在实时AI应用开发领域会更有底气。

最后,动手实践是最好的学习方式。建议你按照这篇指南的步骤,亲手搭建一个AI对话应用,在实践中加深理解。遇到问题时,积极参与社区讨论,你会有意想不到的收获。

祝你在TEN框架的学习之旅中有所收获!如果这篇文章对你有帮助,欢迎分享给需要的朋友。


相关AI项目推荐

  • LangChain:LLM应用开发框架,与TEN可以互补使用
  • Vectorizer:向量数据库客户端集合,支持多种向量存储
  • ** Whisper**:OpenAI开源的语音识别模型
  • Coqui TTS:开源文本转语音引擎
  • FastAPI:现代化的Python Web框架,适合构建API服务
  • Celery:分布式任务队列,适合异步处理重负载任务

这些项目都与TEN框架有潜在的集成价值,可以帮助你构建更完整的AI应用。

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注