🎯 当「12-Factor」遇上AI Agent:人类层框架让大模型应用可控可追溯

🎯 当「12-Factor」遇上AI Agent:人类层框架让大模型应用可控可追溯

🎯 当「12-Factor」遇上AI Agent:人类层框架让大模型应用可控可追溯

别再让AI Agent自由发挥了,「人类层」才是企业级AI落地的最优解


为什么值得关注:AI Agent需要「刹车」和「方向盘」

在AI应用爆发的今天,大模型Agent正在以惊人的速度渗透到各个业务场景。但问题也随之而来:

当你让一个AI Agent自动执行操作时,你真的知道它会做什么吗?

  • 它会不会在执行过程中泄露敏感数据?
  • 它会不会在关键时刻做出错误的业务决策?
  • 它会不会在不可逆操作(如删除、支付)上「一路绿灯」?

这正是 humanlayer/12-factor-agents 要解决的核心问题。这个框架将经典的「12-Factor App」方法论与AI Agent开发完美结合,为我们提供了一套构建可控、可追溯、有人类监督的AI Agent应用的完整方案。

这个项目的核心价值

传统Agent        vs        12-Factor-Agents
┌─────────┐                ┌─────────────────┐
│ 直接执行 │     →→→      │ 执行前需人类确认 │
│ 无审计   │     →→→      │ 完整操作日志    │
│ 风险不可控│     →→→      │ 多层安全防护    │
└─────────┘                └─────────────────┘

关键特性一览:

  • 提供「人类在环」(Human-in-the-Loop) 机制,确保关键操作经过人工审批
  • 内置完整的操作审计日志,记录每个AI决策的上下文
  • 支持函数调用前的条件拦截和参数验证
  • 开箱即用的Approval Flow,灵活配置审批规则
  • 兼容主流AI Agent框架(LangChain、AutoGen等)

环境搭建:从零开始搭建开发环境

系统要求

在开始之前,确保你的开发环境满足以下要求:

- Python 3.9 或更高版本
- pip 包管理器
- 推荐的虚拟环境工具(venv 或 conda)

安装步骤

第一步:创建虚拟环境(推荐)

# 使用 venv 创建隔离环境
python -m venv agent-env

# 激活虚拟环境
# Linux/macOS
source agent-env/bin/activate

# Windows
agent-env\Scripts\activate

第二步:安装核心依赖

# 安装 humanlayer 核心包
pip install humanlayer

# 安装必要的AI框架支持
pip install langchain openai anthropic

# 安装可选的日志和数据处理库
pip install loguru pydantic

第三步:验证安装

# 创建一个验证脚本:verify_install.py
import humanlayer

# 检查版本
print(f"HumanLayer 版本: {humanlayer.__version__}")

# 检查核心功能是否可用
from humanlayer import HumanLayer
hl = HumanLayer()
print("✅ HumanLayer 安装成功!")

运行验证脚本:

python verify_install.py

你应该看到类似输出:

HumanLayer 版本: 0.1.x
✅ HumanLayer 安装成功!

项目结构初始化

创建一个标准的12-Factor-Agents项目结构:

# 创建项目目录
mkdir my-first-agent && cd my-first-agent

# 创建目录结构
mkdir -p src/agents
mkdir -p config
mkdir -p logs
mkdir -p tests

# 创建核心文件
touch src/__init__.py
touch src/agents/__init__.py
touch config/approvals.yaml
touch config/features.yaml

核心功能详解:深入理解HumanLayer架构

1. Human-in-the-Loop 审批机制

HumanLayer的核心是「审批网关」(Approval Gateway)。在AI Agent执行敏感操作前,系统会自动暂停并等待人工确认。

# 审批流程示意
"""
Agent意图识别 → 触发审批条件 → 暂停执行

                    ┌─────────────────────────┐
                    │     审批网关             │
                    │  ┌───────────────────┐  │
                    │  │ 操作描述: 删除文件 │  │
                    │  │ 风险等级: 🔴 高    │  │
                    │  │ 参数: /data/tmp/*  │  │
                    │  └───────────────────┘  │
                    │                         │
                    │  [批准] [拒绝] [修改参数] │
                    └─────────────────────────┘

                      批准 → 执行 / 拒绝 → 终止
"""

2. 函数调用拦截器 (Function Call Interceptor)

拦截器是12-Factor-Agents的核心组件,它在函数执行前进行多维度检查:

