告别聊天机器人开发噩梦,LangBot 让 AI 对话系统开发变得如此简单

告别聊天机器人开发噩梦,LangBot 让 AI 对话系统开发变得如此简单

告别聊天机器人开发噩梦,LangBot 让 AI 对话系统开发变得如此简单

你是否曾经想要快速构建一个智能对话系统,却发现自己陷入了 LangChain、Prompt Engineering、微调训练等复杂技术的泥潭?本文将向你展示一个革命性的解决方案——LangBot 项目,它让 AI 对话系统的开发变得前所未有的简单高效。


一、为什么 LangBot 值得关注

1.1 当前 AI 对话系统开发的困境

在 LangBot 出现之前,开发者构建一个功能完善的对话机器人面临着诸多挑战。传统的开发方式需要开发者同时掌握前端界面设计、后端服务架构、大语言模型调用、多轮对话状态管理、第三方平台集成等多个领域的知识。这意味着一个简单的聊天机器人项目可能需要数周甚至数月的开发时间。

更糟糕的是,当开发者好不容易完成基本功能后,往往会发现系统的扩展性极差。每添加一个新的对话场景,都需要修改大量代码。不同平台(微信、Slack、Discord 等)的接口差异也让人头疼不已。这种碎片化的开发方式不仅效率低下,而且维护成本高昂。

1.2 LangBot 的核心理念

LangBot 的诞生正是为了解决这些痛点。这个项目采用了模块化的设计思想,将复杂的对话系统拆解为多个独立但可组合的组件。开发者无需关心底层实现细节,只需要通过简单的配置文件和装饰器语法,就能快速构建出功能强大的对话机器人。

这个项目的设计哲学可以概括为三个关键词:简洁、灵活、可扩展。简洁意味着开发体验的流畅;灵活体现在对多种对话场景和平台的原生支持;可扩展则保证了系统能够随着业务需求的增长而不断演进。

1.3 项目亮点一览

LangBot 项目具有多个令人印象深刻的特点。首先,它提供了开箱即用的多平台适配能力,支持微信、Telegram、Slack、Discord 等主流即时通讯平台。其次,内置的对话管理机制支持复杂的多轮对话流程,让开发者能够轻松实现上下文理解、意图识别、实体提取等高级功能。

在技术层面,LangBot 完美兼容 LangChain 和 LangSmith 等主流框架,开发者可以在享受简洁 API 的同时,获得强大的扩展能力。项目采用异步架构设计,能够高效处理高并发场景。同时,完善的日志和调试系统让问题的排查变得轻而易举。


二、快速上手:环境搭建与基础配置

2.1 系统要求与依赖安装

在开始之前,我们需要确保开发环境满足基本要求。LangBot 支持 Python 3.10 及以上版本,推荐使用 3.11 或更高版本以获得最佳性能。以下是完整的依赖安装步骤。

# 首先创建并激活一个独立的虚拟环境(推荐做法)
python -m venv langbot-env
source langbot-env/bin/activate  # Linux/Mac
# langbot-env\Scripts\activate  # Windows

# 使用 pip 安装 LangBot 核心包
pip install langbot

# 安装完成后,验证安装是否成功
python -c "import langbot; print(langbot.__version__)"

如果你是从 GitHub 直接安装最新版本,可以使用以下命令:

# 安装最新的开发版本
pip install git+https://github.com/langbot-app/LangBot.git

# 或者先克隆仓库,然后以可编辑模式安装
git clone https://github.com/langbot-app/LangBot.git
cd LangBot
pip install -e .

2.2 创建第一个 LangBot 项目

环境搭建完成后,让我们创建一个基础项目结构。我推荐按照以下目录组织代码:

my-chatbot/
├── config/
│   ├── __init__.py
│   ├── settings.py        # 全局配置
│   └── prompts.py         # 提示词模板
├── plugins/
│   ├── __init__.py
│   ├── weather.py         # 天气查询插件
│   └── search.py          # 搜索插件
├── handlers/
│   ├── __init__.py
│   └── conversation.py    # 对话处理器
├── main.py                # 项目入口
└── requirements.txt       # 依赖清单

2.3 基础配置详解

现在让我们创建核心配置文件。以下是一个典型的配置示例:

# config/settings.py
from langbot.config import BaseSettings
from typing import Optional

class BotSettings(BaseSettings):
    """机器人全局配置类"""

    # 机器人名称,将显示在对话界面中
    bot_name: str = "LangBot Assistant"

    # 主人模式配置,开启后只有指定用户可以执行管理命令
    owner_id: Optional[str] = None

    # 日志级别,可选值:DEBUG, INFO, WARNING, ERROR
    log_level: str = "INFO"

    # 大语言模型配置
    llm_provider: str = "openai"  # 支持 openai, anthropic, huggingface 等
    llm_model: str = "gpt-4"
    llm_api_key: str = ""
    llm_base_url: Optional[str] = None  # 用于代理或自定义端点

    # 对话历史配置
    max_history_turns: int = 10  # 保留的最大对话轮次
    history_storage: str = "memory"  # memory, redis, database

    # 多轮对话超时设置(秒)
    session_timeout: int = 3600

    class Config:
        env_prefix = "LANGBOT_"
        # 这会自动将环境变量 LANGBOT_BOT_NAME 映射到 bot_name

# 创建配置实例
settings = BotSettings()

2.4 使用环境变量管理敏感信息

在实际部署中,我们不应该将 API Key 等敏感信息硬编码在代码里。LangBot 支持通过环境变量来管理这些配置:

# 在项目根目录创建 .env 文件
# .env(不要提交到版本控制系统)
LANGBOT_LLM_API_KEY=sk-your-api-key-here
LANGBOT_OWNER_ID=your-user-id
LANGBOT_LOG_LEVEL=DEBUG

# 在代码中加载环境变量
from dotenv import load_dotenv
load_dotenv()  # 这会读取 .env 文件并设置环境变量

三、核心功能深度解析

3.1 对话引擎架构

LangBot 的对话引擎是其核心组件,负责处理用户输入、路由到合适的处理器、调用大语言模型、并管理对话上下文。整个引擎的设计遵循事件驱动模式,每一个用户消息都被视为一个事件,系统根据预定义的规则进行分发和处理。

理解引擎的工作原理对于深入使用 LangBot 非常重要。当用户发送一条消息时,系统会依次经过以下流程:输入预处理(清洗、分词、特殊字符处理)→ 意图识别(确定用户想要执行什么操作)→ 处理器选择(找到对应的处理函数)→ 业务逻辑执行(调用相关服务)→ 响应生成(调用 LLM 生成回复)→ 输出后处理(格式化、敏感词过滤等)。

3.2 插件系统详解

LangBot 的插件系统是其最强大的特性之一。它采用装饰器模式,让开发者可以轻松地为机器人添加新功能。一个典型的插件结构如下:

# plugins/weather.py
from langbot import plugin, BotContext, ChannelAdapter
from langbot.decorators import on_message, command, scheduled
from typing import Optional
import aiohttp

