别再纠结团队沟通工具了,Rocket.Chat 开源解决方案让企业协作效率翻倍

别再纠结团队沟通工具了,Rocket.Chat 开源解决方案让企业协作效率翻倍

别再纠结团队沟通工具了,Rocket.Chat 开源解决方案让企业协作效率翻倍

从痛点到解法,一文搞懂如何搭建私有化团队协作平台


为什么值得关注:开源通讯领域的新标杆

在当今数字化办公时代,团队沟通工具已经成为企业运转不可或缺的基础设施。从 Slack 到 Microsoft Teams,从 Discord 到飞书、钉钉,闭源解决方案固然功能强大,但高昂的订阅费用、数据安全隐患以及定制化限制等问题也让众多企业和团队望而却步。就在这样的背景下,Rocket.Chat 作为一款完全开源的团队协作平台,正在以其强大的功能、灵活的部署方式和活跃的社区支持,成为越来越多组织打造私有化通讯解决方案的首选。

Rocket.Chat 不仅仅是一个简单的聊天工具,它是一个功能完备的企业级通讯平台,支持即时消息、群组对话、音视频通话、屏幕共享、文件分享、主题定制、机器人集成等丰富特性。更重要的是,作为开源项目,Rocket.Chat 允许用户完全掌控自己的数据,部署在私有服务器上,这对于金融、医疗、政府等对数据安全有严格要求的行业来说尤为重要。根据 GitHub 官方数据,Rocket.Chat 仓库已获得超过 36,000 颗星标和 4,800 多个分支,被全球超过 5,000 万用户使用,这充分证明了其在开源社区的影响力和受欢迎程度。

选择 Rocket.Chat 的核心理由可以归纳为以下几点:首先,它是真正的开源软件,代码透明可审计,用户无需担心后门或数据泄露风险;其次,支持 Docker 一键部署,无需繁琐的配置即可快速搭建生产环境;第三,提供丰富的 API 接口和 Webhook 机制,便于与企业现有系统集成;第四,支持多种部署方式,包括本地服务器、云平台甚至树莓派等边缘设备;最后,拥有活跃的社区和详尽的文档资源,学习曲线相对平缓。对于希望摆脱商业软件束缚、追求数据主权和技术自由的团队而言,Rocket.Chat 无疑是一个值得深入了解的解决方案。


环境搭建:从零开始构建 Rocket.Chat 运行环境

系统要求与前置准备

在开始安装 Rocket.Chat 之前,我们需要确保系统满足最低运行要求。Rocket.Chat 对硬件资源的需求取决于预期的用户规模和使用场景。对于小型团队(不超过 50 并发用户),建议至少配置 2 核 CPU、4GB 内存和 40GB 可用存储空间;中型团队(50-200 用户)建议 4 核 CPU、8GB 内存和 80GB 存储;大规模部署则需要 8 核以上 CPU、16GB 以上内存和 100GB 以上的 SSD 存储。操作系统方面,官方推荐使用 Ubuntu 20.04 LTS 或 22.04 LTS,其他支持的 Linux 发行版包括 Debian、CentOS 和 Fedora。

在软件依赖方面,Rocket.Chat 基于 Node.js 构建,需要 MongoDB 作为数据存储,对于需要支持文件上传和媒体处理的场景,还需要 ImageMagick 和GraphicsMagick 等图像处理工具。Docker 和 Docker Compose 是推荐的部署方式,它们能够简化依赖管理和环境隔离,让部署过程更加可靠和可重复。如果你的目标是在本地机器上进行开发测试,那么只需要确保 Docker Desktop(Windows/Mac)或 Docker Engine(Linux)正常运行即可。

使用 Docker Compose 快速部署

Docker Compose 是部署 Rocket.Chat 最简便的方式,它通过一个声明式的配置文件定义所有需要的服务,一次命令即可启动完整的运行环境。下面是一个标准的 docker-compose.yml 配置文件示例,它包含 Rocket.Chat 主应用、MongoDB 数据库以及可选的 MongoDB 备份服务。

# Rocket.Chat Docker Compose 配置文件
version: '3.8'

services:
  # Rocket.Chat 应用主服务
  rocketchat:
    image: rocketchat/rocket.chat:latest
    container_name: rocketchat
    restart: unless-stopped
    ports:
      - "3000:3000"  # 应用访问端口
    environment:
      - ROOT_URL=http://localhost:3000  # 应用根 URL
      - MONGO_URL=mongodb://mongo:27017/rocketchat  # MongoDB 连接字符串
      - MONGO_OPLOG_URL=mongodb://mongo:27017/local  # MongoDB 操作日志
      - PORT=3000  # 应用监听端口
      - REG_TOKEN=your-optional-registry-token  # 可选的镜像仓库令牌
    depends_on:
      - mongo
    networks:
      - rocket-network

  # MongoDB 数据库服务
  mongo:
    image: mongo:5.0
    container_name: rocketchat-mongo
    restart: unless-stopped
    volumes:
      - mongo-data:/data/db  # 持久化存储
      - mongo-init:/docker-entrypoint-initdb.d  # 初始化脚本目录
    networks:
      - rocket-network
    command: mongod --replSet rs0 --oplogSize 128  # 启用副本集模式

  # MongoDB 副本集初始化服务(仅首次启动时运行)
  mongo-init-replica:
    image: mongo:5.0
    container_name: rocketchat-mongo-init
    depends_on:
      - mongo
    networks:
      - rocket-network
    command: >
      mongosh --host mongo:27017 --eval
      "
      rs.initiate({
        _id: 'rs0',
        members: [{ _id: 0, host: 'mongo:27017' }]
      });
      "
    restart: on-failure

# 数据卷定义
volumes:
  mongo-data:
    driver: local

# 网络定义
networks:
  rocket-network:
    driver: bridge

将上述配置保存为 docker-compose.yml 文件后,在终端中进入该文件所在目录,执行以下命令即可启动 Rocket.Chat:

# 拉取最新镜像并启动所有服务
docker-compose up -d

# 查看服务启动状态
docker-compose ps

# 查看实时日志输出
docker-compose logs -f rocketchat

服务启动后,等待大约 30 秒让 Rocket.Chat 完成初始化,然后打开浏览器访问 http://localhost:3000。如果一切正常,你应该能看到 Rocket.Chat 的欢迎界面和设置向导。整个首次启动过程通常不超过五分钟,即使是没有 Docker 经验的初学者也能轻松完成。

传统方式安装:从源码构建

对于希望深入了解 Rocket.Chat 架构、进行二次开发或需要更精细控制的开发者,从源码构建是更好的选择。以下是在 Ubuntu 系统上从源码安装的完整步骤。首先需要安装 Node.js 环境,Rocket.Chat 5.x 版本需要 Node.js 18.x LTS 版本:

# 安装 Node.js 18.x LTS
curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash -
sudo apt-get install -y nodejs

# 验证 Node.js 和 npm 版本
node --version  # 应显示 v18.x.x
npm --version   # 应显示 9.x.x

# 安装 MongoDB 5.0
sudo apt-get install gnupg curl
curl -fsSL https://www.mongodb.org/static/pgp/server-5.0.asc | sudo gpg --dearmor -o /usr/share/keyrings/mongodb-server-5.0.gpg
echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-5.0.gpg ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list
sudo apt-get update
sudo apt-get install -y mongodb-org

# 启动 MongoDB 服务
sudo systemctl start mongod
sudo systemctl enable mongod

# 创建 Rocket.Chat 数据目录
sudo mkdir -p /opt/rocketchat
cd /opt/rocketchat

# 克隆最新稳定版源码
sudo git clone https://github.com/RocketChat/Rocket.Chat.git .
sudo git checkout stable

# 安装依赖包
npm install

# 配置环境变量
export ROOT_URL="http://localhost:3000"
export MONGO_URL="mongodb://localhost:27017/rocketchat"
export MONGO_OPLOG_URL="mongodb://localhost:27017/local"
export PORT=3000

# 构建生产版本
npm run build

# 使用 PM2 启动服务
sudo npm install -g pm2
pm2 start main.js --name rocketchat

# 配置 PM2 开机自启
pm2 startup
pm2 save

从源码安装的优势在于可以获得完整的开发工具链支持,包括热重载、调试功能和源码级别的错误追踪。同时,开发者可以直接修改源码实现自定义功能,这对于需要深度定制的企业用户非常有价值。

生产环境部署建议

将 Rocket.Chat 部署到生产环境需要考虑更多的安全和性能因素。以下是一些关键的配置建议和最佳实践:

反向代理配置 —— 在生产环境中,强烈建议使用 Nginx 或 Caddy 作为反向代理服务器,这不仅能提供 SSL/TLS 终止、负载均衡和缓存等高级功能,还能隐藏后端服务的真实端口,增强安全性。下面是一个标准的 Nginx 配置示例:

# /etc/nginx/sites-available/rocketchat
upstream rocketchat_backend {
    server 127.0.0.1:3000;
    keepalive 64;
}

server {
    listen 80;
    server_name your-domain.com;
    return 301 https://$server_name$request_uri;  # 强制重定向到 HTTPS
}