# 拦截器检查流程
class FunctionInterceptor:
    """
    多层检查机制:

    Layer 1: 功能白名单检查
    Layer 2: 参数类型和范围验证  
    Layer 3: 敏感信息脱敏
    Layer 4: 风险评分计算
    Layer 5: 人工审批触发(如果需要)
    """

    def __init__(self, config):
        self.config = config
        self.approval_rules = config.get("approval_rules", {})

    def should_approve(self, function_name, parameters):
        """
        判断是否需要人工审批

        返回: (needs_approval: bool, reason: str)
        """
        # 检查是否为高风险操作
        if self.is_high_risk_operation(function_name):
            return True, f"高风险操作: {function_name}"

        # 检查参数是否包含敏感数据
        if self.contains_sensitive_data(parameters):
            return True, "参数包含敏感信息"

        # 检查是否超过操作阈值
        if self.exceeds_threshold(parameters):
            return True, f"参数超过预设阈值"

        return False, "无需审批"

3. 审计日志系统

完整的操作记录是合规和排查问题的关键:

# 日志数据结构
{
    "timestamp": "2024-01-15T10:30:45.123Z",
    "agent_id": "agent-001",
    "session_id": "session-abc123",
    "function_call": {
        "name": "send_email",
        "parameters": {
            "to": "user@example.com",
            "subject": "订单确认",
            "body": "您的订单已处理"
        }
    },
    "approval": {
        "status": "approved",  # approved, rejected, modified
        "approver": "admin@example.com",
        "approval_time": "2024-01-15T10:31:00.000Z",
        "comment": "确认无误"
    },
    "execution": {
        "start_time": "2024-01-15T10:31:00.500Z",
        "end_time": "2024-01-15T10:31:02.100Z",
        "status": "success",
        "result": "邮件发送成功"
    }
}

4. 配置驱动的能力体系 (Features)

12-Factor方法论强调「配置与代码分离」,HumanLayer完美实践了这一点:

# config/features.yaml
features:
  # 文件操作配置
  file_operations:
    enabled: true
    max_file_size_mb: 100
    allowed_extensions: [".txt", ".csv", ".json"]
    require_approval_for:
      - delete
      - overwrite
      - external_access

  # 通信功能配置
  communications:
    enabled: true
    approved_recipients:
      - "@company.com"
    require_approval_for:
      - external_emails
      - scheduled_messages

  # 数据访问配置  
  data_access:
    enabled: true
    allowed_databases:
      - "analytics_db"
      - "user_db"
    require_approval_for:
      - user_pii_access
      - bulk_queries

实战教程:从入门到精通

项目一:构建带审批流程的文件管理Agent

让我们从头开始,构建一个企业级的文件管理AI助手。

第一步:定义工具函数

# src/agents/tools.py

from typing import List, Dict, Any
import os
from pathlib import Path
from humanlayer import human_approved

