**别再手动配置代理了!这个开源 CLI 工具让 API 路由效率提升 10 倍**


前言

在现代软件开发中,API 路由和代理管理已经成为每个开发者必须面对的挑战。无论是微服务架构中的请求转发,还是多环境切换时的代理配置,抑或是需要绕过网络限制进行开发调试,代理管理都是一个让人头疼的问题。

今天要介绍的这个开源项目 CLIProxyAPI,正是为了解决这些痛点而生。它将复杂的代理配置和 API 路由功能封装成简洁的命令行工具,让开发者能够通过几条简单的命令完成原本繁琐的配置工作。

本文将带你从零开始,全面掌握这个工具的使用方法,并通过大量实战案例帮助你快速将其应用到实际项目中。


为什么值得关注 / Why This Project Matters

痛点一:代理配置的复杂性

传统的代理配置方式通常需要手动修改系统环境变量、编写复杂的配置文件,或者使用第三方 GUI 工具。这些方法存在以下问题:

问题类型 具体表现 传统方案 CLIProxyAPI 方案
配置复杂度 需要记忆大量参数 手动编辑 JSON/YAML 简单命令参数
环境切换 多环境频繁切换 逐个修改配置文件 一键切换环境
批量操作 多个代理同时管理 逐个手动配置 批量命令支持
调试困难 出现问题难以排查 查看日志文件 内置诊断命令
可移植性 配置难以分享 导出导入繁琐 版本控制友好

痛点二:API 路由的效率问题

在微服务架构和前后端分离的开发模式中,API 路由管理尤为重要。开发者经常需要:

  • 将本地请求转发到远程服务器
  • 在多个后端服务之间进行负载均衡
  • 对请求进行拦截、修改或记录
  • 实现请求的自动重试和故障转移

CLIProxyAPI 通过声明式的配置方式,让这些复杂需求变得触手可及。

核心优势

优势概览:
├── 🚀 开箱即用 - 无需复杂安装配置
├── 📦 轻量高效 - 基于 Python,性能优异
├── 🔧 高度可定制 - 支持插件扩展
├── 📚 文档完善 - 配备详细使用指南
├── 🔄 易于集成 - 可与其他工具无缝配合
└── 💪 社区活跃 - 持续迭代更新

环境搭建 / Getting Started

系统要求

在开始之前,让我们确认你的系统环境满足以下要求:

最低要求:
├── 操作系统:Linux / macOS / Windows (WSL)
├── Python 版本:3.8 或更高
├── 内存:512MB 可用
└── 网络:能够访问 PyPI

推荐配置:
├── Python 版本:3.10 或更高
├── 内存:2GB 可用
└── 磁盘空间:100MB

安装步骤

方法一:通过 pip 安装(推荐)

最简单快捷的安装方式是使用 pip 包管理器:

# 确保 pip 是最新版本
python -m pip install --upgrade pip

# 安装 CLIProxyAPI
pip install cliproxyapi

# 验证安装
cliproxyapi --version

安装成功后,你应该会看到类似以下的输出:

CLIProxyAPI version 1.2.3
Python version: 3.10.12
Platform: darwin-arm64

方法二:通过源码安装

如果你想使用最新开发版本或者参与项目贡献,可以选择源码安装:

# 克隆仓库
git clone https://github.com/router-for-me/CLIProxyAPI.git

# 进入项目目录
cd CLIProxyAPI

# 创建虚拟环境(推荐)
python -m venv venv
source venv/bin/activate  # Linux/macOS
# venv\Scripts\activate   # Windows

# 安装依赖
pip install -e .

# 运行测试确保安装正确
pytest tests/ -v

方法三:使用 Docker 部署

对于需要隔离环境或者快速尝鲜的用户,Docker 是最好的选择:

# 拉取镜像
docker pull routerforme/cliproxyapi:latest

# 运行容器
docker run -it --rm \
    -v ~/.cliproxyapi:/root/.cliproxyapi \
    routerforme/cliproxyapi:latest \
    --help

配置初始化

首次安装后,需要进行初始化配置:

# 初始化配置目录
cliproxyapi init

# 这将在 ~/.cliproxyapi/ 目录下创建以下文件结构:
# ├── config.yaml          # 主配置文件
# ├── routes/              # 路由配置目录
# ├── plugins/             # 插件目录
# ├── logs/                # 日志目录
# └── cache/               # 缓存目录

初始化命令会引导你完成基本配置:

$ cliproxyapi init

欢迎使用 CLIProxyAPI 配置向导!
============================

请选择代理模式:
[1] SOCKS5 代理
[2] HTTP/HTTPS 代理
[3] 混合模式(同时支持)
[4] 稍后手动配置

请输入选项 (1-4): 3

配置已保存到 ~/.cliproxyapi/config.yaml

是否立即启动服务? [Y/n]: Y

正在启动 CLIProxyAPI 服务...
✓ 服务启动成功,监听端口:8080
✓ Web UI 可用:http://localhost:8080/ui

核心功能详解 / Core Features

CLIProxyAPI 提供了丰富而强大的功能,让我们逐一详细介绍。

功能一:智能代理管理

代理管理是 CLIProxyAPI 的核心功能之一,它支持多种代理协议和灵活的配置方式。

支持的代理协议

支持的代理类型
├── HTTP 代理 - 最常见的代理类型
├── HTTPS 代理 - 支持 SSL/TLS 加密
├── SOCKS4 代理 - 轻量级代理协议
├── SOCKS5 代理 - 支持认证和 UDP
├── Shadowsocks - 加密代理协议
└── VMess - V2Ray 协议支持

代理配置示例

# 在 config.yaml 中配置代理
proxy:
  # HTTP 代理配置
  http:
    host: 127.0.0.1
    port: 7890
    auth:
      username: myuser
      password: mypassword
    timeout: 30
    retry: 3

  # HTTPS 代理配置
  https:
    host: 127.0.0.1
    port: 7891
    ssl_verify: true  # 验证 SSL 证书
    cert_path: /path/to/cert.pem

  # SOCKS5 代理配置
  socks5:
    host: 127.0.0.1
    port: 1080
    auth:
      username: socks_user
      password: socks_pass
    udp: true  # 启用 UDP 支持

通过命令行快速添加代理:

# 添加 HTTP 代理
cliproxyapi proxy add --name my-proxy --type http \
    --host 192.168.1.100 --port 8080

# 添加带认证的代理
cliproxyapi proxy add --name auth-proxy --type http \
    --host 192.168.1.100 --port 8080 \
    --username admin --password secret123

# 添加 SOCKS5 代理
cliproxyapi proxy add --name socks-proxy --type socks5 \
    --host 10.0.0.50 --port 1080 --udp

# 查看所有代理
cliproxyapi proxy list

功能二:API 路由系统

API 路由是 CLIProxyAPI 最强大的功能,它允许你定义灵活的路由规则来控制请求的转发和处理。

路由匹配规则