@plugin(name="weather", version="1.0.0", description="天气查询插件")
class WeatherPlugin:
    """天气查询插件,提供实时天气信息和天气预报"""

    def __init__(self):
        self.api_key = None
        self.cache = {}  # 简单的内存缓存
        self.cache_ttl = 1800  # 缓存有效期(秒)

    @command(pattern=r"天气\s*(.+)?", description="查询指定城市的天气")
    async def get_weather(self, ctx: BotContext, city: Optional[str] = None):
        """
        处理天气查询命令

        Args:
            ctx: 机器人上下文对象,包含用户信息、会话状态等
            city: 要查询的城市名称,为空时使用用户所在城市
        """
        # 如果没有指定城市,尝试从上下文中获取用户位置
        if not city:
            city = ctx.user.get("location", "北京")

        # 检查缓存
        cache_key = f"weather_{city}"
        if cache_key in self.cache:
            return self.cache[cache_key]

        # 调用天气 API(这里以和风天气为例)
        weather_data = await self._fetch_weather(city)

        if weather_data:
            response = self._format_weather_response(weather_data)
            # 存入缓存
            self.cache[cache_key] = response
            return response
        else:
            return f"抱歉,暂未找到城市「{city}」的天气信息"

    @command(pattern=r"预报\s*(.+)?", description="查询天气预报")
    async def get_forecast(self, ctx: BotContext, city: Optional[str] = None):
        """查询未来几天的天气预报"""
        city = city or ctx.user.get("location", "北京")
        # 实现天气预报逻辑...
        return f"{city}未来三天的天气预报:..."

    @scheduled(cron="0 7 * * *")  # 每天早上7点执行
    async def daily_weather_report(self, bot: ChannelAdapter):
        """发送每日天气报告给订阅用户"""
        pass

    async def _fetch_weather(self, city: str) -> Optional[dict]:
        """内部方法:获取天气数据"""
        url = f"https://api.weather.example.com/v3/weather"
        params = {"city": city, "key": self.api_key}

        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    return await response.json()
        return None

    def _format_weather_response(self, data: dict) -> str:
        """格式化天气信息为友好的文本"""
        temp = data.get("temp", "N/A")
        condition = data.get("condition", "未知")
        humidity = data.get("humidity", "N/A")
        wind = data.get("wind", "N/A")

        return (
            f"🌤️ {city}当前天气\n"
            f"━━━━━━━━━━━━━\n"
            f"温度:{temp}°C\n"
            f"天气:{condition}\n"
            f"湿度:{humidity}%\n"
            f"风力:{wind}\n"
            f"━━━━━━━━━━━━━"
        )

3.3 对话流程管理

多轮对话是对话系统的核心挑战之一。LangBot 提供了强大的对话状态管理机制,让开发者能够轻松实现复杂的对话流程:

# handlers/conversation.py
from langbot import BotContext, ConversationState
from langbot.states import StateGroup
from enum import auto
from typing import Optional

class OrderStateGroup(StateGroup):
    """订餐系统的对话状态"""
    IDLE = auto()           # 空闲状态,等待用户开始
    SELECT_RESTAURANT = auto()  # 选择餐厅
    SELECT_DISH = auto()    # 选择菜品
    CONFIRM_ORDER = auto()  # 确认订单
    ADDRESS_CONFIRM = auto() # 确认地址
    PAYMENT = auto()        # 支付环节

class OrderConversation:
    """订餐对话处理器"""

    def __init__(self):
        # 可以在这里初始化数据库连接等
        self.restaurants = [
            {"id": 1, "name": "川味坊", "cuisine": "川菜", "rating": 4.5},
            {"id": 2, "name": "粤式茶餐厅", "cuisine": "粤菜", "rating": 4.7},
            {"id": 3, "name": "日式料理", "cuisine": "日料", "rating": 4.6},
        ]

    @ConversationState.Transition(from_=OrderStateGroup.IDLE, event="start_order")
    async def start_order(self, ctx: BotContext):
        """用户开始下单"""
        ctx.state.current_state = OrderStateGroup.SELECT_RESTAURANT

        restaurant_list = "\n".join([
            f"{r['id']}. {r['name']} ({r['cuisine']}) ⭐{r['rating']}"
            for r in self.restaurants
        ])

        return (
            "🍽️ 欢迎使用订餐服务!\n"
            "请选择您想吃的餐厅:\n\n"
            f"{restaurant_list}\n\n"
            "输入序号进行选择:"
        )

    @ConversationState.Transition(
        from_=OrderStateGroup.SELECT_RESTAURANT,
        event="select_restaurant"
    )
    async def handle_restaurant_selection(
        self, 
        ctx: BotContext, 
        selection: int
    ):
        """处理餐厅选择"""
        restaurant = next(
            (r for r in self.restaurants if r["id"] == selection), 
            None
        )

        if not restaurant:
            return "⚠️ 无效的选择,请重新输入餐厅序号"

        # 保存选择到会话上下文
        ctx.session["restaurant"] = restaurant
        ctx.state.current_state = OrderStateGroup.SELECT_DISH

        # 获取餐厅菜单
        menu = self._get_menu(restaurant["id"])
        menu_text = "\n".join([
            f"{dish['id']}. {dish['name']} ¥{dish['price']}"
            for dish in menu
        ])

        return (
            f"✅ 已选择:{restaurant['name']}\n\n"
            f"📋 菜单:\n{menu_text}\n\n"
            "请输入菜品序号(多个菜品用逗号分隔):"
        )

    @ConversationState.Transition(
        from_=OrderStateGroup.SELECT_DISH,
        event="select_dishes"
    )
    async def handle_dish_selection(
        self, 
        ctx: BotContext, 
        selections: str
    ):
        """处理菜品选择"""
        dish_ids = [int(s.strip()) for s in selections.split(",")]
        selected_dishes = []
        total_price = 0

        menu = self._get_menu(ctx.session["restaurant"]["id"])
        for dish_id in dish_ids:
            dish = next((d for d in menu if d["id"] == dish_id), None)
            if dish:
                selected_dishes.append(dish)
                total_price += dish["price"]

        if not selected_dishes:
            return "⚠️ 未选择任何菜品,请重新选择"

        ctx.session["dishes"] = selected_dishes
        ctx.session["total_price"] = total_price
        ctx.state.current_state = OrderStateGroup.CONFIRM_ORDER

        dishes_text = "\n".join([
            f"• {dish['name']} x1 ¥{dish['price']}"
            for dish in selected_dishes
        ])

        return (
            f"🛒 订单确认:\n\n"
            f"{dishes_text}\n"
            f"━━━━━━━━━━━━━━━\n"
            f"总计:¥{total_price}\n\n"
            "回复「确认」完成下单,或「取消」终止交易"
        )

    def _get_menu(self, restaurant_id: int) -> list:
        """模拟获取菜单数据"""
        return [
            {"id": 1, "name": "宫保鸡丁", "price": 38},
            {"id": 2, "name": "麻婆豆腐", "price": 28},
            {"id": 3, "name": "水煮鱼", "price": 68},
        ]

3.4 平台适配器机制

LangBot 的平台适配器系统是其多渠道支持的基石。每个平台(微信、Telegram、Slack 等)都有对应的适配器,负责处理平台特定的接口和协议。开发者可以使用统一的 API 与所有平台交互:

# adapters/platforms.py
from langbot.adapters import BaseAdapter, Message, User

class TelegramAdapter(BaseAdapter):
    """Telegram 平台适配器"""

    platform_name = "telegram"

    def __init__(self, bot_token: str):
        self.bot_token = bot_token
        self.api_base = f"https://api.telegram.org/bot{bot_token}"

    async def send_message(
        self, 
        chat_id: str, 
        text: str, 
        parse_mode: str = "Markdown",
        reply_markup: dict = None
    ) -> dict:
        """发送消息到 Telegram"""
        url = f"{self.api_base}/sendMessage"
        payload = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": parse_mode
        }
        if reply_markup:
            payload["reply_markup"] = reply_markup

        return await self._post(url, payload)

    async def send_keyboard(
        self, 
        chat_id: str, 
        text: str, 
        buttons: list[list[str]]
    ) -> dict:
        """发送带键盘按钮的消息"""
        keyboard = {
            "keyboard": [[{"text": btn} for btn in row] for row in buttons],
            "resize_keyboard": True,
            "one_time_keyboard": False
        }
        return await self.send_message(chat_id, text, reply_markup=keyboard)

    async def handle_update(self, update: dict) -> Message:
        """将 Telegram 更新转换为统一的 Message 对象"""
        message = update.get("message", {})
        return Message(
            id=message.get("message_id"),
            text=message.get("text", ""),
            sender=User(
                id=str(message.get("from", {}).get("id")),
                name=message.get("from", {}).get("first_name"),
                username=message.get("from", {}).get("username")
            ),
            chat_id=str(message.get("chat", {}).get("id")),
            raw_update=update
        )

四、实战教程:构建一个完整的智能助手

4.1 项目需求分析

现在让我们通过一个完整的实战项目来掌握 LangBot 的使用方法。我们的目标是构建一个功能丰富的智能助手,包含以下能力:智能问答、日程管理、提醒设置、天气查询、搜索服务。

4.2 项目初始化

首先创建项目结构并初始化配置:

# main.py
"""
智能助手主程序
功能:问答助手、日程管理、提醒设置
"""

import asyncio
import logging
from typing import Optional