class FileManagerTools:
    """
    文件管理工具集
    每个高风险操作都通过 @human_approved 装饰器进行保护
    """

    @human_approved(
        name="read_file",
        description="读取指定路径的文件内容",
        risk_level="low",
        requires_approval_if={"path": {"contains": "secret"}}
    )
    def read_file(self, path: str) -> Dict[str, Any]:
        """
        读取文件内容

        Args:
            path: 文件路径

        Returns:
            包含文件内容的字典
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()
            return {
                "status": "success",
                "path": path,
                "content": content,
                "size": len(content)
            }
        except FileNotFoundError:
            return {"status": "error", "message": f"文件不存在: {path}"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    @human_approved(
        name="write_file", 
        description="创建或覆盖指定文件",
        risk_level="medium",
        always_requires_approval=True
    )
    def write_file(self, path: str, content: str) -> Dict[str, Any]:
        """
        写入文件

        Args:
            path: 文件路径
            content: 文件内容

        Returns:
            操作结果
        """
        # 参数预检查
        if not path:
            return {"status": "error", "message": "路径不能为空"}

        # 确保目录存在
        Path(path).parent.mkdir(parents=True, exist_ok=True)

        try:
            with open(path, 'w', encoding='utf-8') as f:
                f.write(content)
            return {
                "status": "success",
                "path": path,
                "bytes_written": len(content.encode('utf-8'))
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    @human_approved(
        name="delete_file",
        description="删除指定文件",
        risk_level="high",
        always_requires_approval=True,
        confirmation_template="确认删除文件 {path}?此操作不可撤销。"
    )
    def delete_file(self, path: str) -> Dict[str, Any]:
        """
        删除文件(不可恢复)

        Args:
            path: 要删除的文件路径

        Returns:
            操作结果
        """
        try:
            if not os.path.exists(path):
                return {"status": "error", "message": f"文件不存在: {path}"}

            os.remove(path)
            return {
                "status": "success",
                "message": f"文件已删除: {path}"
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    @human_approved(
        name="list_directory",
        description="列出目录内容",
        risk_level="low"
    )
    def list_directory(self, path: str = ".") -> Dict[str, Any]:
        """
        列出目录下的文件和文件夹

        Args:
            path: 目录路径

        Returns:
            目录内容列表
        """
        try:
            items = []
            for item in os.listdir(path):
                item_path = os.path.join(path, item)
                items.append({
                    "name": item,
                    "type": "directory" if os.path.isdir(item_path) else "file",
                    "size": os.path.getsize(item_path) if os.path.isfile(item_path) else None
                })
            return {
                "status": "success",
                "path": path,
                "items": items
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

第二步:配置审批规则

# config/approvals.yaml

approval_rules:
  # 全局设置
  global:
    default_risk_level: "low"
    timeout_seconds: 300
    escalation_enabled: true

  # 风险等级定义
  risk_levels:
    low:
      auto_approve: true
      notification_only: false
    medium:
      auto_approve: false
      require_one_approval: true
    high:
      auto_approve: false
      require_two_approvals: true
      timeout_escalation: "supervisor@example.com"

  # 功能特定的规则
  function_rules:
    delete_file:
      risk_level: "high"
      always_approve: false
      preview_changes: true

    write_file:
      risk_level: "medium"
      preview_content: true
      size_limit_mb: 50

    read_file:
      risk_level: "low"
      auto_approve: true
      log_access: true

第三步:创建Agent主类

# src/agents/file_agent.py

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from humanlayer import HumanLayer, ApprovalRequest, ApprovalResponse
from .tools import FileManagerTools

@dataclass
class AgentResponse:
    """Agent响应数据类"""
    success: bool
    message: str
    data: Optional[Dict[str, Any]] = None
    approval_requested: bool = False
    execution_id: Optional[str] = None

@dataclass
class AgentConfig:
    """Agent配置"""
    agent_name: str = "FileManagerAgent"
    max_iterations: int = 10
    enable_logging: bool = True
    log_path: str = "logs/agent.log"

class FileManagerAgent:
    """
    企业级文件管理Agent

    特性:
    - 所有文件操作都经过审批网关
    - 完整的操作审计日志
    - 敏感文件访问保护
    - 操作历史追踪
    """

    def __init__(self, config: Optional[AgentConfig] = None):
        self.config = config or AgentConfig()
        self.tools = FileManagerTools()
        self.human_layer = HumanLayer()

        # 初始化日志
        if self.config.enable_logging:
            from loguru import logger
            logger.add(
                self.config.log_path,
                rotation="10 MB",
                retention="7 days",
                level="INFO"
            )
            self.logger = logger

        # 操作历史
        self.execution_history: List[Dict[str, Any]] = []

    def _log_operation(self, operation: str, params: Dict, result: Any):
        """记录操作日志"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "operation": operation,
            "parameters": params,
            "result": result
        }
        self.execution_history.append(log_entry)

        if self.config.enable_logging:
            self.logger.info(f"操作: {operation}, 参数: {params}")

    def process_request(self, request: str) -> AgentResponse:
        """
        处理用户请求

        Args:
            request: 自然语言请求

        Returns:
            AgentResponse: 处理结果
        """
        # 意图识别
        intent = self._identify_intent(request)

        # 提取参数
        params = self._extract_parameters(request, intent)

        # 执行操作(会触发审批流程)
        try:
            result = self._execute_with_approval(intent, params)
            self._log_operation(intent, params, result)
            return result
        except Exception as e:
            return AgentResponse(
                success=False,
                message=f"执行出错: {str(e)}"
            )

    def _identify_intent(self, request: str) -> str:
        """识别用户意图"""
        request_lower = request.lower()

        if any(word in request_lower for word in ["读取", "查看", "打开", "read"]):
            return "read_file"
        elif any(word in request_lower for word in ["写入", "创建", "保存", "write"]):
            return "write_file"
        elif any(word in request_lower for word in ["删除", "remove", "delete"]):
            return "delete_file"
        elif any(word in request_lower for word in ["列出", "列表", "list", "ls"]):
            return "list_directory"
        else:
            return "unknown"

    def _extract_parameters(self, request: str, intent: str) -> Dict[str, Any]:
        """从请求中提取参数"""
        params = {}

        # 简单的参数提取逻辑
        # 实际应用中可以使用更复杂的NLP处理

        if intent == "read_file":
            # 提取文件路径
            import re
            path_match = re.search(r'["\'](.+?)["\']|([\w./]+)', request)
            if path_match:
                params["path"] = path_match.group(1) or path_match.group(2)

        elif intent == "write_file":
            import re
            path_match = re.search(r'["\'](.+?)["\']', request)
            if path_match:
                params["path"] = path_match.group(1)
            # 提取内容
            content_match = re.search(r'内容[::]\s*["\'](.+?)["\']', request)
            if content_match:
                params["content"] = content_match.group(1)

        elif intent == "delete_file":
            import re
            path_match = re.search(r'["\'](.+?)["\']|([\w./]+)', request)
            if path_match:
                params["path"] = path_match.group(1) or path_match.group(2)

        elif intent == "list_directory":
            import re
            path_match = re.search(r'目录[::]\s*["\']?([\w./]*?)["\']?', request)
            if path_match:
                params["path"] = path_match.group(1)
            else:
                params["path"] = "."

        return params

    def _execute_with_approval(self, intent: str, params: Dict) -> AgentResponse:
        """通过审批流程执行操作"""

        # 获取工具方法
        tool_method = getattr(self.tools, intent, None)
        if not tool_method:
            return AgentResponse(
                success=False,
                message=f"未知操作: {intent}"
            )

        # 通过HumanLayer执行,会自动处理审批流程
        result = tool_method(**params)

        if result.get("status") == "error":
            return AgentResponse(
                success=False,
                message=result.get("message", "未知错误")
            )

        return AgentResponse(
            success=True,
            message="操作成功",
            data=result,
            approval_requested=result.get("_approval_requested", False)
        )

    def get_history(self) -> List[Dict[str, Any]]:
        """获取操作历史"""
        return self.execution_history