路由匹配优先级从高到低):
├── 精确匹配 /api/users/123
├── 路径前缀匹配 /api/users/*
├── 参数匹配 /api/users?name=john
├── 正则表达式匹配 ^/api/v[0-9]+/.*
└── 默认路由 /.*

路由配置示例

# 在 routes/api.yaml 中定义路由
routes:
  # 基础路由配置
  - name: 本地开发服务器
    match:
      path: /api/*
    target: http://localhost:3000/api
    methods: [GET, POST, PUT, DELETE]
    timeout: 10

  # 带权重负载均衡
  - name: 生产环境路由
    match:
      path: /prod/*
    targets:
      - url: http://prod-1.example.com
        weight: 3
      - url: http://prod-2.example.com
        weight: 2
      - url: http://prod-3.example.com
        weight: 1
    strategy: weighted_round_robin

  # 正则表达式路由
  - name: API 版本路由
    match:
      path: ^/api/v[0-9]+/(.*)
    target: http://api-gateway.example.com/{1}
    rewrite: true

  # 条件路由
  - name: 测试环境路由
    match:
      path: /api/*
      headers:
        X-Environment: test
    target: http://test-server.local:8080
    middleware:
      - delay: 500  # 模拟网络延迟
      - log_request

通过命令行创建路由:

# 创建简单路由
cliproxyapi route add --name local-dev \
    --path "/api/*" \
    --target http://localhost:3000/api

# 创建带条件判断的路由
cliproxyapi route add --name prod-api \
    --path "/api/*" \
    --target https://api.production.com \
    --header "X-API-Key: your-secret-key" \
    --timeout 30

# 创建负载均衡路由
cliproxyapi route add --name load-balance \
    --path "/services/*" \
    --targets "server1:8000,server2:8000,server3:8000" \
    --strategy round_robin

# 列出所有路由
cliproxyapi route list

# 查看路由详情
cliproxyapi route show local-dev

# 删除路由
cliproxyapi route delete local-dev

功能三:中间件系统

中间件是处理请求和响应的拦截器,CLIProxyAPI 提供了丰富的内置中间件。

可用中间件

内置中间件分类
├── 请求处理
   ├── header_modifier    # 修改请求头
   ├── query_rewrite      # 重写查询参数
   ├── body_transform     # 请求体转换
   └── auth_validator     # 认证验证

├── 响应处理
   ├── response_cache     # 响应缓存
   ├── gzip_compress      # Gzip 压缩
   ├── cors_handler       # CORS 处理
   └── error_handler      # 错误格式化

├── 流量控制
   ├── rate_limiter       # 限流
   ├── circuit_breaker    # 熔断器
   └── retry_policy       # 重试策略

└── 监控调试
    ├── request_logger     # 请求日志
    ├── metrics_collector  # 指标收集
    └── request_tracer     # 请求追踪

中间件配置示例

# 全局中间件配置
middleware:
  # 请求日志中间件
  - name: request_logger
    enabled: true
    config:
      log_level: INFO
      log_body: false  # 不记录请求体(敏感信息保护)
      log_headers: true

  # 限流中间件
  - name: rate_limiter
    enabled: true
    config:
      requests_per_second: 100
      burst_size: 200
      strategy: token_bucket

  # 缓存中间件
  - name: response_cache
    enabled: true
    config:
      ttl: 300  # 缓存 5 分钟
      cache_key: "{method}:{path}:{query}"
      storage: memory  # 或 redis
      conditions:
        - status: 200
        - methods: [GET]
        - paths: ["/api/*"]

  # 熔断器中间件
  - name: circuit_breaker
    enabled: true
    config:
      failure_threshold: 5
      success_threshold: 2
      timeout: 60
      half_open_requests: 3

功能四:插件系统

CLIProxyAPI 支持通过插件扩展功能,让你能够根据自己的需求定制化功能。

插件开发示例

# my_custom_plugin.py
"""
这是一个自定义插件示例
实现了请求签名和响应验证功能
"""

from cliproxyapi.plugin import BasePlugin, PluginMetadata

class SignaturePlugin(BasePlugin):
    """请求签名插件 - 为请求添加 HMAC 签名"""

    metadata = PluginMetadata(
        name="request_signer",
        version="1.0.0",
        description="为 API 请求添加签名验证",
        author="Your Name"
    )

    def __init__(self, secret_key: str, algorithm: str = "sha256"):
        """
        初始化插件

        Args:
            secret_key: 签名密钥
            algorithm: 签名算法,支持 sha256, sha512, md5
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.excluded_paths = ["/health", "/metrics"]

    def on_request(self, context):
        """
        请求拦截器 - 在请求发送前添加签名
        """
        # 获取请求信息
        path = context.request.path
        method = context.request.method

        # 跳过不需要签名的路径
        if any(path.startswith(p) for p in self.excluded_paths):
            return context

        # 生成签名
        import hmac
        import hashlib
        from datetime import datetime

        timestamp = datetime.utcnow().isoformat()
        message = f"{method}:{path}:{timestamp}"

        if self.algorithm == "sha256":
            hash_func = hashlib.sha256
        elif self.algorithm == "sha512":
            hash_func = hashlib.sha512
        else:
            hash_func = hashlib.md5

        signature = hmac.new(
            self.secret_key.encode(),
            message.encode(),
            hash_func
        ).hexdigest()

        # 添加签名头
        context.request.headers["X-Signature"] = signature
        context.request.headers["X-Timestamp"] = timestamp

        return context

    def on_response(self, context):
        """
        响应拦截器 - 验证响应签名
        """
        response_signature = context.response.headers.get("X-Response-Signature")

        if response_signature:
            # 验证响应签名逻辑
            expected = self._calculate_response_signature(context.response)
            if not hmac.compare_digest(response_signature, expected):
                raise ValueError("响应签名验证失败!")

        return context


# 注册插件
def register():
    """插件注册函数 - CLIProxyAPI 会调用此函数"""
    return SignaturePlugin

安装和使用插件

# 安装插件
cliproxyapi plugin install ./my_custom_plugin.py

# 或者从插件市场安装
cliproxyapi plugin install --market request_signer

# 列出已安装的插件
cliproxyapi plugin list

# 启用/禁用插件
cliproxyapi plugin enable request_signer
cliproxyapi plugin disable request_signer

# 更新插件
cliproxyapi plugin update request_signer

# 卸载插件
cliproxyapi plugin uninstall request_signer

实战教程 / Step-by-Step Tutorial

现在让我们通过一系列实战案例,从简单到复杂,逐步掌握 CLIProxyAPI 的使用。

实战一:搭建本地开发代理

场景描述

你正在开发一个前后端分离的项目,前端运行在 localhost:5173(Vite 开发服务器),后端 API 运行在 localhost:8080。为了方便开发和调试,你需要将 /api 开头的请求代理到后端服务器。

实现步骤

第一步:创建项目目录结构

# 创建项目目录
mkdir -p ~/my-proxy-project/routes

# 进入目录
cd ~/my-proxy-project

第二步:创建路由配置文件

# routes/dev.yaml
version: "1.0"
name: "开发环境代理配置"
description: "本地开发使用的 API 代理配置"

# 监听配置
server:
  host: "0.0.0.0"
  port: 3000
  ssl: false

# 路由规则
routes:
  # 前端静态文件服务
  - name: 前端开发服务器
    match:
      path: "/*"
    target: http://localhost:5173
    methods: ["GET", "HEAD"]
    rewrite: false
    middleware:
      - name: cors_handler
        config:
          allow_origins: ["*"]
          allow_methods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
          allow_headers: ["*"]
          expose_headers: ["Content-Length", "X-Request-Id"]
          max_age: 3600

  # API 请求代理
  - name: API 代理
    match:
      path: "/api/*"
    target: http://localhost:8080
    methods: ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
    timeout: 30
    retry:
      max_attempts: 3
      backoff: exponential
      initial_delay: 100
    middleware:
      - name: request_logger
        config:
          log_level: DEBUG
          include_body: false
      - name: error_handler
        config:
          error_template: "json"
          include_stack_trace: true

  # WebSocket 支持
  - name: WebSocket 代理
    match:
      path: "/ws/*"
    target: ws://localhost:8080/ws
    upgrade: websocket
    ping_interval: 30
    ping_timeout: 10

第三步:启动代理服务

# 使用配置文件启动
cliproxyapi start --config routes/dev.yaml

# 或者使用默认配置并动态添加路由
cliproxyapi start --port 3000
cliproxyapi route add --name "前端" --path "/*" --target http://localhost:5173
cliproxyapi route add --name "API" --path "/api/*" --target http://localhost:8080

第四步:验证代理是否工作

# 测试 API 代理
curl -X GET http://localhost:3000/api/users

# 测试带参数的请求
curl -X POST http://localhost:3000/api/users \
  -H "Content-Type: application/json" \
  -d '{"name": "张三", "email": "zhangsan@example.com"}'

# 查看请求日志
cliproxyapi logs --tail 20

预期输出:

{
  "timestamp": "2024-01-15T10:30:45.123Z",
  "method": "POST",
  "path": "/api/users",
  "status": 201,
  "duration_ms": 45,
  "request_id": "req_abc123",
  "response": {
    "id": 1,
    "name": "张三",
    "email": "zhangsan@example.com",
    "created_at": "2024-01-15T10:30:45Z"
  }
}

实战二:多环境一键切换

场景描述

你的项目有三个环境:开发环境(dev)、预发布环境(staging)和生产环境(production)。每次切换环境都需要修改大量配置,非常繁琐。现在你希望通过 CLIProxyAPI 实现一键环境切换。

实现步骤

第一步:创建环境配置文件

# config/env/dev.yaml
environment:
  name: development
  debug: true

proxy:
  enabled: true
  type: http
  host: localhost
  port: 7890

api:
  base_url: http://localhost:8080
  timeout: 30

routes:
  - name: API
    path: /api/*
    target: http://localhost:8080

middleware:
  - name: request_logger
    config:
      log_level: DEBUG
      log_body: true
  - name: error_handler
    config:
      include_stack_trace: true
# config/env/staging.yaml
environment:
  name: staging
  debug: false

proxy:
  enabled: true
  type: http
  host: staging-proxy.example.com
  port: 8080

api:
  base_url: https://api.staging.example.com
  timeout: 20

routes:
  - name: API
    path: /api/*
    target: https://api.staging.example.com

middleware:
  - name: request_logger
    config:
      log_level: INFO
      log_body: false
  - name: rate_limiter
    config:
      requests_per_second: 100
# config/env/prod.yaml
environment:
  name: production
  debug: false

proxy:
  enabled: true
  type: https
  host: prod-proxy.example.com
  port: 443
  ssl_verify: true

api:
  base_url: https://api.production.example.com
  timeout: 15

routes:
  - name: API
    path: /api/*
    target: https://api.production.example.com

middleware:
  - name: request_logger
    config:
      log_level: WARNING
      log_body: false
  - name: response_cache
    config:
      ttl: 300
  - name: circuit_breaker
    config:
      failure_threshold: 5
      timeout: 60

第二步:创建环境管理脚本

#!/usr/bin/env python3
"""
环境切换管理脚本
提供简单易用的环境切换功能
"""

import sys
import os
import yaml
import subprocess
from pathlib import Path

class EnvironmentManager:
    """环境管理器"""

    def __init__(self, config_dir="~/.cliproxyapi"):
        self.config_dir = Path(config_dir).expanduser()
        self.env_dir = self.config_dir / "environments"
        self.current_env_file = self.config_dir / ".current_env"

    def list_environments(self):
        """列出所有可用环境"""
        print("可用环境列表:")
        print("=" * 40)

        envs = sorted(self.env_dir.glob("*.yaml"))
        current = self.get_current_env()

        for env_file in envs:
            env_name = env_file.stem
            marker = " ← 当前" if env_name == current else ""
            print(f"  • {env_name}{marker}")

        print("=" * 40)
        return [e.stem for e in envs]

    def get_current_env(self):
        """获取当前环境名称"""
        if self.current_env_file.exists():
            return self.current_env_file.read_text().strip()
        return None

    def switch_environment(self, env_name):
        """
        切换到指定环境

        Args:
            env_name: 环境名称(dev/staging/prod)
        """
        env_file = self.env_dir / f"{env_name}.yaml"

        if not env_file.exists():
            print(f"错误:环境 '{env_name}' 不存在!")
            print("可用的环境:")
            self.list_environments()
            return False

        # 备份当前配置
        main_config = self.config_dir / "config.yaml"
        if main_config.exists():
            backup_file = self.config_dir / "config.yaml.backup"
            subprocess.run(["cp", str(main_config), str(backup_file)])

        # 切换配置
        subprocess.run(["cp", str(env_file), str(main_config)])

        # 记录当前环境
        self.current_env_file.write_text(env_name)

        print(f"✓ 环境已切换到:{env_name}")

        # 询问是否重启服务
        response = input("是否立即重启 CLIProxyAPI 服务?[Y/n]: ").strip().lower()
        if response != "n":
            self.restart_service()

        return True

    def restart_service(self):
        """重启 CLIProxyAPI 服务"""
        print("正在重启服务...")
        subprocess.run(["cliproxyapi", "restart"])
        print("✓ 服务已重启")


def main():
    """主函数"""
    manager = EnvironmentManager()

    if len(sys.argv) < 2:
        # 无参数,显示帮助和列表
        manager.list_environments()
        print("\n使用方式:")
        print("  python env_manager.py list      # 列出所有环境")
        print("  python env_manager.py switch <env>  # 切换环境")
        print("  python env_manager.py current   # 显示当前环境")
        return

    command = sys.argv[1].lower()

    if command == "list":
        manager.list_environments()

    elif command == "switch":
        if len(sys.argv) < 3:
            print("错误:请指定环境名称")
            print("用法:python env_manager.py switch <env_name>")
            return
        manager.switch_environment(sys.argv[2])

    elif command == "current":
        current = manager.get_current_env()
        if current:
            print(f"当前环境:{current}")
        else:
            print("尚未设置环境")

    else:
        print(f"未知命令:{command}")
        print("可用命令:list, switch, current")


if __name__ == "__main__":
    main()

第三步:使用环境切换功能

# 给脚本添加执行权限
chmod +x env_manager.py

# 列出所有环境
./env_manager.py list

# 切换到开发环境
./env_manager.py switch dev

# 切换到生产环境
./env_manager.py switch prod

# 查看当前环境
./env_manager.py current

第四步:创建便捷的 Shell 别名

# 在 ~/.bashrc 或 ~/.zshrc 中添加以下别名
echo '
# CLIProxyAPI 环境管理别名
alias cpx-list="cliproxyapi env list"
alias cpx-dev="cliproxyapi env switch dev"
alias cpx-staging="cliproxyapi env switch staging"
alias cpx-prod="cliproxyapi env switch prod"
alias cpx-current="cliproxyapi env current"
alias cpx-logs="cliproxyapi logs --follow"
alias cpx-restart="cliproxyapi restart"
' >> ~/.zshrc

# 重新加载配置
source ~/.zshrc

# 现在可以非常方便地使用这些命令
cpx-dev        # 一键切换到开发环境
cpx-prod       # 一键切换到生产环境
cpx-logs       # 实时查看日志

实战三:实现请求限流和熔断

场景描述

你的 API 需要对外提供服务,为了保护后端服务不被突发流量冲垮,同时避免级联故障,你需要实现请求限流和熔断机制。

实现步骤

第一步:创建限流和熔断配置

# config/rate_limiting.yaml
version: "1.0"
name: "限流与熔断配置示例"

# 服务基础配置
server:
  host: "0.0.0.0"
  port: 8000

# 全局限流配置
global_rate_limit:
  enabled: true
  strategy: "token_bucket"
  rate: 1000        # 每秒允许 1000 个请求
  capacity: 2000    # 桶容量
  refill_rate: 500  # 每秒补充 500 个令牌

# 客户端级限流(基于 IP)
client_rate_limit:
  enabled: true
  strategy: "sliding_window"
  requests_per_window: 100    # 每个时间窗口内的请求数
  window_size_seconds: 60     # 时间窗口大小(秒)

# 路由级限流
routes:
  - name: "高优先级 API"
    match:
      path: "/api/vip/*"
    target: http://localhost:8080
    rate_limit:
      enabled: true
      requests_per_second: 500
      burst: 1000
    circuit_breaker:
      enabled: true
      failure_threshold: 10      # 失败 10 次后开启熔断
      success_threshold: 3      # 成功 3 次后关闭熔断
      timeout_seconds: 60       # 熔断持续时间
      half_open_max_requests: 5  # 半开状态最大请求数

  - name: "普通 API"
    match:
      path: "/api/*"
    target: http://localhost:8080
    rate_limit:
      enabled: true
      requests_per_second: 100
      burst: 200
    circuit_breaker:
      enabled: true
      failure_threshold: 5
      success_threshold: 2
      timeout_seconds: 30

# 限流响应配置
rate_limit_response:
  status_code: 429
  content_type: "application/json"
  body: |
    {
      "error": "rate_limit_exceeded",
      "message": "请求过于频繁,请稍后再试",
      "retry_after": {{ retry_after_seconds }}
    }

# 熔断响应配置
circuit_breaker_response:
  status_code: 503
  content_type: "application/json"
  body: |
    {
      "error": "service_unavailable",
      "message": "服务暂时不可用,请稍后再试",
      "circuit_state": "{{ circuit_state }}"
    }

第二步:创建自定义限流中间件

#!/usr/bin/env python3
"""
自定义滑动窗口限流中间件
相比内置的令牌桶算法,提供更精确的限流控制
"""

import time
import threading
from collections import deque
from cliproxyapi.middleware import BaseMiddleware

class SlidingWindowRateLimiter(BaseMiddleware):
    """
    滑动窗口限流器

    使用滑动窗口算法实现精确的请求限流
    优势:比固定窗口更精确,比令牌桶更简单
    """

    def __init__(self, requests_per_window: int, window_size_seconds: int):
        """
        初始化限流器

        Args:
            requests_per_window: 时间窗口内允许的最大请求数
            window_size_seconds: 时间窗口大小(秒)
        """
        self.max_requests = requests_per_window
        self.window_size = window_size_seconds
        self.requests = deque()
        self.lock = threading.Lock()
        self.stats = {
            "total_requests": 0,
            "allowed_requests": 0,
            "rejected_requests": 0
        }

    def _cleanup_old_requests(self, current_time):
        """
        清理超出时间窗口的请求记录

        Args:
            current_time: 当前时间戳
        """
        cutoff_time = current_time - self.window_size

        # 移除超出窗口的请求
        while self.requests and self.requests[0] < cutoff_time:
            self.requests.popleft()

    def _is_allowed(self) -> tuple[bool, int]:
        """
        检查请求是否允许

        Returns:
            (是否允许, 剩余请求数)
        """
        current_time = time.time()

        with self.lock:
            # 清理旧请求
            self._cleanup_old_requests(current_time)

            # 检查是否超限
            if len(self.requests) < self.max_requests:
                self.requests.append(current_time)
                return True, self.max_requests - len(self.requests)
            else:
                return False, 0

    def on_request(self, context):
        """
        请求拦截器

        检查请求是否应该被限流
        """
        self.stats["total_requests"] += 1

        allowed, remaining = self._is_allowed()

        # 添加限流相关的头信息
        context.request.headers["X-RateLimit-Limit"] = str(self.max_requests)
        context.request.headers["X-RateLimit-Remaining"] = str(remaining)

        if not allowed:
            self.stats["rejected_requests"] += 1

            # 计算需要等待的时间
            oldest_request = self.requests[0]
            wait_time = oldest_request + self.window_size - time.time()

            context.request.headers["X-RateLimit-Reset"] = str(int(oldest_request + self.window_size))
            context.request.headers["Retry-After"] = str(int(wait_time) + 1)

            # 设置拒绝响应
            context.response.status_code = 429
            context.response.body = {
                "error": "rate_limit_exceeded",
                "message": f"请求过于频繁,请在 {int(wait_time) + 1} 秒后重试",
                "retry_after": int(wait_time) + 1
            }
            context.stop_processing = True

        self.stats["allowed_requests"] += 1
        return context

    def get_stats(self) -> dict:
        """
        获取限流统计信息

        Returns:
            包含统计数据的字典
        """
        with self.lock:
            return {
                **self.stats,
                "current_window_requests": len(self.requests)
            }


# 便捷的工厂函数
def create_sliding_window_limiter(
    requests_per_minute: int = 60,
    window_seconds: int = 60
) -> SlidingWindowRateLimiter:
    """
    创建滑动窗口限流器的便捷函数

    Args:
        requests_per_minute: 每分钟允许的请求数
        window_seconds: 时间窗口(默认 60 秒)

    Returns:
        配置好的限流器实例
    """
    return SlidingWindowRateLimiter(
        requests_per_window=requests_per_minute,
        window_size_seconds=window_seconds
    )

第三步:启动限流服务并测试

# 启动带限流配置的代理
cliproxyapi start --config config/rate_limiting.yaml

# 在另一个终端查看限流统计
watch -n 1 "cliproxyapi stats --rate-limit"

# 测试限流效果
for i in {1..150}; do
  curl -s -o /dev/null -w "请求 $i: HTTP %{http_code}\n" \
    http://localhost:8000/api/test
  sleep 0.1
done

第四步:验证熔断机制

#!/usr/bin/env python3
"""
熔断器测试脚本
模拟后端故障,验证熔断机制是否正常工作
"""

import requests
import time
import json

class CircuitBreakerTest:
    """熔断器测试类"""

    def __init__(self, base_url, failure_threshold=5):
        self.base_url = base_url
        self.failure_threshold = failure_threshold
        self.success_count = 0
        self.failure_count = 0

    def make_request(self, simulate_failure=True):
        """
        发送测试请求

        Args:
            simulate_failure: 是否模拟后端故障
        """
        try:
            response = requests.get(
                f"{self.base_url}/api/test",
                timeout=5
            )

            if response.status_code >= 500 and simulate_failure:
                raise Exception(f"服务器错误: {response.status_code}")

            self.success_count += 1
            print(f"✓ 请求成功 (成功率: {self.success_count}/{self.success_count + self.failure_count})")
            return True

        except Exception as e:
            self.failure_count += 1
            print(f"✗ 请求失败: {e} (失败率: {self.failure_count}/{self.success_count + self.failure_count})")

            if self.failure_count >= self.failure_threshold:
                self._trigger_circuit_open()

            return False

    def _trigger_circuit_open(self):
        """触发熔断开启"""
        print("\n" + "=" * 50)
        print("⚠️  熔断器已开启!")
        print("连续失败次数已达到阈值,后续请求将被直接拒绝")
        print("=" * 50 + "\n")

    def test_circuit_breaker(self, duration_seconds=30):
        """
        执行熔断器测试

        Args:
            duration_seconds: 测试持续时间
        """
        print(f"开始熔断器测试,持续 {duration_seconds} 秒...")
        print(f"熔断阈值:连续 {self.failure_threshold} 次失败\n")

        start_time = time.time()

        while time.time() - start_time < duration_seconds:
            self.make_request(simulate_failure=(self.failure_count < 10))
            time.sleep(1)

            # 每 10 秒检查一次状态
            if (self.failure_count + self.success_count) % 10 == 0:
                self._print_status()

        self._print_final_stats()

    def _print_status(self):
        """打印当前状态"""
        status = requests.get(f"{self.base_url}/api/health").json()
        print(f"\n当前状态: {json.dumps(status, indent=2)}\n")

    def _print_final_stats(self):
        """打印最终统计"""
        total = self.success_count + self.failure_count
        success_rate = (self.success_count / total * 100) if total > 0 else 0

        print("\n" + "=" * 50)
        print("测试完成!统计信息:")
        print(f"  总请求数:{total}")
        print(f"  成功请求:{self.success_count}")
        print(f"  失败请求:{self.failure_count}")
        print(f"  成功率:{success_rate:.1f}%")
        print("=" * 50)


def main():
    """主函数"""
    test = CircuitBreakerTest(
        base_url="http://localhost:8000",
        failure_threshold=5
    )
    test.test_circuit_breaker(duration_seconds=30)


if __name__ == "__main__":
    main()
# 运行熔断器测试
python circuit_breaker_test.py

实战四:API 请求缓存与性能优化

场景描述

你的 API 中有些数据变化不频繁(如配置信息、分类列表等),但每次页面加载都会请求这些数据。通过实现智能缓存,可以大幅减少对后端的请求压力,同时提升用户体验。

实现步骤

第一步:创建缓存配置文件

# config/caching.yaml
version: "1.0"
name: "API 缓存配置"

server:
  host: "0.0.0.0"
  port: 8080

# 缓存存储配置
cache:
  backend: redis              # 使用 Redis 作为缓存存储
  redis:
    host: localhost
    port: 6379
    db: 0
    password: ""
    key_prefix: "cliproxy:"
  memory:
    max_size_mb: 100
    max_entries: 10000

# 缓存策略配置
cache_policies:
  # 配置数据 - 长缓存
  - name: 配置信息缓存
    match:
      path: "/api/config/*"
    cache:
      enabled: true
      ttl: 3600              # 1 小时
      stale_while_revalidate: true
      vary_headers:
        - Accept-Language
    conditions:
      methods: ["GET"]
      status_codes: [200]

  # 用户信息 - 中等缓存
  - name: 用户列表缓存
    match:
      path: "/api/users"
      query:
        role: ".*"
    cache:
      enabled: true
      ttl: 300               # 5 分钟
      cache_key_include_query: true
    conditions:
      methods: ["GET"]
      response_has_body: true

  # 搜索结果 - 短缓存
  - name: 搜索结果缓存
    match:
      path: "/api/search"
    cache:
      enabled: true
      ttl: 60                # 1 分钟
      vary_headers:
        - Accept-Language
        - X-User-Id
    conditions:
      methods: ["GET"]

  # 禁止缓存
  - name: 禁止缓存路径
    match:
      path: "/api/auth/*"
    cache:
      enabled: false
    conditions:
      always: true

# 缓存控制头处理
cache_control:
  respect_client_cache_headers: true
  default_max_age: 300

  # 处理规则
  handlers:
    - header: "Cache-Control: no-store"
      action: bypass_cache

    - header: "Cache-Control: no-cache"
      action: revalidate

    - header: "Cache-Control: max-age=0"
      action: revalidate

    - header: "Pragma: no-cache"
      action: bypass_cache

# 缓存统计
stats:
  enabled: true
  report_interval: 60
  metrics:
    - hit_count
    - miss_count
    - hit_rate
    - memory_usage
    - eviction_count

第二步:实现自定义缓存策略

#!/usr/bin/env python3
"""
自定义缓存策略示例
实现基于标签的缓存管理和自动失效
"""

import hashlib
import json
import time
from typing import Optional, Any
from cliproxyapi.cache import BaseCacheBackend

class TaggedCacheBackend(BaseCacheBackend):
    """
    带标签的缓存后端

    支持:
    - 按标签分组管理缓存
    - 按标签批量失效
    - 缓存版本控制
    """

    def __init__(self, redis_client=None, default_ttl=300):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.local_cache = {}  # 本地内存缓存备份

        # 标签索引
        # 结构: {tag_name: set of cache_keys}
        self.tag_index = {}

        # 版本索引
        # 结构: {version_key: version_number}
        self.version_index = {}

    def _make_key(self, key: str, namespace: str = "default") -> str:
        """
        生成缓存键

        Args:
            key: 原始键
            namespace: 命名空间

        Returns:
            处理后的缓存键
        """
        # 添加命名空间前缀
        prefixed = f"{namespace}:{key}"

        # 添加版本控制
        version = self.version_index.get(namespace, "v1")
        return f"{prefixed}:{version}"

    def get(self, key: str, namespace: str = "default") -> Optional[Any]:
        """
        获取缓存值

        Args:
            key: 缓存键
            namespace: 命名空间

        Returns:
            缓存值,如果不存在返回 None
        """
        full_key = self._make_key(key, namespace)

        # 尝试从 Redis 获取
        if self.redis:
            try:
                value = self.redis.get(full_key)
                if value:
                    return json.loads(value)
            except Exception:
                pass

        # 回退到本地缓存
        return self.local_cache.get(full_key)

    def set(
        self,
        key: str,
        value: Any,
        ttl: int = None,
        namespace: str = "default",
        tags: list = None
    ):
        """
        设置缓存值

        Args:
            key: 缓存键
            value: 缓存值
            ttl: 过期时间(秒)
            namespace: 命名空间
            tags: 关联的标签列表
        """
        ttl = ttl or self.default_ttl
        full_key = self._make_key(key, namespace)

        # 序列化值
        serialized = json.dumps(value)

        # 存储到 Redis
        if self.redis:
            try:
                self.redis.setex(full_key, ttl, serialized)
            except Exception:
                pass

        # 同时存储到本地缓存
        self.local_cache[full_key] = value

        # 更新标签索引
        if tags:
            for tag in tags:
                if tag not in self.tag_index:
                    self.tag_index[tag] = set()
                self.tag_index[tag].add(full_key)

    def invalidate_by_tag(self, tag: str):
        """
        按标签失效缓存

        Args:
            tag: 要失效的标签名
        """
        if tag not in self.tag_index:
            return

        keys_to_delete = self.tag_index[tag]

        # 从 Redis 删除
        if self.redis:
            try:
                self.redis.delete(*keys_to_delete)
            except Exception:
                pass

        # 从本地缓存删除
        for key in keys_to_delete:
            self.local_cache.pop(key, None)

        # 清空标签索引
        self.tag_index[tag] = set()

    def increment_version(self, namespace: str):
        """
        增加命名空间版本号

        这会导致该命名空间下的所有缓存键失效

        Args:
            namespace: 命名空间名称
        """
        current_version = self.version_index.get(namespace, "v1")

        # 解析版本号
        if current_version.startswith("v"):
            try:
                version_num = int(current_version[1:])
                new_version = f"v{version_num + 1}"
            except ValueError:
                new_version = "v1"
        else:
            new_version = "v1"

        self.version_index[namespace] = new_version

        # 触发版本更新事件
        self._on_version_change(namespace, new_version)

    def _on_version_change(self, namespace: str, version: str):
        """
        版本变化回调

        在这里可以添加清理旧版本缓存的逻辑
        """
        print(f"命名空间 '{namespace}' 已更新到版本 {version}")

    def get_stats(self) -> dict:
        """
        获取缓存统计信息
        """
        return {
            "local_cache_size": len(self.local_cache),
            "tags_count": len(self.tag_index),
            "namespaces": list(self.version_index.keys()),
            "version_index": self.version_index
        }


def create_tagged_cache(redis_url: str = None, default_ttl: int = 300):
    """
    创建带标签的缓存实例

    Args:
        redis_url: Redis 连接 URL
        default_ttl: 默认过期时间(秒)

    Returns:
        配置好的 TaggedCacheBackend 实例
    """
    redis_client = None

    if redis_url:
        import redis
        redis_client = redis.from_url(redis_url)

    return TaggedCacheBackend(
        redis_client=redis_client,
        default_ttl=default_ttl
    )

第三步:使用缓存管理命令

# 查看缓存统计
cliproxyapi cache stats

# 查看缓存列表
cliproxyapi cache list --limit 50

# 手动失效特定缓存
cliproxyapi cache invalidate --key "users:list:v1"
cliproxyapi cache invalidate --tag "user_profile"

# 清除所有缓存
cliproxyapi cache clear

# 预热缓存(提前加载常用数据)
cliproxyapi cache warmup --paths "/api/config,/api/categories"

# 导出缓存配置
cliproxyapi cache export --format json --output cache_config.json

# 导入缓存配置
cliproxyapi cache import --file cache_config.json

实战五:WebSocket 代理与实时通信

场景描述

你的应用需要支持实时通信功能,如在线聊天、实时协作、股票行情推送等。CLIProxyAPI 支持 WebSocket 代理,让你能够轻松地将这些功能集成到现有架构中。

实现步骤

第一步:配置 WebSocket 代理

# config/websocket.yaml
version: "1.0"
name: "WebSocket 代理配置"

server:
  host: "0.0.0.0"
  port: 8080

# WebSocket 路由配置
websocket:
  # 基础 WebSocket 代理
  - name: 聊天服务
    match:
      path: "/ws/chat/*"
    target: ws://localhost:9000/chat
    ping_interval: 30          # 心跳间隔(秒)
    ping_timeout: 10           # 心跳超时(秒)
    max_message_size: 65536   # 最大消息大小(字节)
    queue_size: 1000          # 消息队列大小

    # 连接管理
    connection:
      max_connections: 10000
      idle_timeout: 3600      # 空闲超时(秒)
      auth_required: true

    # 中间件
    middleware:
      - name: ws_auth          # WebSocket 认证
      - name: ws_rate_limit   # WebSocket 限流
        config:
          messages_per_second: 100
          connections_per_ip: 10
      - name: ws_logger        # WebSocket 日志

  # 带压缩的 WebSocket
  - name: 实时数据推送
    match:
      path: "/ws/realtime/*"
    target: ws://localhost:9001/realtime
    compression:
      enabled: true
      level: 6                 # 压缩级别 (1-9)
      mem_level: 8            # 内存级别
      window_bits: 15         # 窗口大小

# HTTP 降级处理
http_fallback:
  enabled: true
  fallback_path: "/http-fallback"

  # SSE (Server-Sent Events) 作为备选
  - name: SSE 备选方案
    match:
      path: "/sse/*"
    target: http://localhost:9002/sse
    content_type: "text/event-stream"

第二步:创建 WebSocket 客户端测试工具

#!/usr/bin/env python3
"""
WebSocket 客户端测试工具
用于测试 WebSocket 连接和消息收发
"""

import asyncio
import json
import time
import argparse
from websockets.client import connect
from websockets.exceptions import ConnectionClosed

class WebSocketTester:
    """WebSocket 测试器"""

    def __init__(self, url, token=None):
        self.url = url
        self.token = token
        self.websocket = None
        self.message_count = 0
        self.start_time = None
        self.latencies = []

    async def connect(self):
        """建立 WebSocket 连接"""
        headers = {}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        self.websocket = await connect(self.url, extra_headers=headers)
        self.start_time = time.time()
        print(f"✓ 已连接到 {self.url}")

    async def send_message(self, message: dict):
        """
        发送消息

        Args:
            message: 要发送的消息字典
        """
        if not self.websocket:
            raise Exception("未连接,请先调用 connect()")

        send_time = time.time()
        await self.websocket.send(json.dumps(message))

        print(f"→ 发送: {json.dumps(message, ensure_ascii=False)}")

        # 如果请求响应式消息,等待响应并计算延迟
        if "id" in message:
            response = await self.receive_message()
            if response:
                latency = time.time() - send_time
                self.latencies.append(latency)
                print(f"← 响应延迟: {latency*1000:.2f}ms")

    async def receive_message(self):
        """
        接收消息

        Returns:
            解析后的消息字典
        """
        if not self.websocket:
            return None

        try:
            message = await asyncio.wait_for(
                self.websocket.recv(),
                timeout=30
            )
            self.message_count += 1

            try:
                parsed = json.loads(message)
                print(f"← 收到: {json.dumps(parsed, ensure_ascii=False)}")
                return parsed
            except json.JSONDecodeError:
                print(f"← 收到 (原始): {message}")
                return message

        except asyncio.TimeoutError:
            print("⚠ 接收超时")
            return None
        except ConnectionClosed as e:
            print(f"✗ 连接已关闭: {e}")
            return None

    async def ping_test(self, count=10):
        """
        执行 ping 测试

        Args:
            count: 测试次数
        """
        print(f"\n执行 {count} 次 ping 测试...")

        for i in range(count):
            start = time.time()

            # 发送 ping 消息
            await self.websocket.send(json.dumps({
                "type": "ping",
                "timestamp": start
            }))

            # 等待 pong 响应
            response = await self.receive_message()

            if response and response.get("type") == "pong":
                latency = (time.time() - start) * 1000
                self.latencies.append(latency / 1000)
                print(f"  Ping #{i+1}: {latency:.2f}ms")

            await asyncio.sleep(1)

    def print_stats(self):
        """打印统计信息"""
        duration = time.time() - self.start_time

        print("\n" + "=" * 50)
        print("WebSocket 连接统计:")
        print("=" * 50)
        print(f"  连接时长: {duration:.1f} 秒")
        print(f"  收到消息: {self.message_count} 条")
        print(f"  消息速率: {self.message_count/duration:.2f} 条/秒")

        if self.latencies:
            avg_latency = sum(self.latencies) / len(self.latencies)
            min_latency = min(self.latencies)
            max_latency = max(self.latencies)

            print(f"\n延迟统计 (共 {len(self.latencies)} 个样本):")
            print(f"  平均延迟: {avg_latency*1000:.2f}ms")
            print(f"  最小延迟: {min_latency*1000:.2f}ms")
            print(f"  最大延迟: {max_latency*1000:.2f}ms")

        print("=" * 50)

    async def close(self):
        """关闭连接"""
        if self.websocket:
            self.print_stats()
            await self.websocket.close()
            print("✓ 连接已关闭")


async def main():
    """主函数"""
    parser = argparse.ArgumentParser(description="WebSocket 测试工具")
    parser.add_argument("url", help="WebSocket 服务器地址")
    parser.add_argument("--token", "-t", help="认证令牌")
    parser.add_argument("--test", action="store_true", help="执行 ping 测试")
    parser.add_argument("--count", "-c", type=int, default=10, help="ping 测试次数")

    args = parser.parse_args()

    tester = WebSocketTester(args.url, args.token)

    try:
        await tester.connect()

        if args.test:
            await tester.ping_test(args.count)
        else:
            # 交互式模式
            print("\n提示:输入消息并按回车发送,输入 'quit' 退出")

            while True:
                user_input = input("\n> ").strip()

                if user_input.lower() == "quit":
                    break

                if user_input:
                    message = {
                        "type": "message",
                        "content": user_input,
                        "timestamp": time.time()
                    }
                    await tester.send_message(message)

                    # 尝试接收响应
                    await tester.receive_message()

    except KeyboardInterrupt:
        print("\n\n正在退出...")
    finally:
        await tester.close()


if __name__ == "__main__":
    asyncio.run(main())

第三步:测试 WebSocket 代理

# 启动 WebSocket 代理服务
cliproxyapi start --config config/websocket.yaml

# 运行 WebSocket 测试工具
python ws_tester.py ws://localhost:8080/ws/chat

# 执行 ping 测试
python ws_tester.py ws://localhost:8080/ws/chat --test --count 20

# 使用 wscat 工具测试(需要先安装)
# npm install -g wscat
wscat -c ws://localhost:8080/ws/chat

# 订阅消息
{"action": "subscribe", "channel": "general"}

# 发送消息
{"action": "send", "channel": "general", "message": "Hello!"}

# 查看 WebSocket 连接状态
cliproxyapi websocket status

# 查看活跃连接
cliproxyapi websocket connections --limit 50

常见使用场景 / Common Use Cases

场景一:远程开发环境访问

# 场景:需要通过代理访问远程开发服务器
# 例如:访问公司内网的开发环境 API

routes:
  - name: 远程开发 API
    match:
      path: "/remote-api/*"
    target: http://dev-server.internal.company.com
    proxy:
      enabled: true
      type: http
      host: vpn-proxy.company.com
      port: 8080
      auth:
        username: "{{ ENV.DEPLOY_USER }}"
        password: "{{ ENV.DEPLOY_PASS }}"
# 使用环境变量存储敏感信息
export DEPLOY_USER=your_username
export DEPLOY_PASS=your_password

# 启动代理
cliproxyapi start --config remote_dev.yaml

# 现在可以通过本地代理访问远程 API
curl http://localhost:3000/remote-api/users

场景二:API 负载均衡

# 场景:多个后端服务器之间的负载均衡
# 支持多种负载均衡策略

routes:
  - name: API 负载均衡
    match:
      path: "/api/*"

    # 加权轮询
    targets:
      - url: http://backend-1:8080
        weight: 3
        max_connections: 1000
      - url: http://backend-2:8080
        weight: 3
        max_connections: 1000
      - url: http://backend-3:8080
        weight: 2
        max_connections: 500

    strategy: weighted_round_robin

    # 健康检查配置
    health_check:
      enabled: true
      interval: 10
      timeout: 5
      path: /health
      unhealthy_threshold: 3
      healthy_threshold: 2
# 查看后端服务器健康状态
cliproxyapi lb health

# 手动移除故障后端
cliproxyapi lb remove backend-3

# 添加新后端
cliproxyapi lb add backend-4 http://backend-4:8080 --weight 2

# 查看负载统计
cliproxyapi lb stats

场景三:灰度发布与 A/B 测试

# 场景:渐进式发布新版本,验证稳定性

routes:
  - name: 灰度发布路由
    match:
      path: "/api/v2/*"

    # 按权重分流
    targets:
      - url: http://new-version:8080
        weight: 10      # 10% 流量
      - url: http://stable-version:8080
        weight: 90      # 90% 流量

    # 按 Header 分流(内部测试人员)
    rules:
      - name: 内部测试人员
        match:
          headers:
            X-Test-User: "true"
        target: http://new-version:8080
        weight: 100

    # 按 Cookie 分流(用户粘性)
    - name: 用户粘性路由
      match:
        cookies:
          version: "new"
      target: http://new-version:8080
# 查看灰度流量分布
cliproxyapi canary stats

# 调整流量比例
cliproxyapi canary weight --set 30

# 提升版本(将新版本设为稳定版本)
cliproxyapi canary promote

# 回滚
cliproxyapi canary rollback

场景四:请求聚合与数据合并

# 场景:客户端需要从多个 API 获取数据,通过代理聚合减少请求次数

routes:
  - name: 聚合接口
    match:
      path: "/aggregate/dashboard"

    # 定义聚合的子请求
    requests:
      - id: user_info
        url: http://localhost:8080/api/user/{user_id}
        cache: true
        cache_ttl: 300

      - id: user_orders
        url: http://localhost:8080/api/orders?user_id={user_id}&limit=5
        cache: false

      - id: user_notifications
        url: http://localhost:8080/api/notifications?user_id={user_id}&unread=true
        cache: true
        cache_ttl: 60

      - id: system_stats
        url: http://localhost:8080/api/stats
        parallel: true

    # 响应模板
    response_template: |
      {
        "user": "{{ user_info }}",
        "recent_orders": "{{ user_orders.orders }}",
        "unread_notifications": "{{ user_notifications }}",
        "system_status": "{{ system_stats }}"
      }

    # 并发执行优化
    execution:
      parallel: true
      max_concurrent: 4
      timeout: 5000
# 调用聚合接口
curl "http://localhost:8080/aggregate/dashboard?user_id=123"

# 预热聚合缓存
cliproxyapi aggregate warmup --paths "/aggregate/dashboard"

技巧与最佳实践 / Tips and Best Practices

性能优化技巧

技巧一:合理设置缓存 TTL

# 根据数据变化频率设置缓存策略
cache_strategies = {
    # 几乎不变的数据 - 长缓存
    "配置信息": {"ttl": 3600, "vary": ["Accept-Language"]},
    "分类列表": {"ttl": 1800, "vary": []},

    # 用户相关 - 中等缓存
    "用户资料": {"ttl": 300, "vary": ["Authorization"]},
    "权限信息": {"ttl": 600, "vary": []},

    # 动态数据 - 短缓存或禁用
    "实时数据": {"ttl": 5, "vary": []},
    "搜索结果": {"ttl": 60, "vary": ["Accept-Language", "X-User-Id"]},
}

技巧二:使用连接池

# 配置连接池以提高性能
connection_pool:
  # HTTP 连接池
  http:
    max_connections: 100
    max_connections_per_host: 10
    keepalive: true
    keepalive_timeout: 30

  # 数据库连接池(如果有)
  database:
    max_connections: 20
    min_connections: 5
    acquire_timeout: 30

技巧三:启用压缩

# 请求和响应压缩
compression:
  enabled: true

  # Gzip 压缩配置
  gzip:
    level: 6              # 压缩级别 1-9
    min_length: 1024      # 最小压缩长度
    mime_types:
      - "application/json"
      - "text/plain"
      - "text/html"
      - "text/css"
      - "application/javascript"

安全最佳实践

实践一:敏感信息管理

# 使用环境变量存储敏感信息
export CLIPROXY_SECRET_KEY="your-secret-key-here"
export CLIPROXY_DB_PASSWORD="database-password"

# 或者使用密钥管理服务集成
# 支持 HashiCorp Vault, AWS Secrets Manager 等
# 配置密钥管理
secrets:
  provider: "vault"  # 或 "aws", "azure", "gcp"

  vault:
    address: "https://vault.example.com"
    auth_method: "kubernetes"
    role: "cliproxy"
    path: "secret/data/cliproxy"

  # 或者直接在配置中引用环境变量
  # 格式:{{ ENV.VAR_NAME }}
  references:
    api_key: "{{ ENV.API_SECRET_KEY }}"
    db_password: "{{ ENV.DB_PASSWORD }}"

实践二:请求验证与过滤

# middleware/security.py
"""
安全中间件
实现请求验证、IP 白名单、频率限制等安全功能
"""

from cliproxyapi.middleware import BaseMiddleware

class SecurityMiddleware(BaseMiddleware):
    """安全中间件基类"""

    def __init__(self, config: dict):
        self.allowed_ips = set(config.get("allowed_ips", []))
        self.blocked_ips = set(config.get("blocked_ips", []))
        self.rate_limit = config.get("rate_limit", 1000)
        self.require_auth = config.get("require_auth", False)

    def on_request(self, context):
        """请求安全检查"""
        client_ip = context.request.client_ip

        # IP 黑名单检查
        if client_ip in self.blocked_ips:
            raise PermissionError(f"IP {client_ip} 已被禁止访问")

        # IP 白名单检查(如果配置了白名单)
        if self.allowed_ips and client_ip not in self.allowed_ips:
            raise PermissionError(f"IP {client_ip} 不在允许访问列表中")

        # 认证检查
        if self.require_auth:
            auth_header = context.request.headers.get("Authorization")
            if not self._validate_auth(auth_header):
                raise PermissionError("认证失败")

        return context

    def _validate_auth(self, auth_header: str) -> bool:
        """验证认证信息"""
        # 实现具体的认证逻辑
        if not auth_header:
            return False

        # 验证 Bearer Token
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
            return self._verify_token(token)

        return False

    def _verify_token(self, token: str) -> bool:
        """验证 Token"""
        # 实现 Token 验证逻辑
        import hashlib
        expected = hashlib.sha256(token.encode()).hexdigest()
        return token == expected  # 简化示例,实际应使用 JWT 等

实践三:日志脱敏

# middleware/logging.py
"""
日志中间件
实现请求日志记录,并对敏感信息进行脱敏
"""

import re
from cliproxyapi.middleware import BaseMiddleware

class SecureLoggingMiddleware(BaseMiddleware):
    """安全日志中间件"""

    def __init__(self, config: dict):
        self.log_level = config.get("log_level", "INFO")
        self.log_body = config.get("log_body", False)
        self.sensitive_fields = config.get("sensitive_fields", [
            "password", "secret", "token", "api_key", "credit_card"
        ])
        self.mask_char = config.get("mask_char", "*")

    def _mask_sensitive_data(self, data: dict) -> dict:
        """
        对敏感字段进行脱敏

        Args:
            data: 原始数据字典

        Returns:
            脱敏后的数据字典
        """
        if not isinstance(data, dict):
            return data

        masked = {}

        for key, value in data.items():
            # 检查是否是敏感字段
            is_sensitive = any(
                sensitive in key.lower()
                for sensitive in self.sensitive_fields
            )

            if is_sensitive:
                # 对敏感字段进行脱敏
                if isinstance(value, str):
                    masked[key] = self._mask_string(value)
                elif isinstance(value, dict):
                    masked[key] = "*** (object)"
                elif isinstance(value, list):
                    masked[key] = "*** (array)"
                else:
                    masked[key] = "***"
            else:
                # 递归处理非敏感字段
                if isinstance(value, dict):
                    masked[key] = self._mask_sensitive_data(value)
                else:
                    masked[key] = value

        return masked

    def _mask_string(self, value: str, visible_chars: int = 4) -> str:
        """
        掩码字符串

        Args:
            value: 原始字符串
            visible_chars: 保留的可显示字符数

        Returns:
            掩码后的字符串
        """
        if len(value) <= visible_chars:
            return self.mask_char * len(value)

        visible_start = value[:visible_chars]
        mask_length = len(value) - visible_chars
        return f"{visible_start}{self.mask_char * mask_length}"

    def on_request(self, context):
        """记录请求日志"""
        log_data = {
            "timestamp": context.request.timestamp,
            "method": context.request.method,
            "path": context.request.path,
            "client_ip": context.request.client_ip,
        }

        if self.log_body and context.request.body:
            try:
                import json
                body = json.loads(context.request.body)
                log_data["body"] = self._mask_sensitive_data(body)
            except Exception:
                log_data["body"] = "[无法解析]"

        # 输出日志
        print(f"[REQUEST] {log_data}")

        return context

调试技巧

技巧一:启用调试模式

# 启动调试模式
cliproxyapi start --debug

# 或者设置日志级别
cliproxyapi start --log-level DEBUG

# 实时查看详细日志
cliproxyapi logs --level DEBUG --follow

技巧二:请求追踪

# 启用请求追踪
cliproxyapi start --trace-enabled

# 为请求添加追踪 ID
curl -H "X-Trace-Id: my-request-123" http://localhost:8080/api/test

# 查看追踪信息
cliproxyapi trace show my-request-123

# 查看最近的追踪记录
cliproxyapi trace list --limit 100

技巧三:模拟故障测试

# config/fault_injection.yaml
fault_injection:
  enabled: true

  # 延迟注入
  - name: 随机延迟
    match:
      path: "/api/*"
    fault:
      type: delay
      delay_ms: 100-500       # 100-500ms 随机延迟
      probability: 0.1       # 10% 概率触发

  # 错误注入
  - name: 模拟错误
    match:
      path: "/api/orders/*"
    fault:
      type: error
      status_code: 500
      probability: 0.05       # 5% 概率触发
      body: '{"error": "Internal Server Error"}'

  # 异常响应
  - name: 超时模拟
    match:
      path: "/api/slow/*"
    fault:
      type: timeout
      probability: 0.2         # 20% 概率触发
# 启用故障注入
cliproxyapi fault-injection enable

# 查看故障注入统计
cliproxyapi fault-injection stats

# 禁用故障注入
cliproxyapi fault-injection disable

总结 / Conclusion

核心要点回顾

CLIProxyAPI 核心能力总结:
│
├── 🎯 代理管理
│   ├── 支持多种代理协议(HTTP/HTTPS/SOCKS5/ShadowSocks)
│   ├── 简单的命令式配置
│   └── 灵活的认证支持
│
├── 🔀 智能路由
│   ├── 基于路径、Header、参数的路由匹配
│   ├── 多种负载均衡策略
│   └── 路由优先级和条件判断
│
├── 🔌 插件扩展
│   ├── 丰富的内置中间件
│   ├── 自定义插件开发
│   └── 插件市场生态
│
├── 🛡️ 安全特性
│   ├── 限流与熔断保护
│   ├── 认证与授权
│   └── 日志脱敏
│
├── ⚡ 性能优化
│   ├── 智能缓存
│   ├── 连接池管理
│   └── 压缩传输
│
└── 🔧 调试工具
    ├── 请求追踪
    ├── 故障注入
    └── 实时日志

下一步学习路径

graph LR
    A[入门基础] --> B[进阶配置]
    B --> C[插件开发]
    C --> D[生产部署]
    D --> E[最佳实践]

    A -.- A1[环境搭建] -.- A2[基础路由] -.- A3[简单代理]
    B -.- B1[负载均衡] -.- B2[中间件配置] -.- B3[缓存策略]
    C -.- C1[插件架构] -.- C2[自定义中间件] -.- C3[集成外部服务]
    D -.- D1[高可用部署] -.- D2[监控告警] -.- D3[容器化]
    E -.- E1[性能优化] -.- E2[安全加固] -.- E3[故障排查]

相关资源链接

官方资源:

  • GitHub 仓库:https://github.com/router-for-me/CLIProxyAPI
  • 官方文档:https://docs.cliproxyapi.example.com
  • 示例项目:https://github.com/router-for-me/CLIProxyAPI-Examples
  • 插件市场:https://plugins.cliproxyapi.example.com

学习资源:

  • 视频教程:https://youtube.com/@cliproxyapi
  • 社区论坛:https://community.cliproxyapi.example.com
  • 技术博客:https://blog.cliproxyapi.example.com

相关 AI 项目推荐:

如果你对 API 管理和网络工具感兴趣,以下项目也值得关注:
│
├── 🌐 API 工具
│   ├── Postman - API 开发协作平台
│   ├── Insomnia - 开源 API 客户端
│   └── Bruno - Git 友好的 API 客户端
│
├── 🔄 API 网关
│   ├── Kong - 云原生 API 网关
│   ├── APISIX - 动态、实时、高性能 API 网关
│   └── Traefik - 现代 HTTP 反向代理和负载均衡器
│
├── 🛡️ 代理工具
│   ├── Mitmproxy - 交互式 HTTPS 代理
│   ├── Charles - Web 调试代理
│   └── WireGuard - 下一代 VPN
│
└── 🤖 AI 相关
    ├── LangChain - 构建 LLM 应用的框架
    ├── LangServe - 部署 LangChain 模型的工具
    └── PortKey - AI 应用的可观测性平台

贡献与支持

CLIProxyAPI 是一个开源项目,欢迎所有形式的贡献:

# 克隆仓库
git clone https://github.com/router-for-me/CLIProxyAPI.git

# 创建功能分支
git checkout -b feature/amazing-feature

# 提交更改
git commit -m "Add amazing feature"

# 推送分支
git push origin feature/amazing-feature

# 创建 Pull Request

**如果这篇文章对你有帮助,欢迎分享给更多需要的朋友!**

**关注作者,获取更多优质技术内容**

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注