# 导入 LangBot 核心组件
from langbot import (
    Bot,
    BotContext,
    Plugin,
    ChannelAdapter
)
from langbot.config import load_config
from langbot.adapters.telegram import TelegramAdapter
from langbot.llm import ChatGPT

# 导入自定义插件
from plugins.weather import WeatherPlugin
from plugins.schedule import SchedulePlugin
from plugins.qa import QAPlugin

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


class SmartAssistant(Bot):
    """智能助手主类"""

    def __init__(self):
        super().__init__(
            name="SmartAssistant",
            version="1.0.0"
        )

        # 初始化 LLM
        self.llm = ChatGPT(
            model="gpt-4",
            api_key=self.config.llm_api_key,
            temperature=0.7  # 控制创造性,越低越确定性
        )

        # 注册插件
        self.register_plugin(WeatherPlugin())
        self.register_plugin(SchedulePlugin())
        self.register_plugin(QAPlugin())

        # 注册全局命令处理器
        self.register_command("help", self.show_help)
        self.register_command("status", self.show_status)

    async def on_message(self, ctx: BotContext) -> str:
        """
        消息处理主入口
        这是每个消息都会经过的核心方法
        """
        message = ctx.message.text.strip()
        user_id = ctx.user.id

        logger.info(f"收到消息 from {user_id}: {message}")

        # 检查是否是管理命令
        if message.startswith("/"):
            return await self.handle_command(ctx)

        # 检查是否触发了插件
        for plugin in self.plugins:
            result = await plugin.handle(ctx)
            if result:
                return result

        # 默认使用 LLM 生成回复
        return await self.generate_llm_response(ctx)

    async def generate_llm_response(self, ctx: BotContext) -> str:
        """
        调用大语言模型生成智能回复
        这个方法展示了如何集成 LangChain 等高级框架
        """
        # 构建提示词上下文
        system_prompt = self._build_system_prompt()
        conversation_history = self._get_conversation_history(ctx)

        try:
            response = await self.llm.chat(
                messages=[
                    {"role": "system", "content": system_prompt},
                    *conversation_history,
                    {"role": "user", "content": ctx.message.text}
                ],
                max_tokens=1000
            )
            return response
        except Exception as e:
            logger.error(f"LLM 调用失败: {e}")
            return "抱歉,我现在有点问题,请稍后再试~"

    def _build_system_prompt(self) -> str:
        """构建系统提示词"""
        return """你是一个友善、专业的智能助手,名叫小智。

你的特点:
1. 回答问题清晰、准确、易懂
2. 适当使用表情符号增加趣味性
3. 如果不确定答案,诚实告知用户
4. 善于引导用户完成复杂任务
5. 可以调用工具来帮助用户

当前支持的技能:
- 查询天气:说"天气+城市名"
- 管理日程:说"日程"开始使用
- 设置提醒:说"提醒+时间和内容"
- 搜索信息:直接提问即可"""

    def _get_conversation_history(
        self, 
        ctx: BotContext,
        max_turns: int = 5
    ) -> list:
        """获取对话历史"""
        history = ctx.session.get("history", [])
        return history[-max_turns * 2:]

    async def show_help(self, ctx: BotContext) -> str:
        """显示帮助信息"""
        help_text = """
🤖 *SmartAssistant 使用指南*

可用命令:
/start - 开始使用智能助手
/help - 显示此帮助信息
/status - 查看助手状态

常用功能:
• 天气查询:直接说"北京天气"
• 日程管理:说"我的日程"或"添加日程"
• 提醒设置:说"提醒我下午3点开会"
• 智能问答:直接提问即可

输入「退出」可结束当前对话场景
"""
        return help_text

    async def show_status(self, ctx: BotContext) -> str:
        """显示助手状态"""
        session_data = ctx.session

        return f"""
📊 *助手状态*

运行时间:在线
对话轮次:{session_data.get('turn_count', 0)}
当前插件:{', '.join(p.name for p in self.plugins)}
会话ID:{ctx.session_id[:8]}...

所有功能正常运行 ✅
"""

    async def handle_command(self, ctx: BotContext) -> str:
        """处理斜杠命令"""
        command = ctx.message.text.split()[0]

        commands = {
            "/start": "欢迎使用智能助手!输入 /help 查看所有功能",
            "/help": await self.show_help(ctx),
            "/status": await self.show_status(ctx)
        }

        return commands.get(command, f"未知命令:{command}")


# 启动函数
async def main():
    """主程序入口"""
    # 加载配置
    config = load_config()

    # 创建 Bot 实例
    bot = SmartAssistant()

    # 初始化平台适配器
    adapter = TelegramAdapter(
        bot_token=config.telegram_bot_token
    )

    # 启动机器人
    logger.info("🚀 智能助手启动中...")

    await bot.start(adapter)

    # 保持运行
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("正在关闭助手...")
        await bot.stop()


if __name__ == "__main__":
    asyncio.run(main())

4.3 实现天气查询插件

现在让我们完善天气查询插件的实现:

# plugins/weather.py
"""
天气查询插件
功能:查询当前天气、空气质量、穿衣指数等
"""

import aiohttp
import json
from datetime import datetime, timedelta
from typing import Optional
from langbot import plugin, BotContext
from langbot.decorators import on_message, command, on_keyword

# 常用城市编码映射
CITY_CODES = {
    "北京": "101010100",
    "上海": "101020100",
    "广州": "101280101",
    "深圳": "101280601",
    "杭州": "101210101",
    "成都": "101270101",
    "重庆": "101040100",
    "武汉": "101200101",
    "西安": "101110101",
    "南京": "101190101",
}