第四步:创建命令行界面

# src/cli.py

import sys
from src.agents.file_agent import FileManagerAgent, AgentConfig

def main():
    """命令行入口"""
    print("=" * 50)
    print("🤖 企业级文件管理Agent")
    print("=" * 50)
    print("提示:所有文件操作都受HumanLayer保护")
    print("输入 'help' 查看可用命令,输入 'quit' 退出")
    print("-" * 50)

    # 初始化Agent
    config = AgentConfig(
        agent_name="FileManagerAgent",
        enable_logging=True,
        log_path="logs/agent.log"
    )
    agent = FileManagerAgent(config)

    while True:
        try:
            user_input = input("\n📝 请输入指令: ").strip()

            if not user_input:
                continue

            if user_input.lower() in ["quit", "exit", "退出"]:
                print("再见!👋")
                break

            if user_input.lower() == "help":
                print_help()
                continue

            if user_input.lower() == "history":
                show_history(agent)
                continue

            # 处理请求
            response = agent.process_request(user_input)

            # 输出结果
            print_result(response)

        except KeyboardInterrupt:
            print("\n\n操作已取消")
            break
        except Exception as e:
            print(f"❌ 错误: {e}")

def print_help():
    """打印帮助信息"""
    help_text = """
可用命令:
---------
1. 读取文件: "读取文件 '/path/to/file.txt'"
2. 写入文件: "写入文件 '/path/to/file.txt' 内容: '文件内容'"
3. 删除文件: "删除文件 '/path/to/file.txt'"  
4. 列出目录: "列出目录 '/path/to/dir'"
5. 查看历史: "history"
6. 获取帮助: "help"
7. 退出程序: "quit"

示例:
-----
读取文件 '/etc/hosts'
写入文件 'test.txt' 内容: 'Hello World'
列出目录 '.'
"""
    print(help_text)

def show_history(agent):
    """显示操作历史"""
    history = agent.get_history()
    if not history:
        print("暂无操作记录")
        return

    print("\n📋 操作历史:")
    print("-" * 50)
    for i, entry in enumerate(history, 1):
        print(f"{i}. [{entry['timestamp']}]")
        print(f"   操作: {entry['operation']}")
        print(f"   参数: {entry['parameters']}")
        print(f"   结果: {entry['result'].get('status', 'unknown')}")
        print()

def print_result(response):
    """打印执行结果"""
    if response.success:
        print(f"✅ {response.message}")
        if response.data:
            if response.approval_requested:
                print("⏳ 此操作需要人工审批")
            print(f"📦 数据: {response.data}")
    else:
        print(f"❌ {response.message}")

if __name__ == "__main__":
    main()

第五步:运行测试

# 运行Agent
python -m src.cli

测试场景:

# 测试1:列出目录(低风险,自动通过)
输入: 列出目录 '.'

# 测试2:写入文件(需要审批)
输入: 写入文件 'test.txt' 内容: '这是一次测试写入'

# 测试3:删除文件(高风险,需要双重审批)
输入: 删除文件 'test.txt'

项目二:构建安全的邮件发送Agent

完整的邮件Agent实现

# src/agents/email_agent.py

from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from datetime import datetime
from humanlayer import human_approved, HumanLayer, ApprovalContext
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

@dataclass
class EmailConfig:
    """邮件配置"""
    smtp_server: str
    smtp_port: int
    sender_email: str
    sender_password: str
    use_tls: bool = True

@dataclass  
class Email:
    """邮件数据类"""
    to: str
    subject: str
    body: str
    cc: Optional[List[str]] = None
    bcc: Optional[List[str]] = None