server {
    listen 443 ssl http2;
    server_name your-domain.com;

    # SSL 证书配置
    ssl_certificate /etc/letsencrypt/live/your-domain.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/your-domain.com/privkey.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;
    ssl_prefer_server_ciphers on;

    # 安全头配置
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header Referrer-Policy "no-referrer-when-downgrade" always;

    location / {
        proxy_pass http://rocketchat_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Nginx-Proxy true;

        # 超时配置
        proxy_connect_timeout 90s;
        proxy_send_timeout 90s;
        proxy_read_timeout 90s;

        # 文件上传大小限制
        client_max_body_size 100M;
    }

    # WebSocket 支持
    location /websocket {
        proxy_pass http://rocketchat_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

数据库优化 —— 对于高并发场景,MongoDB 的配置优化至关重要。建议启用副本集模式以实现数据冗余和故障转移,定期执行索引维护并监控慢查询。可以在 MongoDB 配置文件中添加以下优化参数:

// /etc/mongod.conf 相关配置片段
storage:
  dbPath: /var/lib/mongodb
  journal:
    enabled: true
  engine: wiredTiger
  wiredTiger:
    engineConfig:
      cacheSizeGB: 2  # 根据可用内存调整
      journalCompressor: snappy
    collectionConfig:
      blockCompressor: snappy
    indexConfig:
      prefixCompression: true

# 网络配置
net:
  port: 27017
  bindIp: 127.0.0.1
  maxIncomingConnections: 65536

# 操作分析
operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100

核心功能详解:深入探索 Rocket.Chat 的强大能力

即时通讯与消息管理

Rocket.Chat 的消息系统是其最核心的功能模块,支持一对一私聊、群组讨论、公开频道等多种对话形式。消息类型方面,平台支持纯文本、富文本(支持 Markdown 语法)、代码片段(语法高亮)、文件附件、音频消息、视频消息、位置共享以及结构化消息模板。每条消息都具备完善的元数据记录,包括发送时间、已读状态、编辑历史和反应表情。

消息线程功能允许用户在主消息下创建回复分支,特别适合在繁忙频道中组织讨论。线程视图将相关回复折叠收起,避免刷屏的同时保持对话的连贯性。用户可以通过点击消息的「回复」按钮或使用「!」快捷命令启动新线程。消息提及功能支持 @用户名 和 @频道名 语法,被提及的用户会收到即时通知,这对于协调跨团队工作非常有用。

消息格式化示例 —— Rocket.Chat 支持完整的 Markdown 语法,以下是常用格式的演示:

# 标题文本(一到六个 # 分别对应不同级别)

**粗体文本** 或 __粗体文本__

*斜体文本*_斜体文本_

~~删除线文本~~

`行内代码`

代码块(支持语法高亮)
def hello_world():
print(“Hello, Rocket.Chat!”)

> 引用文本块

- 无序列表项
1. 有序列表项

[链接文本](https://example.com)

:smile:  表情符号支持标准 Emoji

@username  提及特定用户
#channel  提及频道

团队协作与组织管理

Rocket.Chat 提供了完善的团队组织结构管理功能。在管理员面板中,管理员可以创建、编辑和删除部门,分配用户到不同部门,设置部门负责人和成员关系。这种层级化的组织架构设计非常适合大型企业,能够实现精细的权限控制和资源隔离。

频道管理是团队协作的核心。Rocket.Chat 支持多种频道类型:公共频道对所有成员可见,任何人都可以加入;私有群组需要邀请才能加入,适合敏感讨论;讨论组是围绕特定话题创建的临时协作空间;直接消息则用于一对一的私密交流。管理员可以设置频道的存档、只读或禁言状态,在需要冻结某项讨论时非常有用。

权限管理机制 —— Rocket.Chat 采用基于角色的权限控制系统,每个角色包含一组预定义的权限,管理员可以将角色分配给用户。以下是权限管理相关的 API 调用示例:

"""
Rocket.Chat 权限管理示例
使用 Rocket.Chat REST API 进行权限和角色管理
"""
import requests
import json

class RocketChatPermissionManager:
    def __init__(self, base_url, api_key):
        """
        初始化权限管理器
        base_url: Rocket.Chat 服务器地址
        api_key: 管理员 API 密钥
        """
        self.base_url = base_url.rstrip('/')
        self.headers = {
            'X-Auth-Token': api_key,
            'Content-Type': 'application/json'
        }

    def list_roles(self):
        """获取所有角色列表"""
        response = requests.get(
            f"{self.base_url}/api/v1/roles.list",
            headers=self.headers
        )
        return response.json()

    def create_role(self, name, description="", scope="Users"):
        """
        创建新角色
        name: 角色名称
        description: 角色描述
        scope: 角色作用域(Users/Rooms)
        """
        payload = {
            "name": name,
            "description": description,
            "scope": scope
        }
        response = requests.post(
            f"{self.base_url}/api/v1/roles.create",
            headers=self.headers,
            json=payload
        )
        return response.json()

    def add_user_to_role(self, role_name, username):
        """将用户添加到指定角色"""
        payload = {
            "roleName": role_name,
            "username": username
        }
        response = requests.post(
            f"{self.base_url}/api/v1/roles.addUserToRole",
            headers=self.headers,
            json=payload
        )
        return response.json()

    def get_channel_roles(self, room_id):
        """获取频道中所有用户的角色信息"""
        response = requests.get(
            f"{self.base_url}/api/v1/channels.roles",
            headers=self.headers,
            params={"roomId": room_id}
        )
        return response.json()

    def set_user_role_in_room(self, room_id, username, role):
        """设置用户在特定房间的角色"""
        payload = {
            "roomId": room_id,
            "username": username,
            "role": role
        }
        response = requests.post(
            f"{self.base_url}/api/v1/roles.setUserRole",
            headers=self.headers,
            json=payload
        )
        return response.json()


# 使用示例
if __name__ == "__main__":
    # 初始化管理器
    manager = RocketChatPermissionManager(
        base_url="http://localhost:3000",
        api_key="your-admin-auth-token"
    )

    # 创建新角色
    result = manager.create_role(
        name="project-lead",
        description="项目负责人角色,拥有管理项目的特殊权限"
    )
    print("创建角色结果:", json.dumps(result, indent=2, ensure_ascii=False))

    # 将用户添加到角色
    add_result = manager.add_user_to_role("project-lead", "zhang_san")
    print("添加用户到角色结果:", json.dumps(add_result, indent=2, ensure_ascii=False))

    # 查看所有角色
    roles = manager.list_roles()
    print("当前角色列表:", json.dumps(roles, indent=2, ensure_ascii=False))

音视频通话与屏幕共享

Rocket.Chat 内置了 WebRTC 技术的实时音视频通讯能力,用户可以直接在聊天界面中发起一对一或群组视频通话。无需安装额外的插件或软件,只要浏览器支持 WebRTC(现代主流浏览器均支持),就可以实现流畅的音视频体验。对于私有部署场景,可以通过配置 TURN/STUN 服务器来解决 NAT 穿越问题,确保在复杂网络环境下的通话质量。

屏幕共享功能支持用户将自己的屏幕、窗口或特定应用分享给通话中的其他成员。这一功能对于远程会议、技术演示和在线培训场景非常实用。在共享时,用户可以选择共享整个屏幕、特定窗口或浏览器标签页,接收方可以看到共享内容并支持同步录制。

配置 TURN 服务器以优化通话质量 —— 在大规模部署或企业内网环境中,WebRTC 通话可能受到网络限制。配置 TURN 服务器可以显著改善这种情况:

// Rocket.Chat 管理面板 ->  Administration -> Rooms -> Video Conference
// 或通过环境变量配置

// 环境变量配置示例
export RC_SERVICES_TURN_UDP="true"
export RC_SERVICES_TURN_TLS="true"
export RC_SERVICES_TURN_MIN_PORT=50000
export RC_SERVICES_TURN_MAX_PORT=60000

// 手动配置 TURN 服务器信息
// 在 Rocket.Chat 设置中找到 "Video Conference" 相关配置项:
// - TURN Server: your-turn-server.example.com
// - TURN Server Username: turn_user
// - TURN Server Credential: secure_password
// - TURN Server Port: 3478
// - TURN Server TLS Port: 5349

集成与扩展能力

Rocket.Chat 的开放性是其最重要的特性之一。通过 REST API 和 WebSocket 实时事件接口,开发者可以将 Rocket.Chat 与几乎任何其他系统集成。官方提供了多种官方集成方式,包括 Slack 桥接(实现与 Slack 频道的双向同步)、Zapier 自动化集成、GitLab/GitHub 集成(代码提交、MR/PR 事件通知)、Jira/Linear 集成以及数百个通过 API 实现的第三方工具。

Outgoing Webhook 配置示例 —— 通过配置 Webhook,可以将 Rocket.Chat 中的消息事件发送到外部系统进行处理:

"""
Rocket.Chat Outgoing Webhook 处理服务示例
当 Rocket.Chat 中发生特定事件时,此服务接收并处理 webhook 通知
"""
from flask import Flask, request, jsonify
import hashlib
import hmac
import time

app = Flask(__name__)

# 验证 Rocket.Chat webhook 请求的签名
def verify_signature(request_data, signature, timestamp, secret):
    """
    验证请求是否来自 Rocket.Chat
    signature 格式: sha256=xxxx
    """
    signature_key = hmac.new(
        secret.encode(),
        f"{timestamp}{request_data}".encode(),
        hashlib.sha256
    ).hexdigest()

    expected_signature = f"sha256={signature_key}"
    return hmac.compare_digest(expected_signature, signature)


@app.route('/webhook/rocketchat', methods=['POST'])
def handle_rocketchat_event():
    """
    处理 Rocket.Chat 发送的 webhook 事件

    可处理的事件类型:
    - sendMessage: 发送消息时触发
    - editMessage: 编辑消息时触发
    - deleteMessage: 删除消息时触发
    - reactMessage: 添加反应时触发
    - joinChannel: 加入频道时触发
    - leaveChannel: 离开频道时触发
    """
    # 获取签名验证所需的 header
    signature = request.headers.get('X-RocketChat-Signature', '')
    timestamp = request.headers.get('X-RocketChat-User-Id', '')

    # 验证签名(在生产环境中务必启用!)
    secret = "your-webhook-secret"
    request_data = request.get_data(as_text=True)

    # === 实际生产环境应启用以下验证 ===
    # if not verify_signature(request_data, signature, timestamp, secret):
    #     return jsonify({"error": "Invalid signature"}), 401
    # ===================================

    payload = request.get_json()

    # 解析事件类型
    event_type = payload.get('event')

    if event_type == 'sendMessage':
        # 处理新消息事件
        message = payload.get('message', {})
        sender = message.get('u', {}).get('username', 'unknown')
        text = message.get('msg', '')
        room_id = payload.get('room', {}).get('_id', '')

        print(f"[收到消息] 用户: {sender}, 频道: {room_id}")
        print(f"[消息内容] {text}")

        # === 这里添加你的业务逻辑 ===
        # 例如:保存消息到数据库、分析情感、处理命令等

        # 如果需要回复消息
        # response = send_reply_to_rocketchat(room_id, "消息已收到!")

    elif event_type == 'reactMessage':
        # 处理反应事件
        message_id = payload.get('message', {}).get('_id')
        emoji = payload.get('message', {}).get('reactions', {})
        user = payload.get('user', {}).get('username')
        print(f"[收到反应] 用户: {user}, 消息: {message_id}, 反应: {emoji}")

    return jsonify({"status": "ok"}), 200


@app.route('/webhook/health', methods=['GET'])
def health_check():
    """健康检查端点"""
    return jsonify({
        "status": "healthy",
        "timestamp": int(time.time())
    }), 200


if __name__ == "__main__":
    # 启动 webhook 服务
    # 建议使用 gunicorn 等生产级 WSGI 服务器
    app.run(host='0.0.0.0', port=5000, debug=False)

Incoming Webhook 实现 —— 通过 Incoming Webhook,外部系统可以向 Rocket.Chat 频道推送消息:

"""
Rocket.Chat Incoming Webhook 消息发送示例
用于从外部系统向 Rocket.Chat 频道推送消息和通知
"""
import requests
import json
from datetime import datetime


class RocketChatWebhookNotifier:
    """Rocket.Chat Incoming Webhook 通知器"""

    def __init__(self, webhook_url):
        """
        初始化通知器
        webhook_url: Rocket.Chat Incoming Webhook 的完整 URL
        """
        self.webhook_url = webhook_url

    def send_text_message(self, text, channel=None):
        """发送纯文本消息"""
        payload = {
            "text": text,
            "channel": channel  # 可指定频道,如 #general
        }
        response = requests.post(
            self.webhook_url,
            json=payload
        )
        return response.status_code == 200

    def send_rich_message(self, text, attachments=None, 
                          emoji=None, username="Bot"):
        """
        发送富文本消息

        attachments 支持的格式:
        {
            "color": "#ff0000",  # 颜色条
            "text": "附加说明文本",
            "author_name": "作者名",
            "author_link": "作者主页",
            "author_icon": "作者头像 URL",
            "title": "附件标题",
            "title_link": "标题链接",
            "image_url": "图片 URL",
            "thumb_url": "缩略图 URL",
            "footer": "页脚文本",
            "ts": "时间戳"
        }
        """
        payload = {
            "text": text,
            "username": username,
            "attachments": attachments or []
        }

        if emoji:
            payload["icon_emoji"] = emoji

        response = requests.post(
            self.webhook_url,
            json=payload
        )
        return response.json()

    def send_code_deployment_notification(self, project, version, 
                                           status, deployer):
        """
        发送代码部署通知(实际场景示例)
        """
        status_emoji = {
            "success": ":white_check_mark:",
            "failed": ":x:",
            "rollback": ":leftwards_arrow_with_hook:"
        }.get(status, ":question:")

        color = {
            "success": "#36a64f",
            "failed": "#ff0000",
            "rollback": "#ffa500"
        }.get(status, "#808080")

        attachments = [{
            "color": color,
            "title": f"部署通知 - {project}",
            "fields": [
                {
                    "title": "版本",
                    "value": version,
                    "short": True
                },
                {
                    "title": "状态",
                    "value": f"{status_emoji} {status.upper()}",
                    "short": True
                },
                {
                    "title": "部署人",
                    "value": deployer,
                    "short": True
                },
                {
                    "title": "时间",
                    "value": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "short": True
                }
            ],
            "footer": "自动化部署系统",
            "ts": int(datetime.now().timestamp())
        }]

        return self.send_rich_message(
            text=f"*{project}* 部署完成",
            attachments=attachments,
            username="Deploy Bot",
            emoji=":rocket:"
        )

    def send_alert(self, severity, title, description, source):
        """
        发送告警通知(实际场景示例)
        """
        severity_config = {
            "critical": {"emoji": ":rotating_light:", "color": "#ff0000"},
            "warning": {"emoji": ":warning:", "color": "#ffa500"},
            "info": {"emoji": ":information_source:", "color": "#808080"}
        }

        config = severity_config.get(severity, severity_config["info"])

        attachments = [{
            "color": config["color"],
            "title": f"{config['emoji']} {title}",
            "text": description,
            "fields": [
                {
                    "title": "告警级别",
                    "value": severity.upper(),
                    "short": True
                },
                {
                    "title": "来源",
                    "value": source,
                    "short": True
                },
                {
                    "title": "时间",
                    "value": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "short": True
                }
            ]
        }]

        return self.send_rich_message(
            text=title,
            attachments=attachments,
            username="Alert System",
            emoji=config["emoji"]
        )


# 使用示例
if __name__ == "__main__":
    notifier = RocketChatWebhookNotifier(
        webhook_url="https://your-rocketchat.com/hooks/xxxxx/xxxxx"
    )

    # 发送普通文本消息
    notifier.send_text_message(
        text="这是一条来自外部系统的测试消息"
    )

    # 发送部署通知
    notifier.send_code_deployment_notification(
        project="my-awesome-app",
        version="v2.1.0",
        status="success",
        deployer="zhang_san"
    )

    # 发送告警
    notifier.send_alert(
        severity="critical",
        title="服务器 CPU 使用率过高",
        description="生产服务器 cpu_usage 指标超过 90%,请及时处理!",
        source="Prometheus Monitoring"
    )

自定义表情与资源管理

Rocket.Chat 支持自定义表情包(Emoji),管理员可以上传自定义表情到服务器供所有用户使用。表情包支持 PNG 和 GIF 格式,单个文件大小限制可通过管理面板配置。自定义表情按照类别组织,用户可以在表情选择器中快速找到并使用。

文件管理方面,Rocket.Chat 提供了统一的文件存储系统,支持多种存储后端适配器。默认使用本地文件系统存储,对于需要高可用和分布式存储的生产环境,可以配置 Amazon S3、Google Cloud Storage、Microsoft Azure Blob Storage 或 WebDAV 等云存储服务。每个上传的文件都会生成唯一的访问链接,并支持设置过期时间和访问权限。


逐步实战教程:通过 REST API 构建自动化工作流

API 认证与基础设置

在深入实战之前,首先需要了解 Rocket.Chat API 的认证机制。Rocket.Chat 支持两种认证方式:用户凭证认证(用户名+密码登录后获取 Token)和 Personal Access Token(PAT)认证。后者更适合开发场景和自动化脚本,可以避免频繁输入密码。以下是获取认证 Token 的方法:

"""
Rocket.Chat API 认证模块
演示如何获取和管理 API 认证 Token
"""
import requests
import json
import time
from typing import Optional, Dict, Any


class RocketChatAuth:
    """Rocket.Chat 认证管理器"""

    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip('/')
        self.auth_token: Optional[str] = None
        self.user_id: Optional[str] = None

    def login_with_credentials(self, username: str, password: str) -> Dict[str, Any]:
        """
        使用用户名密码登录并获取认证 Token

        返回的 JSON 包含:
        - status: 登录状态
        - data: 包含 userId 和 authToken 的数据
        """
        endpoint = f"{self.base_url}/api/v1/login"
        payload = {
            "user": username,
            "password": password
        }

        response = requests.post(endpoint, json=payload)
        result = response.json()

        if result.get('status') == 'success':
            self.auth_token = result['data']['authToken']
            self.user_id = result['data']['userId']
            print(f"登录成功! 用户: {username}")
            print(f"User ID: {self.user_id}")
            print(f"Auth Token: {self.auth_token[:20]}...")  # 隐藏敏感信息
        else:
            print(f"登录失败: {result.get('message', '未知错误')}")

        return result

    def create_personal_access_token(self, token_name: str, 
                                      username: str = None) -> Dict[str, Any]:
        """
        创建 Personal Access Token (PAT)
        需要管理员权限或通过用户密码认证后操作
        """
        endpoint = f"{self.base_url}/api/v1/users.createToken"

        # 如果已登录,直接使用当前用户创建 token
        if self.auth_token and self.user_id:
            payload = {"userId": self.user_id}
        elif username:
            payload = {"username": username}
        else:
            raise ValueError("需要提供 username 或先登录")

        headers = {}
        if self.auth_token:
            headers['X-Auth-Token'] = self.auth_token
        if self.user_id:
            headers['X-User-Id'] = self.user_id

        response = requests.post(endpoint, json=payload, headers=headers)
        result = response.json()

        if result.get('success'):
            token = result['data']['authToken']
            print(f"PAT 创建成功!")
            print(f"Token 名称: {token_name}")
            print(f"Token: {token}")
            print("请妥善保存此 Token,它不会再次显示!")

        return result

    def get_auth_headers(self) -> Dict[str, str]:
        """获取用于 API 请求的认证 Headers"""
        if not self.auth_token or not self.user_id:
            raise ValueError("尚未认证,请先登录")
        return {
            'X-Auth-Token': self.auth_token,
            'X-User-Id': self.user_id,
            'Content-Type': 'application/json'
        }


# 使用示例
if __name__ == "__main__":
    # 初始化认证管理器
    auth = RocketChatAuth("http://localhost:3000")

    # 方式一:使用用户名密码登录
    auth.login_with_credentials(
        username="admin",
        password="your-password"
    )

    # 方式二:如果已认证,创建 PAT
    if auth.auth_token:
        pat_result = auth.create_personal_access_token(
            token_name="my-automation-script",
            username="admin"
        )

    # 使用 PAT 初始化(适合持久化使用)
    print("\n" + "="*50)
    print("使用 PAT 认证")
    print("="*50)

    pat_token = "your-personal-access-token"
    pat_user_id = "your-user-id"

    headers = {
        'X-Auth-Token': pat_token,
        'X-User-Id': pat_user_id,
        'Content-Type': 'application/json'
    }

    # 验证 PAT 是否有效
    verify_response = requests.get(
        "http://localhost:3000/api/v1/me",
        headers=headers
    )
    print("PAT 验证结果:", verify_response.json())

实战一:自动化消息处理机器人

现在让我们构建一个完整的消息处理机器人,它能够响应特定命令、执行自动化任务并与用户互动:

"""
Rocket.Chat 智能消息处理机器人
功能:
- 响应 !help, !time, !weather 等命令
- 记录用户消息到数据库
- 定时发送日报摘要
"""
import requests
import json
import time
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import threading


class RocketChatBot:
    """Rocket.Chat 消息处理机器人"""

    def __init__(self, base_url: str, auth_token: str, user_id: str):
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.user_id = user_id
        self.headers = {
            'X-Auth-Token': auth_token,
            'X-User-Id': user_id,
            'Content-Type': 'application/json'
        }
        self.db_path = "bot_messages.db"
        self.init_database()
        self.command_handlers = {
            '!help': self.cmd_help,
            '!time': self.cmd_time,
            '!info': self.cmd_info,
            '!stats': self.cmd_stats,
            '!user': self.cmd_user_info,
            '!channel': self.cmd_channel_info
        }

    def init_database(self):
        """初始化 SQLite 数据库用于存储消息记录"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                message_id TEXT UNIQUE,
                username TEXT,
                channel TEXT,
                content TEXT,
                timestamp DATETIME,
                processed BOOLEAN DEFAULT 0
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS commands (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT,
                command TEXT,
                result TEXT,
                timestamp DATETIME
            )
        ''')

        conn.commit()
        conn.close()
        print("[数据库] 初始化完成")

    def save_message(self, message_id: str, username: str, 
                     channel: str, content: str):
        """保存消息到数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT OR IGNORE INTO messages 
            (message_id, username, channel, content, timestamp)
            VALUES (?, ?, ?, ?, ?)
        ''', (message_id, username, channel, content, datetime.now()))

        conn.commit()
        conn.close()

    # === 机器人命令处理器 ===

    def cmd_help(self, args: List[str], username: str) -> str:
        """处理 !help 命令"""
        help_text = """
🤖 *机器人可用命令:*

`!help` - 显示此帮助信息
`!time` - 显示当前时间
`!info` - 显示机器人信息
`!stats` - 显示消息统计
`!user <username>` - 查询用户信息
`!channel <channel>` - 查询频道信息

📌 *快捷操作:*
- 回复消息使用 @botname
- 使用 :emoji: 添加表情
- 支持 Markdown 格式
"""
        return help_text.strip()

    def cmd_time(self, args: List[str], username: str) -> str:
        """处理 !time 命令"""
        now = datetime.now()
        return f"🕐 *当前时间:* `{now.strftime('%Y-%m-%d %H:%M:%S')}`"

    def cmd_info(self, args: List[str], username: str) -> str:
        """处理 !info 命令"""
        return """
📋 *机器人信息:*

名称:RocketChat Bot v1.0
平台:Rocket.Chat
开发者:Your Team
功能:消息处理、命令响应、自动化任务
运行时间:24/7 在线
"""

    def cmd_stats(self, args: List[str], username: str) -> str:
        """处理 !stats 命令"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # 获取今日消息数
        today = datetime.now().date().isoformat()
        cursor.execute(
            'SELECT COUNT(*) FROM messages WHERE date(timestamp) = ?',
            (today,)
        )
        today_count = cursor.fetchone()[0]

        # 获取总消息数
        cursor.execute('SELECT COUNT(*) FROM messages')
        total_count = cursor.fetchone()[0]

        # 获取最活跃用户
        cursor.execute('''
            SELECT username, COUNT(*) as cnt 
            FROM messages 
            GROUP BY username 
            ORDER BY cnt DESC 
            LIMIT 5
        ''')
        top_users = cursor.fetchall()

        conn.close()

        top_users_text = '\n'.join(
            f"  • @{user}: {count} 条" for user, count in top_users
        ) or "  暂无数据"

        return f"""
📊 *消息统计:*

今日消息:{today_count}
总消息数:{total_count}

🏆 *活跃用户 Top 5:*
{top_users_text}
"""

    def cmd_user_info(self, args: List[str], username: str) -> str:
        """处理 !user 命令,查询用户信息"""
        if not args:
            return "❌ 请提供用户名,例如:`!user zhang_san`"

        target_username = args[0]

        response = requests.get(
            f"{self.base_url}/api/v1/users.info",
            headers=self.headers,
            params={"username": target_username}
        )

        result = response.json()

        if not result.get('success'):
            return f"❌ 未找到用户:{target_username}"

        user_data = result['user']

        return f"""
👤 *用户信息:*

用户名:@{user_data['username']}
显示名称:{user_data.get('name', '未设置')}
邮箱:{user_data.get('emails', [{}])[0].get('address', '未公开')}
状态:{user_data.get('status', 'offline')}
最后活跃:{user_data.get('lastLogin', '未知')}
创建时间:{user_data.get('createdAt', '未知')}
"""

    def cmd_channel_info(self, args: List[str], username: str) -> str:
        """处理 !channel 命令,查询频道信息"""
        if not args:
            return "❌ 请提供频道名,例如:`!channel general`"

        channel_name = args[0]
        if not channel_name.startswith('#'):
            channel_name = '#' + channel_name

        response = requests.get(
            f"{self.base_url}/api/v1/channels.info",
            headers=self.headers,
            params={"roomName": channel_name.lstrip('#')}
        )

        result = response.json()

        if not result.get('success'):
            return f"❌ 未找到频道:{channel_name}"

        room_data = result['channel']

        return f"""
📁 *频道信息:*

频道名:{room_data.get('name', '')}
话题:{room_data.get('topic', '未设置') or '无'}
成员数:{room_data.get('membersCount', 0)}
消息数:{room_data.get('messagesCount', 0)}
类型:{'公开频道' if room_data.get('t') == 'c' else '私有群组'}
创建时间:{room_data.get('createdAt', '未知')}
"""

    def process_message(self, message: Dict[str, Any]) -> Optional[str]:
        """
        处理收到的消息,返回回复内容或 None

        message 格式(来自 Rocket.Chat WebSocket):
        {
            "_id": "消息ID",
            "msg": "消息内容",
            "rid": "房间ID",
            "u": {"_id": "用户ID", "username": "用户名"},
            "ts": "时间戳"
        }
        """
        content = message.get('msg', '').strip()
        username = message.get('u', {}).get('username', 'unknown')
        room_id = message.get('rid', '')
        message_id = message.get('_id', '')

        # 忽略机器人自己的消息
        user_info = self.get_current_user_info()
        if user_info.get('username') == username:
            return None

        # 保存消息到数据库
        self.save_message(message_id, username, room_id, content)

        # 检查是否是命令
        if content.startswith('!'):
            parts = content.split()
            command = parts[0].lower()
            args = parts[1:]

            # 查找对应的处理器
            handler = self.command_handlers.get(command)
            if handler:
                print(f"[命令] 用户 {username} 执行: {command}")

                # 记录命令执行
                result = handler(args, username)
                self.log_command(username, command, result)

                return result

        return None

    def log_command(self, username: str, command: str, result: str):
        """记录命令执行日志"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT INTO commands (username, command, result, timestamp)
            VALUES (?, ?, ?, ?)
        ''', (username, command, result[:500], datetime.now()))

        conn.commit()
        conn.close()

    def get_current_user_info(self) -> Dict[str, Any]:
        """获取当前认证用户的信息"""
        response = requests.get(
            f"{self.base_url}/api/v1/me",
            headers=self.headers
        )
        return response.json().get('data', {})

    def send_message(self, room_id: str, text: str) -> Dict[str, Any]:
        """发送消息到指定房间"""
        payload = {
            "roomId": room_id,
            "text": text
        }

        response = requests.post(
            f"{self.base_url}/api/v1/chat.sendMessage",
            headers=self.headers,
            json=payload
        )

        return response.json()

    def run(self):
        """运行机器人的主循环"""
        print("="*50)
        print("Rocket.Chat 消息机器人启动中...")
        print("="*50)

        # 获取当前用户信息
        current_user = self.get_current_user_info()
        print(f"机器人账号: @{current_user.get('username')}")
        print(f"机器人 ID: {current_user.get('_id')}")
        print("-"*50)

        # 订阅消息流(使用 Streaming API)
        # 注意:这是一个简化版本,生产环境应使用 WebSocket
        print("开始监听消息...")
        print("按 Ctrl+C 停止")

        try:
            while True:
                # 简化实现:每5秒检查一次新消息
                # 生产环境推荐使用 WebSocket 实现实时响应
                time.sleep(5)

                # 可以在这里添加轮询逻辑或 WebSocket 监听
                # 由于篇幅限制,这里仅展示核心处理逻辑

        except KeyboardInterrupt:
            print("\n机器人已停止")


# 独立的 WebSocket 监听实现(推荐用于生产环境)
import websocket
import rel


class RocketChatWebSocketListener:
    """
    使用 WebSocket 连接到 Rocket.Chat 实时消息流
    推荐用于需要实时响应的生产环境
    """

    def __init__(self, base_url: str, auth_token: str, user_id: str):
        self.base_url = base_url
        self.auth_token = auth_token
        self.user_id = user_id
        self.ws = None
        self.bot = RocketChatBot(base_url, auth_token, user_id)

    def on_message(self, ws, message):
        """处理接收到的 WebSocket 消息"""
        try:
            data = json.loads(message)

            # 解析消息类型
            msg_type = data.get('msg', '')

            if msg_type == 'connected':
                print("[WebSocket] 已连接")
                # 订阅房间消息
                self.subscribe_to_messages()

            elif msg_type == 'result':
                # 处理 API 调用结果
                pass

            elif msg_type == 'changed':
                # 处理数据变更(消息)
                fields = data.get('fields', {})
                collection = fields.get('collection', '')

                if collection == 'stream-room-messages':
                    args = fields.get('args', [])
                    if args:
                        message_data = args[0]
                        # 处理消息
                        reply = self.bot.process_message(message_data)
                        if reply:
                            room_id = message_data.get('rid')
                            self.bot.send_message(room_id, reply)

        except json.JSONDecodeError:
            print(f"[WebSocket] 解析消息失败: {message[:100]}")
        except Exception as e:
            print(f"[WebSocket] 处理消息时出错: {e}")

    def on_error(self, ws, error):
        print(f"[WebSocket] 错误: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print(f"[WebSocket] 连接关闭: {close_status_code} - {close_msg}")

    def on_open(self, ws):
        """WebSocket 连接建立时的处理"""
        # 发送认证信息
        auth_data = {
            "msg": "connect",
            "version": "1",
            "support": ["1"]
        }
        ws.send(json.dumps(auth_data))

        # 发送登录信息
        login_data = {
            "msg": "method",
            "method": "login",
            "id": "1",
            "params": [
                {
                    "user": {"id": self.user_id},
                    "tokens": [self.auth_token]
                }
            ]
        }
        ws.send(json.dumps(login_data))

    def subscribe_to_messages(self):
        """订阅所有房间的消息"""
        # 订阅公共频道
        subscribe_data = {
            "msg": "sub",
            "id": "subscribe-rooms",
            "name": "stream-room-messages",
            "params": [
                "__rooms",
                False
            ]
        }
        self.ws.send(json.dumps(subscribe_data))

    def connect(self):
        """建立 WebSocket 连接"""
        # 构建 WebSocket URL
        ws_url = self.base_url.replace('http', 'ws') + '/websocket'

        print(f"[WebSocket] 连接到 {ws_url}")

        self.ws = websocket.WebSocketApp(
            ws_url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )

        # 运行 WebSocket 客户端(自动重连)
        self.ws.run_forever(
            dispatch_timeout_ms=10000,
            reconnect=5
        )


# 启动示例
if __name__ == "__main__":
    # 配置参数
    BASE_URL = "http://localhost:3000"
    AUTH_TOKEN = "your-auth-token"
    USER_ID = "your-user-id"

    # 方式一:简单轮询模式
    # bot = RocketChatBot(BASE_URL, AUTH_TOKEN, USER_ID)
    # bot.run()

    # 方式二:WebSocket 实时模式(推荐)
    listener = RocketChatWebSocketListener(BASE_URL, AUTH_TOKEN, USER_ID)
    listener.connect()

实战二:构建 GitLab 集成实现自动化部署通知

接下来让我们构建一个实际可用的 GitLab 集成系统,当代码提交、合并请求或流水线状态发生变化时,自动在 Rocket.Chat 频道中发送通知:

"""
GitLab Rocket.Chat 集成系统
功能:
- 监听 GitLab Webhook 事件
- 自动向 Rocket.Chat 发送构建状态通知
- 支持代码提交、MR/PR、CI/CD 流水线等事件
"""
import requests
import json
import hashlib
import hmac
import os
from datetime import datetime
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum


class BuildStatus(Enum):
    """构建状态枚举"""
    SUCCESS = "success"
    FAILED = "failed"
    RUNNING = "running"
    PENDING = "pending"
    CANCELED = "canceled"

    def get_emoji(self) -> str:
        emoji_map = {
            "success": ":white_check_mark:",
            "failed": ":x:",
            "running": ":hourglass_flowing_sand:",
            "pending": ":clock1:",
            "canceled": ":no_entry_sign:"
        }
        return emoji_map.get(self.value, ":question:")

    def get_color(self) -> str:
        color_map = {
            "success": "#36a64f",
            "failed": "#ff0000",
            "running": "#ffa500",
            "pending": "#808080",
            "canceled": "#808080"
        }
        return color_map.get(self.value, "#808080")


@dataclass
class GitLabEvent:
    """GitLab Webhook 事件数据结构"""
    object_kind: str
    project_name: str
    project_url: str
    branch: str
    commit_sha: str
    commit_message: str
    commit_author: str
    build_status: Optional[str] = None
    build_duration: Optional[float] = None
    merge_request: Optional[Dict] = None


class GitLabRocketChatNotifier:
    """GitLab 到 Rocket.Chat 的通知转发器"""

    def __init__(self, rocketchat_webhook_url: str, 
                 gitlab_secret_token: str = None):
        self.rocketchat_webhook_url = rocketchat_webhook_url
        self.gitlab_secret_token = gitlab_secret_token

    def verify_gitlab_token(self, request_headers: Dict, 
                            request_body: bytes) -> bool:
        """
        验证 GitLab Webhook 请求的签名
        GitLab 使用 X-Gitlab-Token header 传递 secret token
        """
        if not self.gitlab_secret_token:
            return True  # 未配置 token 时跳过验证

        received_token = request_headers.get('X-Gitlab-Token', '')
        return hmac.compare_digest(received_token, self.gitlab_secret_token)

    def parse_gitlab_event(self, event_type: str, 
                           payload: Dict) -> GitLabEvent:
        """解析 GitLab Webhook payload 为统一的事件对象"""
        project = payload.get('project', {})

        event = GitLabEvent(
            object_kind=event_type,
            project_name=project.get('name', 'Unknown'),
            project_url=project.get('web_url', ''),
            branch=self._get_branch(payload),
            commit_sha=self._get_sha(payload),
            commit_message=self._get_message(payload),
            commit_author=self._get_author(payload)
        )

        # 解析构建状态(用于 pipeline 和 build 事件)
        if 'build_status' in payload:
            event.build_status = payload['build_status']
        elif 'object_attributes' in payload:
            attrs = payload['object_attributes']
            event.build_status = attrs.get('status')

        # 解析构建时长
        if 'build_duration' in payload:
            event.build_duration = payload['build_duration']

        # 解析合并请求信息
        if 'object_kind' in payload and payload['object_kind'] == 'merge_request':
            event.merge_request = {
                'id': payload.get('object_attributes', {}).get('id'),
                'title': payload.get('object_attributes', {}).get('title'),
                'source': payload.get('source', {}).get('git_http_url', ''),
                'target': payload.get('target', {}).get('git_http_url', ''),
                'action': payload.get('object_attributes', {}).get('action')
            }

        return event

    def _get_branch(self, payload: Dict) -> str:
        """提取分支名称"""
        if 'ref' in payload:
            return payload['ref'].replace('refs/heads/', '')
        if 'object_attributes' in payload:
            return payload['object_attributes'].get('target_branch', '')
        if 'checkout_sha' in payload:
            return payload.get('checkout_sha', '')[:8]
        return 'unknown'

    def _get_sha(self, payload: Dict) -> str:
        """提取提交 SHA"""
        return payload.get('checkout_sha', 
                          payload.get('sha', ''))[:8]

    def _get_message(self, payload: Dict) -> str:
        """提取提交消息"""
        commits = payload.get('commits', [])
        if commits:
            return commits[0].get('message', '').split('\n')[0][:100]
        if 'object_attributes' in payload:
            return payload['object_attributes'].get('last_commit', {}).get(
                'message', '').split('\n')[0][:100]
        return 'No message'

    def _get_author(self, payload: Dict) -> str:
        """提取提交作者"""
        commits = payload.get('commits', [])
        if commits:
            return commits[0].get('author', {}).get('name', 'Unknown')
        if 'user_name' in payload:
            return payload.get('user_name', 'Unknown')
        if 'object_attributes' in payload:
            return payload['object_attributes'].get('last_commit', {}).get(
                'author', {}).get('name', 'Unknown')
        return 'Unknown'

    def format_push_notification(self, event: GitLabEvent) -> Dict:
        """格式化代码推送通知"""
        commit_link = f"{event.project_url}/-/commit/{event.commit_sha}"

        return {
            "text": f"📤 *{event.project_name}* 有新提交",
            "attachments": [
                {
                    "color": "#36a64f",
                    "title": f"新提交: {event.commit_message}",
                    "title_link": commit_link,
                    "fields": [
                        {
                            "title": "分支",
                            "value": f"`{event.branch}`",
                            "short": True
                        },
                        {
                            "title": "提交",
                            "value": f"`{event.commit_sha}`",
                            "short": True
                        },
                        {
                            "title": "作者",
                            "value": event.commit_author,
                            "short": True
                        }
                    ],
                    "footer": f"{event.project_name} | GitLab",
                    "ts": int(datetime.now().timestamp())
                }
            ],
            "username": "GitLab Bot",
            "icon_emoji": ":gitlab:"
        }

    def format_pipeline_notification(self, event: GitLabEvent) -> Dict:
        """格式化流水线状态通知"""
        status = BuildStatus(event.build_status)
        pipeline_link = f"{event.project_url}/-/pipelines"

        # 构建持续时间显示
        duration_text = ""
        if event.build_duration:
            minutes = int(event.build_duration // 60)
            seconds = int(event.build_duration % 60)
            duration_text = f"\n⏱️ 耗时: {minutes}{seconds}秒"

        return {
            "text": f"🔄 *{event.project_name}* 流水线更新",
            "attachments": [
                {
                    "color": status.get_color(),
                    "title": f"{status.get_emoji()} 构建{status.value.upper()}",
                    "fields": [
                        {
                            "title": "分支",
                            "value": f"`{event.branch}`",
                            "short": True
                        },
                        {
                            "title": "状态",
                            "value": f"{status.get_emoji()} {status.value}",
                            "short": True
                        },
                        {
                            "title": "提交",
                            "value": f"`{event.commit_sha}`",
                            "short": True
                        }
                    ],
                    "text": duration_text,
                    "footer": f"{event.project_name} | GitLab Pipeline",
                    "ts": int(datetime.now().timestamp())
                }
            ],
            "username": "GitLab Bot",
            "icon_emoji": ":gitlab:"
        }

    def format_mr_notification(self, event: GitLabEvent) -> Dict:
        """格式化合并请求通知"""
        if not event.merge_request:
            return {}

        mr = event.merge_request
        action_emoji = {
            "open": ":new:",
            "merge": ":tada:",
            "close": ":wave:",
            "update": ":pencil:"
        }

        action_text = {
            "open": "新建了合并请求",
            "merge": "合并了合并请求",
            "close": "关闭了合并请求",
            "update": "更新了合并请求"
        }

        emoji = action_emoji.get(mr['action'], ":information_source:")
        text = action_text.get(mr['action'], f"操作了合并请求")

        return {
            "text": f"📝 *{event.project_name}* {text}",
            "attachments": [
                {
                    "color": "#0077cc",
                    "title": f"{emoji} {mr['title']}",
                    "fields": [
                        {
                            "title": "源分支",
                            "value": f"`{event.branch}`",
                            "short": True
                        },
                        {
                            "title": "作者",
                            "value": event.commit_author,
                            "short": True
                        },
                        {
                            "title": "动作",
                            "value": mr['action'],
                            "short": True
                        }
                    ],
                    "footer": f"{event.project_name} | GitLab MR",
                    "ts": int(datetime.now().timestamp())
                }
            ],
            "username": "GitLab Bot",
            "icon_emoji": ":gitlab:"
        }

    def send_notification(self, payload: Dict) -> bool:
        """发送通知到 Rocket.Chat"""
        try:
            response = requests.post(
                self.rocketchat_webhook_url,
                json=payload,
                timeout=10
            )
            return response.status_code == 200
        except requests.RequestException as e:
            print(f"发送通知失败: {e}")
            return False

    def handle_webhook(self, event_type: str, payload: Dict,
                       headers: Dict) -> tuple[bool, str]:
        """
        处理 GitLab Webhook 请求的主入口

        返回: (success, message)
        """
        # 验证 token
        # 注意:实际使用时应验证,这里简化为打印
        print(f"[Webhook] 收到 {event_type} 事件")

        # 解析事件
        try:
            event = self.parse_gitlab_event(event_type, payload)
        except Exception as e:
            return False, f"解析事件失败: {e}"

        # 根据事件类型格式化通知
        if event_type == 'push':
            notification = self.format_push_notification(event)
        elif event_type in ('pipeline', 'build'):
            notification = self.format_pipeline_notification(event)
        elif event_type == 'merge_request':
            notification = self.format_mr_notification(event)
        elif event_type == 'tag_push':
            # 处理标签推送(发布版本)
            notification = {
                "text": f"🏷️ *{event.project_name}* 发布了新版本",
                "attachments": [
                    {
                        "color": "#9932cc",
                        "title": f"新标签: {event.branch}",
                        "fields": [
                            {"title": "提交", "value": event.commit_sha, "short": True}
                        ],
                        "footer": f"{event.project_name} | GitLab Release"
                    }
                ],
                "username": "GitLab Bot",
                "icon_emoji": ":gitlab:"
            }
        elif event_type == 'note':
            # 处理评论事件
            notification = {
                "text": f"💬 *{event.project_name}* 有新评论",
                "attachments": [
                    {
                        "color": "#0077cc",
                        "title": event.commit_message,
                        "fields": [
                            {"title": "作者", "value": event.commit_author, "short": True}
                        ],
                        "footer": f"{event.project_name} | GitLab Comment"
                    }
                ],
                "username": "GitLab Bot",
                "icon_emoji": ":gitlab:"
            }
        else:
            return False, f"不支持的事件类型: {event_type}"

        # 发送通知
        success = self.send_notification(notification)

        if success:
            return True, f"通知发送成功"
        else:
            return False, "通知发送失败"


# Flask Web 服务封装(用于接收 GitLab Webhook)
from flask import Flask, request, jsonify

app = Flask(__name__)

# 初始化通知器
notifier = GitLabRocketChatNotifier(
    rocketchat_webhook_url="https://your-rocketchat.com/hooks/xxxxx/xxxxx",
    gitlab_secret_token="your-gitlab-webhook-token"  # 可选的安全 token
)


@app.route('/webhook/gitlab', methods=['POST'])
def handle_gitlab_webhook():
    """
    GitLab Webhook 接收端点

    需要在 GitLab 项目的 Settings -> Webhooks 中配置:
    - URL: https://your-server.com/webhook/gitlab
    - Secret token: your-gitlab-webhook-token
    - Trigger: Push events, Merge request events, Pipeline events
    """
    # 获取事件类型
    event_type = request.headers.get('X-Gitlab-Event', '').lower()

    # GitLab 使用 "GitLab" 前缀,如 "Push Hook", "Merge Request Hook"
    if event_type.startswith('gitlab'):
        event_type = event_type.replace(' hook', '').replace(' hook', '').lower()

    # 映射事件类型名称
    event_mapping = {
        'push': 'push',
        'merge request': 'merge_request',
        'pipeline': 'pipeline',
        'tag push': 'tag_push',
        'note': 'note'
    }

    normalized_event_type = event_mapping.get(event_type, event_type)

    # 获取 payload
    payload = request.get_json()

    # 处理 webhook
    success, message = notifier.handle_webhook(
        normalized_event_type,
        payload,
        dict(request.headers)
    )

    if success:
        return jsonify({"status": "ok", "message": message}), 200
    else:
        return jsonify({"status": "error", "message": message}), 400


@app.route('/health', methods=['GET'])
def health():
    """健康检查端点"""
    return jsonify({
        "status": "healthy",
        "service": "gitlab-rocketchat-notifier",
        "timestamp": datetime.now().isoformat()
    })


if __name__ == "__main__":
    # 启动 Flask 服务
    # 生产环境应使用 gunicorn: gunicorn -w 4 -b 0.0.0.0:5000 app:app
    app.run(host='0.0.0.0', port=5000, debug=False)

实战三:用户管理与自动化工作流

最后一个实战案例将展示如何利用 Rocket.Chat API 实现复杂的用户管理和自动化工作流:

"""
Rocket.Chat 用户管理与自动化系统
功能:
- 自动创建和配置用户账号
- 用户生命周期管理(入职/离职自动化)
- 基于部门的权限分配
- 用户活跃度分析和报告
"""
import requests
import json
import csv
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
import hashlib


@dataclass
class User:
    """用户数据模型"""
    username: str
    email: str
    name: str = ""
    password: str = ""
    roles: List[str] = field(default_factory=list)
    department: str = ""
    active: bool = True
    user_id: str = ""

    def to_dict(self) -> Dict[str, Any]:
        return {
            "username": self.username,
            "email": self.email,
            "name": self.name,
            "password": self.password,
            "roles": self.roles,
            "active": self.active
        }


class RocketChatUserManager:
    """Rocket.Chat 用户管理系统"""

    def __init__(self, base_url: str, admin_token: str, admin_user_id: str):
        self.base_url = base_url.rstrip('/')
        self.admin_token = admin_token
        self.admin_user_id = admin_user_id
        self.headers = {
            'X-Auth-Token': admin_token,
            'X-User-Id': admin_user_id,
            'Content-Type': 'application/json'
        }
        self.db_path = "user_management.db"
        self.init_database()

    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # 用户同步日志表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sync_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT,
                action TEXT,
                status TEXT,
                details TEXT,
                timestamp DATETIME
            )
        ''')

        # 用户信息缓存表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_cache (
                username TEXT PRIMARY KEY,
                email TEXT,
                name TEXT,
                user_id TEXT,
                department TEXT,
                last_synced DATETIME
            )
        ''')

        conn.commit()
        conn.close()
        print("[数据库] 用户管理系统初始化完成")

    def log_action(self, username: str, action: str, 
                   status: str, details: str = ""):
        """记录操作日志"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT INTO sync_log (username, action, status, details, timestamp)
            VALUES (?, ?, ?, ?, ?)
        ''', (username, action, status, details, datetime.now()))

        conn.commit()
        conn.close()

    # === 用户管理核心方法 ===

    def create_user(self, user: User) -> Dict[str, Any]:
        """
        创建新用户

        user: User 对象,包含用户信息
        返回: API 响应结果
        """
        endpoint = f"{self.base_url}/api/v1/users.create"
        payload = user.to_dict()

        # 如果未提供密码,自动生成
        if not user.password:
            payload['pass'] = self.generate_password()

        response = requests.post(endpoint, headers=self.headers, json=payload)
        result = response.json()

        if result.get('success'):
            user.user_id = result['user']['_id']
            self.log_action(user.username, 'create', 'success', 
                           f"User ID: {user.user_id}")
            print(f"[创建用户] {user.username} 创建成功")
        else:
            self.log_action(user.username, 'create', 'failed', 
                           result.get('error', 'Unknown error'))
            print(f"[创建用户] {user.username} 创建失败: {result.get('error')}")

        return result

    def update_user(self, user_id: str, updates: Dict) -> Dict[str, Any]:
        """
        更新用户信息

        user_id: 用户的 Rocket.Chat ID
        updates: 要更新的字段字典
        """
        endpoint = f"{self.base_url}/api/v1/users.update"
        payload = {
            "userId": user_id,
            "data": updates
        }

        response = requests.post(endpoint, headers=self.headers, json=payload)
        result = response.json()

        if result.get('success'):
            self.log_action(updates.get('username', user_id), 'update', 'success')

        return result

    def delete_user(self, user_id: str) -> Dict[str, Any]:
        """
        删除(停用)用户

        Rocket.Chat 中删除用户是不可逆的,这里采用禁用方式
        """
        # 先获取用户信息
        user_info = self.get_user_info(user_id)

        endpoint = f"{self.base_url}/api/v1/users.delete"
        payload = {"userId": user_id}

        response = requests.post(endpoint, headers=self.headers, json=payload)
        result = response.json()

        if result.get('success'):
            username = user_info.get('username', 'unknown')
            self.log_action(username, 'delete', 'success')
            print(f"[删除用户] {username} 已删除")

        return result

    def deactivate_user(self, user_id: str) -> Dict[str, Any]:
        """
        禁用用户(替代删除的推荐方式)
        """
        return self.update_user(user_id, {"active": False})

    def activate_user(self, user_id: str) -> Dict[str, Any]:
        """激活用户"""
        return self.update_user(user_id, {"active": True})

    def get_user_info(self, identifier: str, 
                      by: str = 'userId') -> Optional[Dict]:
        """
        获取用户信息
        by: 'userId', 'username', 或 'email'
        """
        endpoint = f"{self.base_url}/api/v1/users.info"
        params = {by: identifier}

        response = requests.get(endpoint, headers=self.headers, params=params)
        result = response.json()

        if result.get('success'):
            return result['user']
        return None

    def list_users(self, filter_params: Dict = None,
                   page: int = 1, per_page: int = 50) -> List[Dict]:
        """
        获取用户列表

        filter_params: 可选筛选条件
            - department: 部门
            - role: 角色
            - active: 是否激活
        """
        endpoint = f"{self.base_url}/api/v1/users.list"

        # Rocket.Chat 5.x 使用分页参数
        params = {
            "offset": (page - 1) * per_page,
            "count": per_page
        }

        response = requests.get(endpoint, headers=self.headers, params=params)
        result = response.json()

        users = result.get('users', [])

        # 应用筛选
        if filter_params:
            filtered_users = []
            for user in users:
                match = True
                if 'department' in filter_params:
                    # Rocket.Chat 原生不支持部门,需要通过自定义字段实现
                    custom_fields = user.get('customFields', {})
                    if custom_fields.get('department') != filter_params['department']:
                        match = False
                if 'role' in filter_params:
                    if filter_params['role'] not in user.get('roles', []):
                        match = False
                if 'active' in filter_params:
                    if user.get('active', True) != filter_params['active']:
                        match = False
                if match:
                    filtered_users.append(user)
            return filtered_users

        return users

    def assign_role(self, user_id: str, role: str) -> Dict[str, Any]:
        """为用户分配角色"""
        endpoint = f"{self.base_url}/api/v1/roles.addUserToRole"
        payload = {
            "roleName": role,
            "username": self.get_user_info(user_id, 'userId').get('username')
        }

        response = requests.post(endpoint, headers=self.headers, json=payload)
        result = response.json()

        if result.get('success'):
            print(f"[分配角色] 用户 {user_id} 获得角色 {role}")

        return result

    # === 批量操作方法 ===

    def import_users_from_csv(self, csv_file: str) -> Dict[str, Any]:
        """
        从 CSV 文件批量导入用户

        CSV 格式:
        username,email,name,password,department,roles
        zhang_san,zhang@example.com,张三,password123,技术部,user
        li_si,li@example.com,李四,password456,市场部,user
        """
        created = []
        failed = []

        with open(csv_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)

            for row in reader:
                try:
                    user = User(
                        username=row['username'],
                        email=row['email'],
                        name=row.get('name', row['username']),
                        password=row.get('password', ''),
                        department=row.get('department', ''),
                        roles=row.get('roles', 'user').split(',')
                    )

                    result = self.create_user(user)
                    if result.get('success'):
                        created.append(user.username)

                        # 设置部门(如果使用自定义字段)
                        if user.department:
                            user_id = result['user']['_id']
                            self.update_user(user_id, {
                                "customFields": {"department": user.department}
                            })
                    else:
                        failed.append({
                            'username': user.username,
                            'error': result.get('error', 'Unknown error')
                        })

                except Exception as e:
                    failed.append({
                        'username': row.get('username', 'unknown'),
                        'error': str(e)
                    })

        return {
            'created': len(created),
            'failed': len(failed),
            'created_list': created,
            'failed_list': failed
        }

    def bulk_deactivate_inactive_users(self, days: int = 90) -> Dict[str, Any]:
        """
        批量禁用长期不活跃的用户

        days: 超过多少天未登录则禁用
        """
        threshold = datetime.now() - timedelta(days=days)
        deactivated = []
        failed = []

        # 获取所有用户
        users = self.list_users(filter_params={'active': True})

        for user in users:
            last_login = user.get('lastLogin')
            if not last_login:
                # 如果没有登录记录,检查创建时间
                created_at = datetime.fromisoformat(
                    user.get('createdAt', '').replace('Z', '+00:00')
                )
                if created_at < threshold:
                    user_id = user['_id']
                    result = self.deactivate_user(user_id)
                    if result.get('success'):
                        deactivated.append(user['username'])
                    else:
                        failed.append(user['username'])
            else:
                # 检查最后登录时间
                try:
                    last_login_dt = datetime.fromisoformat(
                        last_login.replace('Z', '+00:00')
                    )
                    if last_login_dt < threshold:
                        user_id = user['_id']
                        result = self.deactivate_user(user_id)
                        if result.get('success'):
                            deactivated.append(user['username'])
                        else:
                            failed.append(user['username'])
                except (ValueError, TypeError):
                    continue

        return {
            'deactivated': len(deactivated),
            'failed': len(failed),
            'deactivated_list': deactivated,
            'failed_list': failed,
            'threshold_days': days
        }

    # === 工具方法 ===

    @staticmethod
    def generate_password(length: int = 16) -> str:
        """生成随机密码"""
        import secrets
        import string

        alphabet = string.ascii_letters + string.digits + "!@#$%"
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    def generate_user_report(self, output_file: str = None) -> str:
        """
        生成用户管理报告

        返回报告文本,可选择保存到文件
        """
        users = self.list_users(per_page=1000)

        # 统计各项数据
        total_users = len(users)
        active_users = sum(1 for u in users if u.get('active', True))
        inactive_users = total_users - active_users

        # 按部门统计(如果有自定义字段)
        departments = {}
        for user in users:
            dept = user.get('customFields', {}).get('department', '未分配')
            departments[dept] = departments.get(dept, 0) + 1

        # 按角色统计
        roles = {}
        for user in users:
            user_roles = user.get('roles', [])
            for role in user_roles:
                roles[role] = roles.get(role, 0) + 1

        # 生成报告
        report = f"""
{'='*60}
Rocket.Chat 用户管理报告
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{'='*60}

📊 总体统计
{'-'*40}
总用户数: {total_users}
活跃用户: {active_users}
已禁用用户: {inactive_users}
活跃率: {active_users/total_users*100:.1f}%

📁 部门分布
{'-'*40}
"""

        for dept, count in sorted(departments.items(), key=lambda x: x[1], reverse=True):
            report += f"{dept}: {count}\n"

        report += f"""
🔐 角色分布
{'-'*40}
"""

        for role, count in sorted(roles.items(), key=lambda x: x[1], reverse=True):
            report += f"{role}: {count}\n"

        report += f"""
{'='*60}
"""

        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"报告已保存到: {output_file}")

        return report


# 使用示例与主程序
if __name__ == "__main__":
    # 初始化管理器
    manager = RocketChatUserManager(
        base_url="http://localhost:3000",
        admin_token="your-admin-auth-token",
        admin_user_id="your-admin-user-id"
    )

    print("\n" + "="*50)
    print("Rocket.Chat 用户管理系统演示")
    print("="*50 + "\n")

    # === 演示:创建单个用户 ===
    new_user = User(
        username="zhang_san",
        email="zhang@example.com",
        name="张三",
        department="技术部",
        roles=["user"]
    )

    # 创建用户
    create_result = manager.create_user(new_user)
    print("创建用户结果:", json.dumps(create_result, ensure_ascii=False, indent=2))

    # 分配角色
    if create_result.get('success'):
        user_id = create_result['user']['_id']
        manager.assign_role(user_id, "moderator")

    # === 演示:批量导入用户 ===
    # manager.import_users_from_csv("users.csv")

    # === 演示:禁用不活跃用户 ===
    # result = manager.bulk_deactivate_inactive_users(days=90)
    # print("禁用结果:", json.dumps(result, ensure_ascii=False, indent=2))

    # === 演示:生成报告 ===
    report = manager.generate_user_report("user_report.txt")
    print(report)

    # === 演示:列出所有用户 ===
    all_users = manager.list_users()
    print(f"\n当前系统共有 {len(all_users)} 个用户")

常见使用场景与解决方案

场景一:企业内部即时通讯平台

对于中小型企业来说,搭建一套完整的内部通讯系统往往是刚需。使用 Rocket.Chat 可以快速实现以下功能:全员公告频道用于发布公司重要通知;部门专属频道让每个团队拥有独立的讨论空间;项目频道整合特定项目的所有沟通和协作;一对一面谈频道支持管理层与员工或跨部门同事的私密沟通。

在实际部署中,建议采用以下组织架构:顶层设置「全体公告」频道(全体成员可见),然后按部门创建一级频道(如「技术部」「市场部」「人力资源部」),再按项目或兴趣创建二级频道。敏感信息如薪酬讨论可以使用私密群组并设置严格的访问权限。

场景二:客户支持与工单系统集成

Rocket.Chat 可以作为客户支持的前端入口,与后端工单系统配合实现完整的支持流程。典型架构是:客户通过网页聊天窗口或移动应用发起咨询,消息通过 Incoming Webhook 流入 Rocket.Chat 支持频道,支持人员直接在 Rocket.Chat 中回复,后端工单系统通过 API 创建和更新工单,Webhook 通知相关方处理进度。

"""
客户支持消息处理系统
演示如何将 Rocket.Chat 与工单系统集成
"""
import requests
import json
from datetime import datetime
from typing import Dict, Any


class SupportTicketIntegrator:
    """支持工单集成器"""

    def __init__(self, rocketchat_webhook_url: str, 
                 ticket_system_api: str, api_key: str):
        self.rocketchat_webhook = rocketchat_webhook_url
        self.ticket_api = ticket_system_api
        self.api_key = api_key

    def handle_incoming_message(self, message: Dict) -> Dict[str, Any]:
        """
        处理新入站消息,创建或更新工单

        业务逻辑:
        1. 检查是否有现有会话
        2. 如果是新客户,创建新工单
        3. 如果是老客户,追加到现有工单
        4. 发送确认消息给客户
        """
        customer_email = message.get('email', '')
        customer_name = message.get('name', 'Anonymous')
        message_text = message.get('text', '')
        channel_id = message.get('channel_id', '')

        # === 检查是否需要创建新工单 ===
        # 这里简化处理,实际应查询工单系统数据库

        # 模拟创建工单
        ticket_id = self.create_ticket(
            customer_email=customer_email,
            customer_name=customer_name,
            subject=message_text[:100],
            description=message_text
        )

        # 构造通知消息
        notification = {
            "text": f"📩 新支持请求",
            "attachments": [
                {
                    "color": "#0077cc",
                    "fields": [
                        {"title": "客户", "value": customer_name, "short": True},
                        {"title": "邮箱", "value": customer_email, "short": True},
                        {"title": "工单号", "value": f"#{ticket_id}", "short": True}
                    ],
                    "text": f"> {message_text}"
                }
            ],
            "username": "Support Bot"
        }

        # 发送通知到支持频道
        requests.post(self.rocketchat_webhook, json=notification)

        # 返回工单信息
        return {
            "ticket_id": ticket_id,
            "status": "created",
            "message": f"您的请求已收到,工单号: #{ticket_id}"
        }

    def create_ticket(self, customer_email: str, customer_name: str,
                      subject: str, description: str) -> str:
        """
        在工单系统中创建新工单
        这是一个模拟实现
        """
        # 实际应调用工单系统 API
        ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d')}-{hash(customer_email) % 10000:04d}"
        print(f"[工单] 创建工单 {ticket_id}")
        return ticket_id

    def send_reply_to_customer(self, customer_email: str, reply: str):
        """
        向客户发送回复
        """
        notification = {
            "text": f"💬 支持团队回复:\n\n{reply}",
            "username": "Support Team"
        }
        requests.post(self.rocketchat_webhook, json=notification)

场景三:DevOps 监控告警中心

在 DevOps 实践中,统一的告警通知渠道至关重要。Rocket.Chat 可以作为各类监控工具(Prometheus、Grafana、PagerDuty、CloudWatch 等)的告警汇聚点,实现以下效果:告警按严重程度分级显示(红色严重/橙色警告/蓝色信息);告警消息包含完整上下文信息;支持一键跳转查看详情;值班人员可以通过消息直接确认或静默告警。

"""
统一告警通知系统
支持多数据源告警的统一处理和转发
"""
import requests
import json
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
from dataclasses import dataclass


class AlertSeverity(Enum):
    """告警严重程度"""
    CRITICAL = "critical"
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"

    def get_emoji(self) -> str:
        return {
            "critical": ":rotating_light:",
            "error": ":x:",
            "warning": ":warning:",
            "info": ":information_source:"
        }.get(self.value, ":bell:")

    def get_color(self) -> str:
        return {
            "critical": "#ff0000",
            "error": "#ff4500",
            "warning": "#ffa500",
            "info": "#0077cc"
        }.get(self.value, "#808080")


@dataclass
class Alert:
    """告警数据模型"""
    title: str
    message: str
    severity: AlertSeverity
    source: str
    host: str = ""
    metric: str = ""
    value: str = ""
    threshold: str = ""
    url: str = ""
    labels: Dict[str, str] = None

    def __post_init__(self):
        if self.labels is None:
            self.labels = {}


class UnifiedAlertNotifier:
    """统一告警通知系统"""

    def __init__(self, rocketchat_webhook_url: str):
        self.webhook_url = rocketchat_webhook_url

    def format_alert(self, alert: Alert) -> Dict[str, Any]:
        """将告警格式化为 Rocket.Chat 消息"""

        # 构建消息文本
        text_parts = [f"{alert.severity.get_emoji()} *{alert.title}*"]
        if alert.message:
            text_parts.append(f"\n{alert.message}")

        # 构建附件字段
        fields = [
            {
                "title": "严重程度",
                "value": f"{alert.severity.get_emoji()} {alert.severity.value.upper()}",
                "short": True
            },
            {
                "title": "来源",
                "value": alert.source,
                "short": True
            }
        ]

        if alert.host:
            fields.append({
                "title": "主机",
                "value": f"`{alert.host}`",
                "short": True
            })

        if alert.metric:
            fields.append({
                "title": "指标",
                "value": alert.metric,
                "short": True
            })

        if alert.value and alert.threshold:
            fields.append({
                "title": "当前值 / 阈值",
                "value": f"`{alert.value}` / `{alert.threshold}`",
                "short": True
            })

        # 添加标签(如果有)
        if alert.labels:
            label_text = " ".join(
                f"`{k}={v}`" for k, v in alert.labels.items()
            )
            fields.append({
                "title": "标签",
                "value": label_text,
                "short": False
            })

        # 构建完整消息
        message = {
            "text": "\n".join(text_parts),
            "attachments": [
                {
                    "color": alert.severity.get_color(),
                    "fields": fields,
                    "footer": alert.source,
                    "ts": int(datetime.now().timestamp())
                }
            ]
        }

        # 添加详情链接
        if alert.url:
            message["attachments"][0]["title_link"] = alert.url

        return message

    def send_alert(self, alert: Alert) -> bool:
        """发送告警通知"""
        message = self.format_alert(alert)

        try:
            response = requests.post(
                self.webhook_url,
                json=message,
                timeout=10
            )
            success = response.status_code == 200

            if success:
                print(f"[告警] 已发送: {alert.severity.value.upper()} - {alert.title}")
            else:
                print(f"[告警] 发送失败: {response.status_code}")

            return success

        except requests.RequestException as e:
            print(f"[告警] 发送异常: {e}")
            return False

    def handle_prometheus_alert(self, webhook_payload: Dict) -> List[Alert]:
        """
        处理 Prometheus AlertManager 的 webhook payload

        Prometheus AlertManager 发送的格式:
        {
            "alerts": [
                {
                    "status": "firing|resolved",
                    "labels": {...},
                    "annotations": {...},
                    "startsAt": "...",
                    "endsAt": "...",
                    "generatorURL": "..."
                }
            ]
        }
        """
        alerts = []

        for alert_data in webhook_payload.get('alerts', []):
            status = alert_data.get('status', 'firing')
            labels = alert_data.get('labels', {})
            annotations = alert_data.get('annotations', {})

            # 判断严重程度
            alertname = labels.get('alertname', 'Unknown')
            severity_label = labels.get('severity', 'info')

            try:
                severity = AlertSeverity(severity_label)
            except ValueError:
                severity = AlertSeverity.INFO

            # 判断告警状态
            status_emoji = ":rotating_light:" if status == "firing" else ":white_check_mark:"
            status_text = "触发" if status == "firing" else "恢复"

            alert = Alert(
                title=f"{status_emoji} [{status_text}] {alertname}",
                message=annotations.get('description', annotations.get('summary', '')),
                severity=severity,
                source="Prometheus AlertManager",
                host=labels.get('instance', ''),
                metric=labels.get('__name__', ''),
                value=annotations.get('value', ''),
                url=alert_data.get('generatorURL', ''),
                labels=labels
            )

            alerts.append(alert)
            self.send_alert(alert)

        return alerts


# 使用示例
if __name__ == "__main__":
    notifier = UnifiedAlertNotifier(
        rocketchat_webhook_url="https://your-rocketchat.com/hooks/xxxxx/xxxxx"
    )

    # 发送一个严重告警
    critical_alert = Alert(
        title="服务器 CPU 使用率过高",
        message="生产服务器 web-server-01 的 CPU 使用率已达到 95%,请立即处理!",
        severity=AlertSeverity.CRITICAL,
        source="Prometheus",
        host="web-server-01",
        metric="cpu_usage_percent",
        value="95.2%",
        threshold="80%",
        url="http://prometheus.example.com/graph"
    )

    notifier.send_alert(critical_alert)

    # 发送一个恢复通知
    resolved_alert = Alert(
        title="数据库连接恢复正常",
        message="数据库连接池使用率已恢复到正常水平",
        severity=AlertSeverity.INFO,
        source="Prometheus",
        host="db-master-01",
        metric="db_connection_pool_usage"
    )

    notifier.send_alert(resolved_alert)

技巧与最佳实践

性能优化建议

在生产环境中运行 Rocket.Chat 时,性能优化是一个持续关注的话题。首先是数据库优化,MongoDB 的性能直接影响整体响应速度。建议为常用查询字段创建索引,特别是 messages.roomIdmessages.tsusers.username 等高频查询字段。同时,定期执行 db.repairDatabase() 清理碎片数据,并在非高峰期运行 MongoDB 的慢查询分析工具定位性能瓶颈。

其次是缓存策略的应用。Rocket.Chat 支持 Redis 作为缓存层,可以显著减少数据库查询压力。对于频繁访问的数据如用户列表、频道配置等,启用缓存能够将响应时间从数百毫秒降低到几毫秒。配置 Redis 时需要合理设置过期时间,平衡数据新鲜度和缓存效率。

最后是前端资源优化。启用 Nginx 的静态资源缓存和压缩功能,将 JavaScript 和 CSS 文件配置合适的缓存头。图片等媒体文件建议使用 CDN 加速,既能降低服务器负载,也能提升全球用户的访问速度。

安全加固指南

保护 Rocket.Chat 实例的安全需要从多个层面入手。在网络层面,确保所有对外服务都通过 HTTPS 访问,配置严格的 CORS 策略,仅允许受信任的域名访问 API。使用防火墙限制非必要端口的访问,将 MongoDB 等数据库服务限制为仅允许应用服务器访问。

在应用层面,定期更新 Rocket.Chat 到最新稳定版本以获取安全补丁。管理员密码应满足复杂度要求,建议启用双因素认证(2FA)为管理员账号提供额外保护。对于 API 访问,使用 Personal Access Token 代替共享的管理员账号,并限制 Token 的权限范围。

在数据层面,启用 MongoDB 的副本集模式实现数据冗余,定期执行数据库备份并将备份文件存储到安全的异地位置。对于传输敏感信息的场景,启用端到端加密功能。监控日志中的异常登录尝试,设置告警阈值以便及时发现潜在的安全威胁。

监控与运维

建立完善的监控体系是保障 Rocket.Chat 稳定运行的关键。推荐使用 Prometheus + Grafana 组合监控应用指标,包括活跃用户数、消息吞吐量、API 响应时间、错误率等核心指标。Rocket.Chat 提供了完整的 metrics 端点,可以直接对接 Prometheus 进行数据采集。

日志管理方面,建议将 Rocket.Chat 的日志输出到统一的日志收集系统(如 ELK Stack 或 Loki),便于问题排查和审计追踪。设置日志级别时,生产环境通常使用 info 或 warning 级别,避免 debug 级别产生的海量日志影响性能和存储。

高可用部署架构是生产环境的推荐方案。使用 Docker Swarm 或 Kubernetes 进行容器编排,配置多个 Rocket.Chat 实例实现负载均衡和故障转移。MongoDB 应部署为三节点副本集,确保任何单节点故障不会影响服务可用性。对于大规模部署,可以考虑配置独立的 Redis 实例用于缓存和会话存储,进一步提升系统性能。


结论与资源链接

经过这篇详尽的教程,相信你已经对 Rocket.Chat 有了全面深入的认识。作为一款功能完备、开源免费、部署灵活的企业级通讯平台,Rocket.Chat 为追求数据自主和技术自由的团队提供了一个极具吸引力的选择。无论是搭建内部沟通系统、构建客户支持平台还是整合 DevOps 工作流,Rocket.Chat 都能提供稳定可靠的支持。

官方资源链接

  • Rocket.Chat 官方文档:https://docs.rocket.chat
  • GitHub 仓库:https://github.com/RocketChat/Rocket.Chat
  • 官方演示实例:https://open.rocket.chat
  • Rocket.Chat 社区论坛:https://forums.rocket.chat
  • Docker 镜像仓库:https://hub.docker.com/u/rocketchat

相关开源项目推荐

  • Mattermost —— 另一款优秀的开源团队通讯平台,同样值得了解
  • Zulip —— 以话题流组织讨论的开源聊天工具
  • Mattermost Integrations —— Mattermost 的官方集成库,可以参考设计理念
  • Rocket.Chat Apps —— Rocket.Chat 的官方应用开发框架

下一步学习建议

建议从实际动手开始,在本地环境完成一次完整的 Docker Compose 部署,熟悉基本配置和管理界面。然后尝试使用 REST API 创建用户、发送消息,体验 API 的强大能力。接下来可以根据自身需求,选择一个实际的集成场景进行开发实践。无论是构建自动化机器人、实现 CI/CD 通知还是开发客户支持系统,都是很好的起点。

Rocket.Chat 拥有一个活跃的开源社区,如果你遇到问题或有任何想法,都可以在 GitHub Issues 或社区论坛中与其他用户交流。期待看到你基于 Rocket.Chat 构建的精彩应用!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注