@plugin(name="weather", version="1.0.0", description="天气查询插件")
class WeatherPlugin:
    """
    天气查询插件

    支持功能:
    - 当前天气查询
    - 空气质量查询
    - 未来天气预报
    - 生活指数查询
    """

    def __init__(self):
        self.api_key = None  # 请替换为你的 API Key
        self.api_base = "https://api.weather.example.com"
        self.cache = {}
        self.cache_duration = timedelta(minutes=30)

        # 内置回复模板
        self.emoji_map = {
            "晴": "☀️",
            "多云": "⛅",
            "阴": "☁️",
            "小雨": "🌧️",
            "中雨": "🌧️",
            "大雨": "⛈️",
            "雷阵雨": "⛈️",
            "小雪": "🌨️",
            "中雪": "❄️",
            "大雪": "❄️",
            "雾": "🌫️",
            "霾": "😷",
        }

    async def handle(self, ctx: BotContext) -> Optional[str]:
        """
        插件入口方法
        当消息触发本插件时自动调用
        """
        text = ctx.message.text

        # 检查缓存
        cache_key = self._get_cache_key(text)
        if cache_key in self.cache:
            cached = self.cache[cache_key]
            if datetime.now() - cached["time"] < self.cache_duration:
                return cached["response"]

        # 路由到具体处理方法
        if any(kw in text for kw in ["空气", "AQI", "污染"]):
            return await self._handle_air_quality(ctx)
        elif "预报" in text or "未来" in text:
            return await self._handle_forecast(ctx)
        elif "天气" in text or any(city in text for city in CITY_CODES):
            return await self._handle_current_weather(ctx)

        return None

    @command(pattern=r"天气\s*(.+)?", description="查询天气")
    async def query_weather(
        self, 
        ctx: BotContext, 
        city: Optional[str] = None
    ) -> str:
        """处理天气查询命令"""
        # 提取城市名
        city = self._extract_city(city or ctx.message.text)

        if not city:
            return "请告诉我您想查询哪个城市的天气~\n例如:「北京天气」或「上海天气」"

        # 获取天气数据
        weather_data = await self._fetch_weather(city)

        if weather_data:
            return self._format_weather_message(city, weather_data)
        else:
            return f"抱歉,暂时无法获取「{city}」的天气信息\n请稍后重试或尝试其他城市"

    async def _handle_current_weather(self, ctx: BotContext) -> str:
        """处理当前天气查询"""
        city = self._extract_city(ctx.message.text)
        return await self.query_weather(ctx, city)

    async def _handle_air_quality(self, ctx: BotContext) -> str:
        """处理空气质量查询"""
        city = self._extract_city(ctx.message.text)

        if not city:
            # 尝试从用户资料获取
            city = ctx.user.get("city", "北京")

        aqi_data = await self._fetch_air_quality(city)

        if aqi_data:
            return self._format_aqi_message(city, aqi_data)

        return f"暂时无法获取「{city}」的空气质量信息"

    async def _handle_forecast(self, ctx: BotContext) -> str:
        """处理天气预报查询"""
        city = self._extract_city(ctx.message.text)

        if not city:
            city = ctx.user.get("city", "北京")

        forecast_data = await self._fetch_forecast(city)

        if forecast_data:
            return self._format_forecast_message(city, forecast_data)

        return f"暂时无法获取「{city}」的天气预报"

    def _extract_city(self, text: str) -> Optional[str]:
        """从文本中提取城市名"""
        # 移除常见前缀
        text = text.replace("查询", "").replace("看看", "").replace("帮我查", "")

        for city in CITY_CODES.keys():
            if city in text:
                return city

        # 尝试匹配带"市"后缀的城市名
        for city in CITY_CODES.keys():
            if f"{city}市" in text:
                return city

        return None

    async def _fetch_weather(self, city: str) -> Optional[dict]:
        """获取天气数据"""
        city_code = CITY_CODES.get(city)

        if not city_code:
            return None

        # 这里应该调用真实的天气 API
        # 为了演示,返回模拟数据
        return {
            "temp": 22,
            "feels_like": 24,
            "condition": "多云",
            "humidity": 65,
            "wind_speed": 12,
            "wind_dir": "东南风",
            "visibility": 10,
            "uv_index": 5,
        }

    async def _fetch_air_quality(self, city: str) -> Optional[dict]:
        """获取空气质量数据"""
        # 模拟数据
        return {
            "aqi": 78,
            "level": "良",
            "pm25": 45,
            "pm10": 102,
            "so2": 10,
            "no2": 35,
            "co": 0.8,
            "o3": 120,
        }

    async def _fetch_forecast(self, city: str) -> Optional[list]:
        """获取天气预报数据"""
        # 模拟7天预报数据
        forecasts = []
        conditions = ["晴", "多云", "小雨", "多云", "晴", "阴", "小雨"]

        for i in range(7):
            date = datetime.now() + timedelta(days=i)
            forecasts.append({
                "date": date.strftime("%m-%d"),
                "weekday": ["周一", "周二", "周三", "周四", "周五", "周六", "周日"][date.weekday()],
                "condition": conditions[i],
                "temp_high": 25 + i,
                "temp_low": 16 + i,
            })

        return forecasts

    def _format_weather_message(self, city: str, data: dict) -> str:
        """格式化天气消息"""
        emoji = self.emoji_map.get(data["condition"], "🌤️")

        return f"""
{city}天气预报
━━━━━━━━━━━━━━━━

{emoji} {data["condition"]}
🌡️ 温度:{data["temp"]}°C(体感 {data["feels_like"]}°C)
💧 湿度:{data["humidity"]}%
🌬️ 风力:{data["wind_dir"]} {data["wind_speed"]}
👁️ 能见度:{data["visibility"]}km
☀️ 紫外线:{data["uv_index"]}

生活建议:
{self._get_suggestion(data)}
"""

    def _format_aqi_message(self, city: str, data: dict) -> str:
        """格式化空气质量消息"""
        aqi = data["aqi"]

        if aqi <= 50:
            color_emoji = "🟢"
            health = "空气质量优秀,可以正常活动"
        elif aqi <= 100:
            color_emoji = "🟡"
            health = "空气质量良好,敏感人群注意防护"
        elif aqi <= 150:
            color_emoji = "🟠"
            health = "轻度污染,减少户外活动"
        elif aqi <= 200:
            color_emoji = "🔴"
            health = "中度污染,建议戴口罩"
        else:
            color_emoji = "⚫"
            health = "严重污染,避免外出"

        return f"""
{city}空气质量
━━━━━━━━━━━━━━━━

{color_emoji} AQI:{aqi}{data["level"]}

详细数据:
• PM2.5:{data["pm25"]} μg/m³
• PM10:{data["pm10"]} μg/m³
• SO₂:{data["so2"]} μg/m³
• NO₂:{data["no2"]} μg/m³
• CO:{data["co"]} mg/m³
• O₃:{data["o3"]} μg/m³

健康建议:{health}
"""

    def _format_forecast_message(self, city: str, forecasts: list) -> str:
        """格式化天气预报消息"""
        lines = [f"{city}未来7天预报", "━━━━━━━━━━━━━━━━"]

        for fc in forecasts[:5]:  # 显示前5天
            emoji = self.emoji_map.get(fc["condition"], "🌤️")
            lines.append(
                f"{fc['date']} {fc['weekday']}{emoji}{fc['condition']} "
                f"{fc['temp_low']}~{fc['temp_high']}°C"
            )

        return "\n".join(lines)

    def _get_suggestion(self, weather: dict) -> str:
        """根据天气数据生成生活建议"""
        suggestions = []

        temp = weather["temp"]
        condition = weather["condition"]
        uv = weather.get("uv_index", 0)

        # 温度建议
        if temp < 10:
            suggestions.append("🧥 天气较凉,记得添加外套")
        elif temp > 28:
            suggestions.append("🧴 天气炎热,注意防暑降温")

        # 天气状况建议
        if "雨" in condition:
            suggestions.append("☂️ 记得带伞,小心路滑")
        if "雪" in condition:
            suggestions.append("🧤 路面可能有积雪,注意出行安全")
        if condition in ["雾", "霾"]:
            suggestions.append("😷 能见度较低,外出注意安全")

        # 紫外线建议
        if uv >= 6:
            suggestions.append("🧴 紫外线强,记得涂防晒霜")

        return "\n".join(suggestions) if suggestions else "今日天气适宜出行"

    def _get_cache_key(self, text: str) -> str:
        """生成缓存键"""
        return text[:20]  # 使用前20个字符作为缓存键

4.4 实现日程管理插件

接下来是日程管理插件的实现:

# plugins/schedule.py
"""
日程管理插件
功能:添加、查看、删除日程,设置提醒
"""

from datetime import datetime, timedelta
from typing import Optional, List
from dataclasses import dataclass, field
from langbot import plugin, BotContext
from langbot.decorators import command, on_message


@dataclass
class ScheduleItem:
    """日程项数据类"""
    id: str
    title: str
    description: str
    start_time: datetime
    end_time: Optional[datetime]
    reminder: Optional[timedelta] = None
    completed: bool = False
    created_at: datetime = field(default_factory=datetime.now)