class EmailAgent:
    """
    企业邮件发送Agent

    安全特性:
    - 所有外发邮件需要审批
    - 限制可发送的收件人范围
    - 自动记录所有邮件往来
    - 敏感内容检测和警告
    """

    # 允许发送的域名白名单
    ALLOWED_DOMAINS = [
        "company.com",
        "partner.com", 
        "client.com"
    ]

    # 禁止发送的关键词
    BLOCKED_KEYWORDS = [
        "机密", "confidential",
        "password", "密码",
        "secret", "密钥"
    ]

    def __init__(self, config: EmailConfig):
        self.config = config
        self.human_layer = HumanLayer()
        self.sent_emails: List[Dict[str, Any]] = []

    @human_approved(
        name="send_email",
        description="发送电子邮件",
        risk_level="high",
        always_requires_approval=True,
        approval_timeout_seconds=600
    )
    def send_email(self, email: Email) -> Dict[str, Any]:
        """
        发送邮件(需要人工审批)

        Args:
            email: Email对象

        Returns:
            发送结果
        """
        # 前置检查
        validation_result = self._validate_email(email)
        if not validation_result["valid"]:
            return {
                "status": "error",
                "message": validation_result["message"]
            }

        # 执行发送
        try:
            result = self._send_via_smtp(email)

            # 记录已发送邮件
            self._log_sent_email(email, result)

            return {
                "status": "success",
                "message": f"邮件已发送至 {email.to}",
                "message_id": result.get("message_id")
            }

        except Exception as e:
            return {
                "status": "error",
                "message": f"发送失败: {str(e)}"
            }

    def _validate_email(self, email: Email) -> Dict[str, Any]:
        """验证邮件内容"""

        # 检查收件人域名
        recipient_domain = email.to.split("@")[-1] if "@" in email.to else ""
        if recipient_domain not in self.ALLOWED_DOMAINS:
            return {
                "valid": False,
                "message": f"收件人域名不在白名单中: {recipient_domain}"
            }

        # 检查敏感关键词
        combined_text = f"{email.subject} {email.body}".lower()
        for keyword in self.BLOCKED_KEYWORDS:
            if keyword.lower() in combined_text:
                return {
                    "valid": False,
                    "message": f"邮件内容包含敏感词: {keyword}"
                }

        # 检查邮件内容长度
        if len(email.body) > 50000:
            return {
                "valid": False,
                "message": "邮件内容过长,请精简"
            }

        return {"valid": True}

    def _send_via_smtp(self, email: Email) -> Dict[str, Any]:
        """通过SMTP发送邮件"""

        msg = MIMEMultipart()
        msg['From'] = self.config.sender_email
        msg['To'] = email.to
        msg['Subject'] = email.subject

        if email.cc:
            msg['Cc'] = ",".join(email.cc)

        msg.attach(MIMEText(email.body, 'plain', 'utf-8'))

        # 建立SMTP连接
        server = smtplib.SMTP(self.config.smtp_server, self.config.smtp_port)

        if self.config.use_tls:
            server.starttls()

        server.login(self.config.sender_email, self.config.sender_password)

        # 发送邮件
        recipients = [email.to]
        if email.cc:
            recipients.extend(email.cc)

        server.send_message(msg)
        server.quit()

        return {
            "message_id": f"{datetime.now().timestamp()}",
            "sent_at": datetime.now().isoformat()
        }

    def _log_sent_email(self, email: Email, result: Dict):
        """记录已发送邮件"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "to": email.to,
            "subject": email.subject,
            "body_preview": email.body[:100] + "..." if len(email.body) > 100 else email.body,
            "result": result
        }
        self.sent_emails.append(log_entry)

    def get_sent_history(self) -> List[Dict[str, Any]]:
        """获取发送历史"""
        return self.sent_emails


# 使用示例
if __name__ == "__main__":
    # 配置
    config = EmailConfig(
        smtp_server="smtp.company.com",
        smtp_port=587,
        sender_email="agent@company.com",
        sender_password="your-password-here",
        use_tls=True
    )

    # 创建Agent
    agent = EmailAgent(config)

    # 准备邮件
    email = Email(
        to="colleague@company.com",
        subject="周报总结",
        body="本周工作进展顺利,已完成核心功能开发。"
    )

    # 发送邮件(会触发审批流程)
    result = agent.send_email(email)
    print(result)

常见使用场景与案例分析

场景一:企业客服Agent

需求:构建一个客服AI,能够回答客户问题,但在以下情况需要人工介入:
- 涉及退款、投诉升级
- 需要访问客户敏感信息
- 回复内容包含法律建议
# src/agents/customer_service_agent.py

from humanlayer import human_approved, HumanLayer
from enum import Enum

class IntentType(Enum):
    GENERAL_INQUIRY = "general_inquiry"
    REFUND_REQUEST = "refund_request"
    COMPLAINT = "complaint"
    SENSITIVE_ACCESS = "sensitive_access"
    LEGAL_ADVICE = "legal_advice"

class CustomerServiceAgent:
    """
    企业客服Agent

    智能路由:
    - 简单问题 → AI直接回复
    - 敏感操作 → 转人工审批
    - 高风险请求 → 升级处理
    """

    def __init__(self):
        self.human_layer = HumanLayer()

    @human_approved(
        name="process_refund",
        description="处理退款请求",
        risk_level="high",
        always_requires_approval=True,
        approver_role="supervisor"
    )
    def process_refund(self, order_id: str, amount: float, reason: str):
        """处理退款"""
        # 退款逻辑
        pass

    @human_approved(
        name="access_customer_pii",
        description="访问客户个人信息",
        risk_level="medium",
        requires_approval_if={"amount": {">": 10000}}
    )
    def access_customer_pii(self, customer_id: str, fields: list):
        """访问客户敏感信息"""
        # 数据访问逻辑
        pass

    @human_approved(
        name="escalate_to_legal",
        description="升级为法律咨询",
        risk_level="high",
        always_requires_approval=True
    )
    def escalate_to_legal(self, ticket_id: str, description: str):
        """转交法律部门"""
        # 升级逻辑
        pass

场景二:数据分析Agent

需求:构建数据分析Agent,能够:
- 执行SQL查询(需要审批高风险查询)
- 生成可视化图表
- 导出数据报告
# src/agents/data_analytics_agent.py

from humanlayer import human_approved

class DataAnalyticsAgent:
    """
    数据分析Agent

    安全策略:
    - 只读查询自动通过
    - JOIN多表需要审批
    - 涉及PII字段需要审批
    - 批量导出需要双重审批
    """

    @human_approved(
        name="execute_read_query",
        description="执行只读数据查询",
        risk_level="low"
    )
    def execute_read_query(self, query: str):
        """执行只读查询"""
        # 确保是SELECT语句
        assert query.strip().upper().startswith("SELECT")
        # 执行查询
        pass

    @human_approved(
        name="execute_join_query",
        description="执行多表JOIN查询",
        risk_level="medium",
        always_requires_approval=True
    )
    def execute_join_query(self, query: str):
        """执行涉及多表的JOIN查询"""
        pass

    @human_approved(
        name="export_data",
        description="导出数据分析报告",
        risk_level="high",
        always_requires_approval=True,
        require_two_approvals=True
    )
    def export_data(self, format: str, destination: str):
        """导出数据"""
        pass

场景三:代码审查Agent

# src/agents/code_review_agent.py

from humanlayer import human_approved

class CodeReviewAgent:
    """
    代码审查Agent

    审查策略:
    - 自动检查代码风格
    - 安全漏洞自动标记
    - 高风险修改需要人工确认
    - 自动化重构需要双重审批
    """

    @human_approved(
        name="suggest_refactoring",
        description="建议代码重构",
        risk_level="medium",
        always_requires_approval=True,
        show_diff_preview=True
    )
    def suggest_refactoring(self, file_path: str, suggestion: str):
        """建议代码重构"""
        pass

    @human_approved(
        name="auto_fix_security",
        description="自动修复安全漏洞",
        risk_level="high",
        always_requires_approval=True,
        require_two_approvals=True
    )
    def auto_fix_security(self, vulnerability_id: str):
        """自动修复安全漏洞"""
        pass

    @human_approved(
        name="apply_code_change",
        description="直接应用代码修改",
        risk_level="critical",
        always_requires_approval=True,
        approver_role="senior_developer"
    )
    def apply_code_change(self, changes: list):
        """直接应用代码修改"""
        pass

最佳实践与高级技巧

实践一:优雅的审批流程设计

# 高级审批配置示例
from humanlayer import ApprovalStrategy, RiskLevel

# 条件化审批规则
approval_config = {
    # 基于金额的审批策略
    "amount_threshold": {
        "low": {"max": 1000, "auto_approve": True},
        "medium": {"min": 1000, "max": 10000, "require_one": True},
        "high": {"min": 10000, "require_two": True}
    },

    # 基于时间的审批策略
    "time_based": {
        "business_hours": {"auto_approve": True},
        "after_hours": {"require_one": True},
        "weekend": {"require_two": True}
    },

    # 基于用户角色的审批策略
    "role_based": {
        "new_user": {"require_one": True},
        "verified_user": {"auto_approve_threshold": "medium"},
        "premium_user": {"elevated_limits": True}
    }
}

@human_approved(
    name="process_payment",
    description="处理支付",
    dynamic_approval_strategy=approval_config
)
def process_payment(amount: float, recipient: str):
    """智能审批:根据金额、时间、用户角色动态决定审批流程"""
    pass

实践二:批量操作的安全处理

# 批量操作审批策略
@human_approved(
    name="batch_process",
    description="批量处理数据",
    risk_level="high",
    batch_mode=True,  # 启用批量模式
    batch_preview_limit=10,  # 预览前10项
    batch_require_full_approval=True  # 需要对全部项目审批
)
def batch_process(items: list):
    """批量操作需要完整预览和审批"""
    pass

实践三:与LangChain集成

# 与LangChain的集成
from langchain.agents import Agent
from langchain.tools import Tool
from humanlayer import HumanLayer

class HumanInTheLoopAgent(Agent):
    """
    集成HumanLayer的LangChain Agent

    在执行Tool之前自动进行审批检查
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.human_layer = HumanLayer()

    def _execute_tool(self, tool_name: str, tool_input: str):
        """在执行工具前进行审批"""

        # 检查是否需要审批
        needs_approval, reason = self.human_layer.check_approval_required(
            tool_name, 
            tool_input
        )

        if needs_approval:
            # 请求人工审批
            approved = self.human_layer.request_approval(
                tool_name=tool_name,
                parameters=tool_input,
                reason=reason
            )

            if not approved:
                return {"error": "操作被拒绝"}

        # 执行工具
        return super()._execute_tool(tool_name, tool_input)

实践四:自定义审批UI

# 自定义审批界面组件
from humanlayer import ApprovalUI, ApprovalCard

class CustomApprovalUI(ApprovalUI):
    """自定义审批界面"""

    def render_approval_request(self, request: ApprovalRequest):
        """渲染审批卡片"""
        card = ApprovalCard(
            title=f"⚠️ {request.operation_name} 需要审批",
            sections=[
                {
                    "操作类型": request.operation_name,
                    "描述": request.description,
                    "风险等级": self._render_risk_badge(request.risk_level),
                    "参数": request.parameters,
                    "上下文": request.context
                }
            ],
            actions=[
                {"label": "✅ 批准", "style": "success"},
                {"label": "❌ 拒绝", "style": "danger"},
                {"label": "📝 带备注批准", "action": "approve_with_comment"},
                {"label": "✏️ 修改参数", "action": "modify_params"}
            ]
        )
        return card.render()

    def _render_risk_badge(self, level: str):
        """渲染风险等级徽章"""
        colors = {
            "low": "green",
            "medium": "yellow", 
            "high": "orange",
            "critical": "red"
        }
        return f"<span class='badge badge-{colors.get(level, 'gray')}'>{level.upper()}</span>"

实践五:监控与告警

# 设置监控告警
from humanlayer import Monitor, AlertChannel