@plugin(
    name="schedule", 
    version="1.0.0", 
    description="日程管理插件"
)
class SchedulePlugin:
    """
    日程管理插件

    支持功能:
    - 添加日程:说"添加日程 下午3点开会"
    - 查看日程:说"我的日程"或"今日日程"
    - 删除日程:说"删除日程 1"
    - 标记完成:说"完成日程 1"
    - 提醒设置:说"提醒我30分钟后喝水"
    """

    def __init__(self):
        # 使用字典存储日程,按用户ID分组
        self.schedules: dict[str, List[ScheduleItem]] = {}
        # 待处理的操作状态
        self.pending_actions: dict[str, dict] = {}

    async def handle(self, ctx: BotContext) -> Optional[str]:
        """插件入口方法"""
        text = ctx.message.text.strip()
        user_id = ctx.user.id

        # 初始化用户日程列表
        if user_id not in self.schedules:
            self.schedules[user_id] = []

        # 检查是否有待处理的操作
        if user_id in self.pending_actions:
            return await self._handle_pending_action(ctx)

        # 路由到具体处理方法
        if any(kw in text for kw in ["日程", "计划", "安排"]):
            return await self._handle_schedule(ctx)
        elif any(kw in text for kw in ["提醒", "闹钟", "记得"]):
            return await self._handle_reminder(ctx)
        elif text.startswith("删除") or text.startswith("取消"):
            return await self._handle_delete(ctx)
        elif text.startswith("完成"):
            return await self._handle_complete(ctx)

        return None

    @command(pattern=r"添加日程\s*(.+)", description="添加新日程")
    async def add_schedule(
        self, 
        ctx: BotContext, 
        content: str
    ) -> str:
        """添加新日程"""
        user_id = ctx.user.id

        # 解析日程内容
        parsed = self._parse_schedule_content(content)

        if not parsed["title"]:
            return "请提供日程内容~\n例如:「添加日程 下午3点开会讨论项目」"

        # 创建日程项
        schedule = ScheduleItem(
            id=f"{user_id}_{datetime.now().timestamp()}",
            title=parsed["title"],
            description=parsed.get("description", ""),
            start_time=parsed.get("time", datetime.now()),
            end_time=parsed.get("end_time"),
            reminder=parsed.get("reminder")
        )

        self.schedules[user_id].append(schedule)

        response = f"✅ 已添加日程:\n📝 {schedule.title}\n"

        if schedule.start_time:
            response += f"🕐 时间:{schedule.start_time.strftime('%Y-%m-%d %H:%M')}\n"

        if schedule.reminder:
            minutes = int(schedule.reminder.total_seconds() / 60)
            response += f"⏰ 提醒:提前{minutes}分钟\n"

        return response

    @command(pattern=r"(我的)?日程", description="查看日程列表")
    async def list_schedules(self, ctx: BotContext, _: str = "") -> str:
        """列出所有日程"""
        user_id = ctx.user.id
        schedules = self.schedules.get(user_id, [])

        if not schedules:
            return "📅 您还没有添加任何日程~\n\n说「添加日程」来创建新日程吧!"

        # 分类显示
        upcoming = []
        today = []
        completed = []

        now = datetime.now()
        today_start = now.replace(hour=0, minute=0, second=0)
        today_end = today_start + timedelta(days=1)

        for s in schedules:
            if s.completed:
                completed.append(s)
            elif s.start_time and today_start <= s.start_time < today_end:
                today.append(s)
            elif s.start_time and s.start_time >= now:
                upcoming.append(s)

        lines = ["📅 您的日程安排\n━━━━━━━━━━━━━━━━\n"]

        if today:
            lines.append("📆 今日日程:")
            for i, s in enumerate(today, 1):
                status = "✅" if s.completed else "○"
                time_str = s.start_time.strftime("%H:%M")
                lines.append(f"  {status} {i}. {s.title} {time_str}")
            lines.append("")

        if upcoming:
            lines.append("📆 即将到来:")
            for i, s in enumerate(upcoming[:5], 1):
                status = "✅" if s.completed else "○"
                date_str = s.start_time.strftime("%m-%d %H:%M")
                lines.append(f"  {status} {i}. {s.title} {date_str}")
            lines.append("")

        if completed:
            lines.append(f"✅ 已完成:{len(completed)}项")

        lines.append("\n💡 输入「完成日程 序号」标记完成")
        lines.append("💡 输入「删除日程 序号」删除日程")

        return "\n".join(lines)

    @command(pattern=r"今日日程", description="查看今日日程")
    async def today_schedules(self, ctx: BotContext) -> str:
        """查看今日日程"""
        user_id = ctx.user.id
        schedules = self.schedules.get(user_id, [])

        now = datetime.now()
        today_start = now.replace(hour=0, minute=0, second=0)
        today_end = today_start + timedelta(days=1)

        today_items = [
            s for s in schedules
            if s.start_time and today_start <= s.start_time < today_end
        ]

        if not today_items:
            return "📅 今天没有日程安排~\n好好享受今天的时光吧!🌟"

        lines = [f"📅 {now.strftime('%Y年%m月%d日')} 日程\n━━━━━━━━━━━━━━━━\n"]

        for i, s in enumerate(sorted(today_items, key=lambda x: x.start_time), 1):
            status = "✅" if s.completed else "○"
            time_str = s.start_time.strftime("%H:%M")
            lines.append(f"{status} {i}. {time_str} - {s.title}")

        return "\n".join(lines)

    async def _handle_schedule(self, ctx: BotContext) -> str:
        """处理日程相关请求"""
        text = ctx.message.text

        if "添加" in text or "新建" in text:
            # 提取添加内容
            content = text.replace("添加", "").replace("新建", "").replace("日程", "").strip()
            return await self.add_schedule(ctx, content)
        elif "今日" in text or "今天" in text:
            return await self.today_schedules(ctx)
        else:
            return await self.list_schedules(ctx, "")

    async def _handle_delete(self, ctx: BotContext) -> str:
        """处理删除日程"""
        user_id = ctx.user.id
        text = ctx.message.text

        # 提取序号
        import re
        match = re.search(r"\d+", text)

        if not match:
            return "请指定要删除的日程序号~\n例如:「删除日程 1」"

        index = int(match.group()) - 1
        schedules = self.schedules.get(user_id, [])

        if index < 0 or index >= len(schedules):
            return "⚠️ 无效的日程序号"

        deleted = schedules.pop(index)
        return f"🗑️ 已删除日程:「{deleted.title}」"

    async def _handle_complete(self, ctx: BotContext) -> str:
        """处理完成日程"""
        user_id = ctx.user.id
        text = ctx.message.text

        import re
        match = re.search(r"\d+", text)

        if not match:
            return "请指定要完成的日程序号~\n例如:「完成日程 1」"

        index = int(match.group()) - 1
        schedules = self.schedules.get(user_id, [])

        if index < 0 or index >= len(schedules):
            return "⚠️ 无效的日程序号"

        schedules[index].completed = True
        return f"✅ 已完成:「{schedules[index].title}」"

    async def _handle_reminder(self, ctx: BotContext) -> str:
        """处理提醒设置"""
        text = ctx.message.text

        # 解析提醒内容
        reminder = self._parse_reminder(text)

        if not reminder:
            return (
                "⏰ 设置提醒~\n\n"
                "可以说:「提醒我30分钟后喝水」\n"
                "或:「提醒我下午3点开会」\n"
                "或:「1小时后提醒我做报告」"
            )

        # 创建提醒日程
        user_id = ctx.user.id
        schedule = ScheduleItem(
            id=f"{user_id}_{datetime.now().timestamp()}",
            title=reminder["title"],
            description="提醒",
            start_time=reminder["time"],
            end_time=None,
            reminder=reminder.get("advance")
        )

        self.schedules[user_id].append(schedule)

        time_str = reminder["time"].strftime("%H:%M")
        return f"⏰ 已设置提醒:{schedule.title}\n🕐 提醒时间:{time_str}"

    def _parse_schedule_content(self, content: str) -> dict:
        """解析日程内容"""
        import re

        result = {
            "title": "",
            "description": "",
            "time": None,
            "reminder": None
        }

        # 提取时间表达式
        time_patterns = [
            (r"(\d{1,2})点(\d{1,2})?分?", "%H:%M"),
            (r"今天下午(\d{1,2})点(\d{1,2})?分?", "today PM"),
            (r"今天上午(\d{1,2})点(\d{1,2})?分?", "today AM"),
            (r"今天(\d{1,2})点(\d{1,2})?分?", "today"),
        ]

        for pattern, _ in time_patterns:
            match = re.search(pattern, content)
            if match:
                result["time"] = self._parse_time_expression(match.group())
                content = content.replace(match.group(), "").strip()
                break

        if not result["time"]:
            result["time"] = datetime.now() + timedelta(hours=1)

        # 提取提醒设置
        reminder_patterns = [
            r"提前(\d+)分钟提醒",
            r"提前(\d+)小时提醒",
        ]

        for pattern in reminder_patterns:
            match = re.search(pattern, content)
            if match:
                value = int(match.group(1))
                if "分钟" in pattern:
                    result["reminder"] = timedelta(minutes=value)
                else:
                    result["reminder"] = timedelta(hours=value)
                break

        result["title"] = content or "新日程"

        return result

    def _parse_reminder(self, text: str) -> Optional[dict]:
        """解析提醒设置"""
        import re

        result = {"title": "", "time": None}

        # 提取时间
        now = datetime.now()

        # 相对时间:X分钟后、X小时后
        relative_patterns = [
            (r"(\d+)分钟后", timedelta(minutes=1)),
            (r"(\d+)小时后", timedelta(hours=1)),
        ]

        for pattern, delta in relative_patterns:
            match = re.search(pattern, text)
            if match:
                value = int(match.group(1))
                result["time"] = now + delta * value
                break

        # 绝对时间:下午X点
        if not result["time"]:
            if "下午" in text or "晚上" in text:
                match = re.search(r"(\d{1,2})点", text)
                if match:
                    hour = int(match.group(1))
                    if hour < 12:
                        hour += 12
                    result["time"] = now.replace(hour=hour, minute=0, second=0)

        # 提取提醒内容
        content = text
        for pattern in [r"\d+分钟后?", r"\d+小时后?", r"提醒我", r"提醒"]:
            content = re.sub(pattern, "", content).strip()

        result["title"] = content or "提醒"

        if not result["time"] or not result["title"]:
            return None

        return result

    def _parse_time_expression(self, expr: str) -> datetime:
        """解析时间表达式"""
        import re

        now = datetime.now()

        # 今天 X点
        match = re.search(r"(\d{1,2})点(\d{1,2})?", expr)
        if match:
            hour = int(match.group(1))
            minute = int(match.group(2) or "0")

            if "下午" in expr or "晚上" in expr:
                if hour < 12:
                    hour += 12

            result = now.replace(hour=hour, minute=minute, second=0)

            # 如果时间已过,安排到明天
            if result < now:
                result += timedelta(days=1)

            return result

        return now + timedelta(hours=1)

4.5 智能问答插件实现

最后实现智能问答插件:

# plugins/qa.py
"""
智能问答插件
功能:通用知识问答、上下文理解、对话生成
"""

from typing import Optional, List, Dict
from langbot import plugin, BotContext
from langbot.decorators import on_message, command


@plugin(
    name="qa", 
    version="1.0.0", 
    description="智能问答插件"
)
class QAPlugin:
    """
    智能问答插件

    提供通用对话和问答能力
    支持上下文理解和多轮对话
    """

    def __init__(self):
        # 知识库(简单版本,可扩展为向量数据库)
        self.knowledge_base: Dict[str, str] = {
            "hello": "你好!有什么我可以帮助你的吗?",
            "hi": "嗨~很高兴见到你!",
            "你是谁": "我是一个智能助手,可以帮你查询天气、管理日程、回答问题等~",
            "有什么用": "我可以帮你:\n• 查询天气(说「天气+城市」)\n• 管理日程(说「添加日程」)\n• 设置提醒(说「提醒我」)\n• 回答问题(直接问我)",
            "谢谢": "不客气!😊 还有什么需要帮忙的吗?",
            "再见": "再见!有问题随时找我~👋",
        }

        # 敏感话题的预设回复
        self.sensitive_responses = {
            "政治": "这个话题我不太擅长讨论,不如我们聊聊别的吧~",
            "宗教": "每个人都有自己信仰的自由呢,我们可以聊聊其他话题吗?",
            "暴力": "这类内容我不太关注,让我们聊点有趣的吧!",
        }

    async def handle(self, ctx: BotContext) -> Optional[str]:
        """插件入口方法"""
        text = ctx.message.text.strip().lower()

        # 检查是否是直接命令
        if text.startswith("/") or text.startswith("天气") or text.startswith("日程"):
            return None

        # 检查知识库
        for key, response in self.knowledge_base.items():
            if key in text:
                return response

        # 检查敏感话题
        for topic, response in self.sensitive_responses.items():
            if topic in text:
                return response

        # 检查是否是简单问答
        if self._is_simple_question(text):
            return self._answer_simple_question(text)

        # 其他问题交给 LLM 处理
        return None

    def _is_simple_question(self, text: str) -> bool:
        """判断是否是简单问题"""
        simple_indicators = [
            "是什么", "是谁", "在哪", "怎么", "如何",
            "多少", "几个", "为什么", "会不会"
        ]
        return any(indicator in text for indicator in simple_indicators)

    def _answer_simple_question(self, text: str) -> str:
        """回答简单问题"""
        # 这里可以接入简单的规则引擎或知识图谱
        # 为了演示,返回引导用户深入交流的回复

        answers = {
            "是什么": "这个问题比较有趣,让我来想想怎么回答你~你具体想了解哪方面呢?",
            "是谁": "关于这个问题,我需要思考一下~能说得更具体些吗?",
            "在哪": "这个问题涉及的具体位置是什么呢?",
            "怎么": "这个操作其实不难,让我一步步教你~",
            "如何": "这是个好问题!让我来帮你解答~",
            "多少": "这个需要根据具体情况来分析~能提供更多细节吗?",
            "为什么": "原因可能有几个方面,让我来分析一下~",
        }

        for key, answer in answers.items():
            if key in text:
                return answer

        return "这个问题我需要思考一下,不如我们换个方式聊聊?"

    @command(pattern=r"教我\s*(.+)", description="学习新知识")
    async def teach(self, ctx: BotContext, topic: str) -> str:
        """学习新知识(将问题-回答存入知识库)"""
        # 这个功能可以扩展为让用户教机器人新知识
        return (
            f"好的,你想学习「{topic}」~\n\n"
            "请继续告诉我:\n"
            "• 这个话题的主要内容是什么?\n"
            "• 有什么重要的知识点?"
        )

    @command(pattern=r"忘记\s*(.+)", description="删除记忆")
    async def forget(self, ctx: BotContext, topic: str) -> str:
        """删除机器人的记忆"""
        for key in list(self.knowledge_base.keys()):
            if topic in key:
                del self.knowledge_base[key]

        return f"好的,我已经忘记关于「{topic}」的内容了"

五、常见使用场景与案例

5.1 企业客服机器人

LangBot 非常适合用于构建企业客服机器人。通过集成知识库和常见问题解答系统,可以实现 7×24 小时自动客服:

# examples/customer_service.py
"""
企业客服机器人示例
功能:自动回复常见问题、转人工服务、工单提交
"""

from langbot import plugin, BotContext
from langbot.decorators import command
from typing import Optional


@plugin(name="customer_service", version="1.0.0")
class CustomerServicePlugin:
    """客服插件"""

    def __init__(self):
        # FAQ 知识库
        self.faqs = {
            "退货": "您好,退货政策如下:\n1. 7天内可申请退货\n2. 商品需保持原包装\n3. 请填写退货单后寄回",
            "快递": "您的订单将在1-3个工作日内发货,快递时效约3-5天",
            "支付": "我们支持微信、支付宝、银行卡支付",
            "会员": "会员享有积分返利、生日优惠、专属客服等权益",
        }

        # 转人工关键词
        self.human_keywords = ["人工", "客服", "投诉", "升级", "真人在吗"]

    async def handle(self, ctx: BotContext) -> Optional[str]:
        """处理客服消息"""
        text = ctx.message.text

        # 检查是否需要转人工
        if any(kw in text for kw in self.human_keywords):
            return self._transfer_to_human(ctx)

        # 匹配 FAQ
        for keyword, answer in self.faqs.items():
            if keyword in text:
                return self._format_faq_response(keyword, answer)

        return None

    def _format_faq_response(self, keyword: str, answer: str) -> str:
        """格式化 FAQ 回复"""
        return f"""
💬 您的问题:{keyword}

{answer}

---
还有其他问题吗?输入「转人工」联系客服
"""

    def _transfer_to_human(self, ctx: BotContext) -> str:
        """转人工服务"""
        # 生成转接请求
        ticket_id = f"TF{datetime.now().strftime('%Y%m%d%H%M%S')}"

        return f"""
👤 正在为您转接人工客服...

请稍候,客服人员将尽快为您服务。
您的排队编号:{ticket_id}

💡 同时您也可以:
• 描述具体问题,等待自动回复
• 拨打客服热线:400-xxx-xxxx
"""

5.2 个人助手应用

个人用户可以使用 LangBot 构建专属的智能助手:

# examples/personal_assistant.py
"""
个人助手示例
功能:日程管理、提醒、记账、健康追踪
"""

from langbot import plugin, BotContext
from langbot.decorators import command
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Expense:
    """支出记录"""
    amount: float
    category: str
    note: str
    timestamp: datetime