monitor = Monitor(
    alert_channels=[
        AlertChannel(
            type="slack",
            webhook_url="https://hooks.slack.com/...",
            events=["rejection", "escalation", "anomaly"]
        ),
        AlertChannel(
            type="email",
            recipients=["security@company.com"],
            events=["critical_operation", "pattern_detected"]
        )
    ],

    # 异常检测规则
    anomaly_detection={
        "rapid_rejections": {"threshold": 5, "window_minutes": 10},
        "high_risk_cluster": {"min_high_risk_operations": 10, "window_hours": 1},
        "approval_bypass_attempt": {"enabled": True}
    }
)

# 启动监控
monitor.start()

# 注册Agent
monitor.register_agent(agent_id="file-agent", agent_type="FileManager")
monitor.register_agent(agent_id="email-agent", agent_type="EmailSender")

配置详解:深度定制HumanLayer

features.yaml 完整配置

# config/features.yaml

# 应用基本信息
application:
  name: "MyAIApplication"
  version: "1.0.0"
  environment: "production"

# HumanLayer核心配置
humanlayer:
  # 是否启用
  enabled: true

  # 默认审批模式
  default_mode: "manual"  # manual | auto | monitor

  # 审批超时设置
  timeout:
    default_seconds: 300
    max_seconds: 3600
    escalation_enabled: true
    escalation_after_seconds: 600

  # 审批结果缓存
  approval_cache:
    enabled: true
    ttl_seconds: 3600
    cache_by_user: true