@plugin(name="personal_assistant", version="1.0.0")
class PersonalAssistantPlugin:
    """个人助手插件"""

    def __init__(self):
        self.expenses: list[Expense] = []
        self.budget = 5000  # 月预算
        self.water_goal = 2000  # 每日饮水目标(ml)
        self.water_today = 0

    @command(pattern=r"记账\s*(\d+)\s*(.+)", description="记录支出")
    async def record_expense(
        self, 
        ctx: BotContext, 
        amount: str, 
        category: str
    ) -> str:
        """记录一笔支出"""
        expense = Expense(
            amount=float(amount),
            category=category,
            note="",
            timestamp=datetime.now()
        )
        self.expenses.append(expense)

        return (
            f"✅ 已记录支出\n"
            f"💰 金额:¥{amount}\n"
            f"📂 分类:{category}\n"
            f"⏰ 时间:{expense.timestamp.strftime('%H:%M')}"
        )

    @command(pattern=r"本月消费", description="查看本月消费")
    async def monthly_expenses(self, ctx: BotContext) -> str:
        """查看本月消费统计"""
        now = datetime.now()
        month_expenses = [
            e for e in self.expenses 
            if e.timestamp.month == now.month
        ]

        if not month_expenses:
            return "本月暂无消费记录"

        total = sum(e.amount for e in month_expenses)
        remaining = self.budget - total

        # 按分类统计
        by_category = {}
        for e in month_expenses:
            by_category[e.category] = by_category.get(e.category, 0) + e.amount

        lines = [
            f"📊 {now.strftime('%Y年%m月')} 消费统计\n",
            "━━━━━━━━━━━━━━━━\n",
            f"💰 总消费:¥{total:.2f}\n",
            f"📈 预算:¥{self.budget:.2f}\n",
            f"📉 剩余:¥{remaining:.2f}\n",
            "\n分类明细:"
        ]

        for cat, amount in sorted(by_category.items(), key=lambda x: -x[1]):
            pct = amount / total * 100
            lines.append(f"• {cat}:¥{amount:.2f} ({pct:.1f}%)")

        return "\n".join(lines)

    @command(pattern=r"喝水\s*(\d+)", description="记录饮水量")
    async def record_water(
        self, 
        ctx: BotContext, 
        amount: str
    ) -> str:
        """记录饮水量"""
        ml = int(amount)
        self.water_today += ml

        progress = self.water_today / self.water_goal * 100

        return (
            f"💧 已记录饮水:{ml}ml\n"
            f"📊 今日累计:{self.water_today}ml / {self.water_goal}ml\n"
            f"{'🟢' if progress >= 100 else '🟡'} 目标完成度:{progress:.0f}%"
        )

5.3 社区论坛机器人

社区论坛可以使用 LangBot 实现自动管理:

# examples/community_bot.py
"""
社区论坛机器人
功能:新用户欢迎、内容审核、自动标签、定时推送
"""

from langbot import plugin, BotContext
from langbot.decorators import command, on_join, on_post
from typing import Optional


@plugin(name="community_bot", version="1.0.0")
class CommunityBotPlugin:
    """社区论坛机器人"""

    def __init__(self):
        # 欢迎消息池
        self.welcome_messages = [
            "欢迎 @{} 加入我们的社区!🎉",
            "新朋友 {} 来了,大家欢迎~ 👏",
            "热烈欢迎 {}!期待你的精彩发言 🌟",
        ]

        # 敏感词列表
        self.sensitive_words = ["spam", "广告", "xxx"]  # 实际应使用更完整的词库

        # 版块关键词映射
        self.section_keywords = {
            "技术": ["代码", "编程", "开发", "Python", "Java"],
            "产品": ["设计", "需求", "原型", "UX"],
            "运营": ["增长", "用户", "活动", "转化"],
        }

    @on_join
    async def welcome_new_member(self, ctx: BotContext) -> str:
        """新成员加入时自动发送欢迎"""
        import random
        username = ctx.user.get("name", "新用户")
        message = random.choice(self.welcome_messages).format(username)

        return message + "\n\n📌 社区规则:\n• 发帖前先阅读置顶帖\n• 友善交流,禁止广告\n• 技术问题请发到对应版块"

    @on_post
    async def auto_tag_section(self, ctx: BotContext) -> Optional[str]:
        """自动为帖子添加标签"""
        title = ctx.message.text[:50]  # 取前50字符

        matched_sections = []
        for section, keywords in self.section_keywords.items():
            if any(kw in title for kw in keywords):
                matched_sections.append(section)

        if matched_sections:
            return f"📌 推荐版块:{' / '.join(matched_sections)}"

        return None

    @on_post
    async def content_moderation(self, ctx: BotContext) -> Optional[str]:
        """内容审核"""
        text = ctx.message.text

        for word in self.sensitive_words:
            if word in text.lower():
                return (
                    "⚠️ 您的发帖包含可能不当的内容\n"
                    "请修改后重新发布\n"
                    "如有疑问请联系管理员"
                )

        return None

    @command(pattern=r"社区规则", description="查看社区规则")
    async def show_rules(self, ctx: BotContext) -> str:
        """显示社区规则"""
        return """
📜 社区规则

1️⃣ 发帖规范
• 标题清晰,描述完整
• 选择合适的版块
• 技术问题请附上代码和错误信息

2️⃣ 交流礼仪
• 友善待人,尊重他人
• 欢迎点赞、关注、收藏
• 有帮助的回复请标记为「最佳答案」

3️⃣ 禁止内容
• 广告、推广信息
• 违法违规内容
• 人身攻击、恶意刷屏

违反规则将被警告或禁言处理
"""

六、进阶技巧与最佳实践

6.1 性能优化

在生产环境中,合理的性能优化至关重要:

# optimization/performance.py
"""
性能优化示例
包含:缓存策略、并发处理、资源池化
"""

import asyncio
from functools import lru_cache
from typing import Optional
from datetime import datetime, timedelta


class OptimizedWeatherPlugin:
    """优化后的天气插件"""

    def __init__(self):
        # 使用 LRU 缓存
        self._weather_cache = {}
        self._cache_ttl = timedelta(minutes=30)

        # 批量请求限制
        self._semaphore = asyncio.Semaphore(5)  # 最多5个并发请求

        # 连接池
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """获取或创建 HTTP 会话"""
        if self._session is None or self._session.closed:
            connector = aiohttp.TCPConnector(
                limit=100,  # 连接池大小
                ttl_dns_cache=300  # DNS 缓存时间
            )
            self._session = aiohttp.ClientSession(connector=connector)
        return self._session

    async def fetch_weather_optimized(self, city: str) -> Optional[dict]:
        """优化后的天气获取"""
        cache_key = f"weather_{city}"

        # 检查缓存
        if cache_key in self._weather_cache:
            cached = self._weather_cache[cache_key]
            if datetime.now() - cached["time"] < self._cache_ttl:
                return cached["data"]

        # 使用信号量限制并发
        async with self._semaphore:
            try:
                data = await self._do_fetch_weather(city)

                # 更新缓存
                self._weather_cache[cache_key] = {
                    "data": data,
                    "time": datetime.now()
                }

                return data
            except Exception as e:
                # 错误处理
                return None

    async def _do_fetch_weather(self, city: str) -> dict:
        """实际执行 HTTP 请求"""
        session = await self._get_session()
        # ... 请求逻辑
        pass

    async def close(self):
        """关闭资源"""
        if self._session and not self._session.closed:
            await self._session.close()

6.2 安全加固

生产环境部署时必须考虑安全问题:

# security/bot_security.py
"""
安全加固示例
包含:输入验证、速率限制、权限控制
"""

import hashlib
import time
from collections import defaultdict
from typing import Optional