# 功能开关
features:
  # 文件系统操作
  file_system:
    enabled: true
    root_path: "/app/uploads"
    max_file_size_mb: 100
    allowed_operations:
      - read
      - write
      - list
    blocked_operations:
      - delete
      - execute
    require_approval:
      - write
      - delete
      - external_access

  # 网络请求
  network:
    enabled: true
    allowed_domains:
      - "*.company.com"
      - "api.trusted-partner.com"
    blocked_domains:
      - "*.banned.com"
    require_approval:
      - post_requests
      - external_calls

  # 数据库操作
  database:
    enabled: true
    allowed_queries:
      - SELECT
      - INSERT
    blocked_queries:
      - DROP
      - DELETE
      - TRUNCATE
    require_approval:
      - bulk_operations
      - multi_table_joins

  # 邮件发送
  email:
    enabled: true
    allowed_recipients:
      - "@company.com"
      - "@partner.com"
    require_approval_for_external: true
    max_recipients_per_email: 50

# 风险评估规则
risk_assessment:
  # 基于参数的规则
  parameter_rules:
    - name: "sensitive_data_detection"
      check: "contains_pattern"
      patterns:
        - "password"
        - "api_key"
        - "ssn"
        - "credit_card"
      risk_adjustment: "increase_by_one_level"

    - name: "large_value_check"
      parameter: "amount"
      condition: ">10000"
      risk_adjustment: "increase_to_high"

  # 基于上下文的规则
  context_rules:
    - name: "after_hours"
      check: "time_outside_business_hours"
      business_hours:
        start: "09:00"
        end: "18:00"
        timezone: "Asia/Shanghai"
      risk_adjustment: "increase_by_one_level"

    - name: "new_user"
      check: "account_age_days_less_than"
      threshold: 7
      risk_adjustment: "require_secondary_approval"

# 审计日志配置
audit:
  enabled: true
  storage: "database"  # database | file | cloud
  retention_days: 365

  # 日志详情级别
  detail_level: "full"  # minimal | standard | full

  # 包含字段
  include_fields:
    - timestamp
    - user_id
    - operation
    - parameters
    - approval_result
    - execution_result
    - execution_time
    - risk_score

  # 敏感数据处理
  sensitive_data_handling:
    mode: "mask"  # mask | hash | exclude
    mask_character: "*"
    fields_to_mask:
      - password
      - api_key
      - credit_card

# 通知配置
notifications:
  channels:
    - type: "slack"
      enabled: true
      webhook_url: "${SLACK_WEBHOOK_URL}"

    - type: "email"
      enabled: true
      smtp_config:
        host: "${SMTP_HOST}"
        port: 587

    - type: "webhook"
      enabled: true
      endpoints:
        - "https://internal-api.company.com/audit"

  # 通知触发条件
  triggers:
    on_approval_request: true
    on_approval_completed: true
    on_rejection: true
    on_escalation: true
    on_anomaly: true

总结与资源

核心要点回顾

┌─────────────────────────────────────────────────────────────┐
│                    12-Factor-Agents 核心要点                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1️⃣  Human-in-the-Loop                                       │
│      关键操作必须经过人工审批                               │
│      风险越高,审批越严格                                   │
│                                                             │
│  2️⃣  配置与代码分离                                          │
│      使用 YAML/JSON 配置文件管理审批规则                    │
│      同一套代码支持不同环境(dev/staging/prod              │
│                                                             │
│  3️⃣  完整审计日志                                            │
│      记录每一次操作的完整上下文                             │
│      支持事后追溯和合规审计                                 │
│                                                             │
│  4️⃣  风险分级管理                                            │
│      低风险:自动通过或仅通知                               │
│      中风险:单人多级审批                                   │
│      高风险:双人审批或更高层级审批                         │
│                                                             │
│  5️⃣  灵活的集成能力                                          │
│      支持 LangChainAutoGen 等主流框架                    │
│      易于扩展新的工具和功能                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

相关资源链接

官方资源:

  • GitHub 仓库:https://github.com/humanlayer/12-factor-agents
  • 官方文档:https://humanlayer.dev/docs
  • 示例代码:https://github.com/humanlayer/12-factor-agents/tree/main/examples

延伸阅读:

  • 12-Factor App 方法论:https://12factor.net/zh_cn/
  • LangChain 文档:https://docs.langchain.com/
  • Pydantic 数据验证:https://docs.pydantic.dev/

适用场景总结

适合使用 12-Factor-Agents 的场景:
✓ 企业级AI应用,需要合规审计
✓ 需要人工审批关键业务操作
✓ 对数据安全和隐私有严格要求
✓ 需要完整操作记录和追溯能力
✓ 多用户协作的AI Agent系统

不太适合的场景:
✗ 完全自动化的无监督场景
✗ 实时性要求极高(毫秒级响应)
✗ 操作频率极高且审批会成为瓶颈

构建安全可控的AI Agent,从理解HumanLayer开始。

这个框架为AI应用开发提供了一套实用的安全治理方案,让我们在充分发挥AI能力的同时,也能保持对系统的有效控制。希望这篇教程能帮助你快速上手构建企业级的AI Agent应用!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注