class SecurityMiddleware:
    """安全中间件"""

    def __init__(self):
        # 速率限制:用户ID -> (请求时间戳列表)
        self.rate_limits: dict[str, list] = defaultdict(list)

        # 配置
        self.max_requests_per_minute = 20
        self.max_message_length = 2000
        self.blocked_users: set = set()

        # 危险命令白名单
        self.admin_commands = {
            "/ban", "/unban", "/kick", 
            "/shutdown", "/restart", "/config"
        }

    async def process_message(self, ctx: BotContext) -> Optional[str]:
        """处理消息前的安全检查"""
        user_id = ctx.user.id

        # 检查用户是否被封禁
        if user_id in self.blocked_users:
            return "您已被禁止使用此机器人"

        # 速率限制检查
        if not self._check_rate_limit(user_id):
            return "请求过于频繁,请稍后再试"

        # 消息长度检查
        if len(ctx.message.text) > self.max_message_length:
            return f"消息过长,请控制在{self.max_message_length}字以内"

        # 危险命令权限检查
        if ctx.message.text in self.admin_commands:
            if not self._is_admin(user_id):
                return "此命令需要管理员权限"

        # 输入净化
        cleaned_text = self._sanitize_input(ctx.message.text)
        ctx.message.text = cleaned_text

        return None  # 通过检查

    def _check_rate_limit(self, user_id: str) -> bool:
        """检查速率限制"""
        now = time.time()
        window = 60  # 1分钟窗口

        # 清理过期记录
        self.rate_limits[user_id] = [
            t for t in self.rate_limits[user_id]
            if now - t < window
        ]

        # 检查是否超限
        if len(self.rate_limits[user_id]) >= self.max_requests_per_minute:
            return False

        # 记录本次请求
        self.rate_limits[user_id].append(now)
        return True

    def _sanitize_input(self, text: str) -> str:
        """输入净化"""
        import re

        # 移除控制字符
        text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)

        # 移除潜在的恶意链接
        text = re.sub(
            r'https?://[^\s]+\.exe[^\s]*', 
            '[链接已屏蔽]', 
            text,
            flags=re.IGNORECASE
        )

        # 移除 HTML 标签
        text = re.sub(r'<[^>]+>', '', text)

        return text.strip()

    def _is_admin(self, user_id: str) -> bool:
        """检查是否为管理员"""
        # 从配置读取管理员列表
        admin_ids = {"admin_user_id_1", "admin_user_id_2"}
        return user_id in admin_ids

    def block_user(self, user_id: str):
        """封禁用户"""
        self.blocked_users.add(user_id)

    def unblock_user(self, user_id: str):
        """解封用户"""
        self.blocked_users.discard(user_id)

6.3 测试策略

完善的测试是保证项目质量的关键:

# tests/test_plugins.py
"""
测试用例示例
包含:单元测试、集成测试、Mock 测试
"""

import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime


class TestWeatherPlugin:
    """天气插件测试类"""

    @pytest.fixture
    def plugin(self):
        """创建插件实例"""
        from plugins.weather import WeatherPlugin
        return WeatherPlugin()

    @pytest.fixture
    def mock_context(self):
        """创建模拟上下文"""
        ctx = MagicMock()
        ctx.message.text = "北京天气"
        ctx.user.id = "test_user"
        ctx.user.get = MagicMock(return_value="北京")
        return ctx

    @pytest.mark.asyncio
    async def test_extract_city(self, plugin):
        """测试城市名提取"""
        assert plugin._extract_city("北京天气") == "北京"
        assert plugin._extract_city("查询上海天气") == "上海"
        assert plugin._extract_city("广州的天气怎么样") == "广州"
        assert plugin._extract_city("今天天气不错") is None

    @pytest.mark.asyncio
    async def test_query_weather_success(self, plugin, mock_context):
        """测试天气查询成功"""
        # Mock API 返回
        with patch.object(
            plugin, 
            '_fetch_weather', 
            return_value={
                "temp": 22,
                "condition": "晴",
                "humidity": 50,
                "wind_speed": 3,
                "wind_dir": "东南风",
                "feels_like": 24,
                "visibility": 10,
                "uv_index": 5
            }
        ):
            result = await plugin.query_weather(mock_context, "北京")

            assert "北京" in result
            assert "晴" in result
            assert "22" in result

    @pytest.mark.asyncio
    async def test_query_weather_no_city(self, plugin, mock_context):
        """测试未指定城市"""
        result = await plugin.query_weather(mock_context, None)
        assert "请告诉我" in result

    @pytest.mark.asyncio
    async def test_format_weather_message(self, plugin):
        """测试消息格式化"""
        data = {
            "temp": 25,
            "condition": "多云",
            "humidity": 60,
            "wind_speed": 4,
            "wind_dir": "东风",
            "feels_like": 27,
            "visibility": 8,
            "uv_index": 6
        }

        result = plugin._format_weather_message("上海", data)

        assert "上海" in result
        assert "25" in result
        assert "多云" in result
        assert "60" in result


class TestSchedulePlugin:
    """日程插件测试类"""

    @pytest.fixture
    def plugin(self):
        from plugins.schedule import SchedulePlugin
        return SchedulePlugin()

    @pytest.fixture
    def mock_context(self):
        ctx = MagicMock()
        ctx.user.id = "test_user"
        return ctx

    def test_add_schedule(self, plugin, mock_context):
        """测试添加日程"""
        # 同步方法测试
        result = plugin.add_schedule(
            mock_context, 
            "下午3点开会讨论项目"
        )

        assert "已添加" in result
        assert "开会" in result
        assert mock_context.user.id in plugin.schedules
        assert len(plugin.schedules[mock_context.user.id]) == 1

    def test_list_empty_schedules(self, plugin, mock_context):
        """测试空日程列表"""
        result = plugin.list_schedules(mock_context, "")

        assert "没有" in result or "添加任何" in result

    def test_delete_schedule(self, plugin, mock_context):
        """测试删除日程"""
        # 先添加一个日程
        plugin.add_schedule(mock_context, "测试日程")

        # 删除它
        mock_context.message.text = "删除日程 1"
        result = plugin._handle_delete(mock_context)

        assert "删除" in result
        assert len(plugin.schedules[mock_context.user.id]) == 0


# 运行测试
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

6.4 部署与运维

生产环境的部署建议:

# deploy/docker-compose.yml
"""
Docker 部署配置示例
"""

version: '3.8'

services:
  langbot:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: langbot-assistant
    restart: unless-stopped
    environment:
      - LANGBOT_LLM_API_KEY=${LANGBOT_LLM_API_KEY}
      - LANGBOT_TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
      - LANGBOT_LOG_LEVEL=INFO
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    depends_on:
      - redis
    networks:
      - langbot-network

  redis:
    image: redis:7-alpine
    container_name: langbot-redis
    restart: unless-stopped
    volumes:
      - redis-data:/data
    networks:
      - langbot-network

  nginx:
    image: nginx:alpine
    container_name: langbot-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - langbot
    networks:
      - langbot-network

networks:
  langbot-network:
    driver: bridge

volumes:
  redis-data:

七、总结与资源链接

7.1 项目总结

通过本文的学习,你应该已经掌握了以下技能:

学会了如何快速搭建 LangBot 开发环境,包括 Python 环境配置、依赖安装和基础配置。掌握了 LangBot 的核心架构,包括对话引擎、插件系统、平台适配器的设计与实现。完成了多个实战项目,从天气查询、日程管理到智能问答、客服机器人,涵盖了常见的使用场景。了解了性能优化、安全加固、测试策略等生产环境必备知识。

LangBot 的设计理念是让开发者专注于业务逻辑,而非底层实现。通过模块化的插件系统和清晰的 API 接口,即使是没有 AI 背景的开发者也能快速构建出功能强大的对话系统。

7.2 学习资源

以下是一些可以帮助你深入学习 LangBot 的资源:

LangBot 官方文档提供了完整的 API 参考和教程,建议仔细阅读各个模块的说明。LangChain 官方文档可以帮助你了解如何集成更强大的 LLM 能力。Python 异步编程指南对于理解 LangBot 的并发模型很有帮助。各类即时通讯平台的开发者文档则有助于深入理解平台适配器的实现。

7.3 相关项目推荐

如果你对 AI 对话系统感兴趣,以下开源项目也值得关注:

ChatGPT-Next-Web 是一个优雅的 ChatGPT 网页应用,适合学习前端与 AI 的结合。LangChain 是构建 LLM 应用的流行框架,与 LangBot 可以很好地配合使用。LlamaIndex 专注于知识检索增强生成,适合构建知识库问答系统。Dify 是一个开源的 LLM 应用开发平台,提供了可视化的应用编排能力。

7.4 参与贡献

LangBot 是一个开源项目,欢迎各位开发者参与贡献。你可以通过以下方式参与:提交 Bug 报告和功能建议,帮助改进项目。编写文档和教程,帮助更多人上手使用。贡献代码,修复问题或添加新功能。分享你的使用经验和最佳实践。


希望这篇教程能帮助你快速上手 LangBot,构建出属于自己的智能对话系统。如果你有任何问题或建议,欢迎在评论区留言交流!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注