🎯 当「12-Factor」遇上AI Agent:人类层框架让大模型应用可控可追溯
别再让AI Agent自由发挥了,「人类层」才是企业级AI落地的最优解
为什么值得关注:AI Agent需要「刹车」和「方向盘」
在AI应用爆发的今天,大模型Agent正在以惊人的速度渗透到各个业务场景。但问题也随之而来:
当你让一个AI Agent自动执行操作时,你真的知道它会做什么吗?
- 它会不会在执行过程中泄露敏感数据?
- 它会不会在关键时刻做出错误的业务决策?
- 它会不会在不可逆操作(如删除、支付)上「一路绿灯」?
这正是 humanlayer/12-factor-agents 要解决的核心问题。这个框架将经典的「12-Factor App」方法论与AI Agent开发完美结合,为我们提供了一套构建可控、可追溯、有人类监督的AI Agent应用的完整方案。
这个项目的核心价值
传统Agent vs 12-Factor-Agents
┌─────────┐ ┌─────────────────┐
│ 直接执行 │ →→→ │ 执行前需人类确认 │
│ 无审计 │ →→→ │ 完整操作日志 │
│ 风险不可控│ →→→ │ 多层安全防护 │
└─────────┘ └─────────────────┘
关键特性一览:
- 提供「人类在环」(Human-in-the-Loop) 机制,确保关键操作经过人工审批
- 内置完整的操作审计日志,记录每个AI决策的上下文
- 支持函数调用前的条件拦截和参数验证
- 开箱即用的Approval Flow,灵活配置审批规则
- 兼容主流AI Agent框架(LangChain、AutoGen等)
环境搭建:从零开始搭建开发环境
系统要求
在开始之前,确保你的开发环境满足以下要求:
- Python 3.9 或更高版本
- pip 包管理器
- 推荐的虚拟环境工具(venv 或 conda)
安装步骤
第一步:创建虚拟环境(推荐)
# 使用 venv 创建隔离环境
python -m venv agent-env
# 激活虚拟环境
# Linux/macOS
source agent-env/bin/activate
# Windows
agent-env\Scripts\activate
第二步:安装核心依赖
# 安装 humanlayer 核心包
pip install humanlayer
# 安装必要的AI框架支持
pip install langchain openai anthropic
# 安装可选的日志和数据处理库
pip install loguru pydantic
第三步:验证安装
# 创建一个验证脚本:verify_install.py
import humanlayer
# 检查版本
print(f"HumanLayer 版本: {humanlayer.__version__}")
# 检查核心功能是否可用
from humanlayer import HumanLayer
hl = HumanLayer()
print("✅ HumanLayer 安装成功!")
运行验证脚本:
python verify_install.py
你应该看到类似输出:
HumanLayer 版本: 0.1.x
✅ HumanLayer 安装成功!
项目结构初始化
创建一个标准的12-Factor-Agents项目结构:
# 创建项目目录
mkdir my-first-agent && cd my-first-agent
# 创建目录结构
mkdir -p src/agents
mkdir -p config
mkdir -p logs
mkdir -p tests
# 创建核心文件
touch src/__init__.py
touch src/agents/__init__.py
touch config/approvals.yaml
touch config/features.yaml
核心功能详解:深入理解HumanLayer架构
1. Human-in-the-Loop 审批机制
HumanLayer的核心是「审批网关」(Approval Gateway)。在AI Agent执行敏感操作前,系统会自动暂停并等待人工确认。
# 审批流程示意
"""
Agent意图识别 → 触发审批条件 → 暂停执行
↓
┌─────────────────────────┐
│ 审批网关 │
│ ┌───────────────────┐ │
│ │ 操作描述: 删除文件 │ │
│ │ 风险等级: 🔴 高 │ │
│ │ 参数: /data/tmp/* │ │
│ └───────────────────┘ │
│ │
│ [批准] [拒绝] [修改参数] │
└─────────────────────────┘
↓
批准 → 执行 / 拒绝 → 终止
"""
2. 函数调用拦截器 (Function Call Interceptor)
拦截器是12-Factor-Agents的核心组件,它在函数执行前进行多维度检查:
# 拦截器检查流程
class FunctionInterceptor:
"""
多层检查机制:
Layer 1: 功能白名单检查
Layer 2: 参数类型和范围验证
Layer 3: 敏感信息脱敏
Layer 4: 风险评分计算
Layer 5: 人工审批触发(如果需要)
"""
def __init__(self, config):
self.config = config
self.approval_rules = config.get("approval_rules", {})
def should_approve(self, function_name, parameters):
"""
判断是否需要人工审批
返回: (needs_approval: bool, reason: str)
"""
# 检查是否为高风险操作
if self.is_high_risk_operation(function_name):
return True, f"高风险操作: {function_name}"
# 检查参数是否包含敏感数据
if self.contains_sensitive_data(parameters):
return True, "参数包含敏感信息"
# 检查是否超过操作阈值
if self.exceeds_threshold(parameters):
return True, f"参数超过预设阈值"
return False, "无需审批"
3. 审计日志系统
完整的操作记录是合规和排查问题的关键:
# 日志数据结构
{
"timestamp": "2024-01-15T10:30:45.123Z",
"agent_id": "agent-001",
"session_id": "session-abc123",
"function_call": {
"name": "send_email",
"parameters": {
"to": "user@example.com",
"subject": "订单确认",
"body": "您的订单已处理"
}
},
"approval": {
"status": "approved", # approved, rejected, modified
"approver": "admin@example.com",
"approval_time": "2024-01-15T10:31:00.000Z",
"comment": "确认无误"
},
"execution": {
"start_time": "2024-01-15T10:31:00.500Z",
"end_time": "2024-01-15T10:31:02.100Z",
"status": "success",
"result": "邮件发送成功"
}
}
4. 配置驱动的能力体系 (Features)
12-Factor方法论强调「配置与代码分离」,HumanLayer完美实践了这一点:
# config/features.yaml
features:
# 文件操作配置
file_operations:
enabled: true
max_file_size_mb: 100
allowed_extensions: [".txt", ".csv", ".json"]
require_approval_for:
- delete
- overwrite
- external_access
# 通信功能配置
communications:
enabled: true
approved_recipients:
- "@company.com"
require_approval_for:
- external_emails
- scheduled_messages
# 数据访问配置
data_access:
enabled: true
allowed_databases:
- "analytics_db"
- "user_db"
require_approval_for:
- user_pii_access
- bulk_queries
实战教程:从入门到精通
项目一:构建带审批流程的文件管理Agent
让我们从头开始,构建一个企业级的文件管理AI助手。
第一步:定义工具函数
# src/agents/tools.py
from typing import List, Dict, Any
import os
from pathlib import Path
from humanlayer import human_approved
class FileManagerTools:
"""
文件管理工具集
每个高风险操作都通过 @human_approved 装饰器进行保护
"""
@human_approved(
name="read_file",
description="读取指定路径的文件内容",
risk_level="low",
requires_approval_if={"path": {"contains": "secret"}}
)
def read_file(self, path: str) -> Dict[str, Any]:
"""
读取文件内容
Args:
path: 文件路径
Returns:
包含文件内容的字典
"""
try:
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
return {
"status": "success",
"path": path,
"content": content,
"size": len(content)
}
except FileNotFoundError:
return {"status": "error", "message": f"文件不存在: {path}"}
except Exception as e:
return {"status": "error", "message": str(e)}
@human_approved(
name="write_file",
description="创建或覆盖指定文件",
risk_level="medium",
always_requires_approval=True
)
def write_file(self, path: str, content: str) -> Dict[str, Any]:
"""
写入文件
Args:
path: 文件路径
content: 文件内容
Returns:
操作结果
"""
# 参数预检查
if not path:
return {"status": "error", "message": "路径不能为空"}
# 确保目录存在
Path(path).parent.mkdir(parents=True, exist_ok=True)
try:
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
return {
"status": "success",
"path": path,
"bytes_written": len(content.encode('utf-8'))
}
except Exception as e:
return {"status": "error", "message": str(e)}
@human_approved(
name="delete_file",
description="删除指定文件",
risk_level="high",
always_requires_approval=True,
confirmation_template="确认删除文件 {path}?此操作不可撤销。"
)
def delete_file(self, path: str) -> Dict[str, Any]:
"""
删除文件(不可恢复)
Args:
path: 要删除的文件路径
Returns:
操作结果
"""
try:
if not os.path.exists(path):
return {"status": "error", "message": f"文件不存在: {path}"}
os.remove(path)
return {
"status": "success",
"message": f"文件已删除: {path}"
}
except Exception as e:
return {"status": "error", "message": str(e)}
@human_approved(
name="list_directory",
description="列出目录内容",
risk_level="low"
)
def list_directory(self, path: str = ".") -> Dict[str, Any]:
"""
列出目录下的文件和文件夹
Args:
path: 目录路径
Returns:
目录内容列表
"""
try:
items = []
for item in os.listdir(path):
item_path = os.path.join(path, item)
items.append({
"name": item,
"type": "directory" if os.path.isdir(item_path) else "file",
"size": os.path.getsize(item_path) if os.path.isfile(item_path) else None
})
return {
"status": "success",
"path": path,
"items": items
}
except Exception as e:
return {"status": "error", "message": str(e)}
第二步:配置审批规则
# config/approvals.yaml
approval_rules:
# 全局设置
global:
default_risk_level: "low"
timeout_seconds: 300
escalation_enabled: true
# 风险等级定义
risk_levels:
low:
auto_approve: true
notification_only: false
medium:
auto_approve: false
require_one_approval: true
high:
auto_approve: false
require_two_approvals: true
timeout_escalation: "supervisor@example.com"
# 功能特定的规则
function_rules:
delete_file:
risk_level: "high"
always_approve: false
preview_changes: true
write_file:
risk_level: "medium"
preview_content: true
size_limit_mb: 50
read_file:
risk_level: "low"
auto_approve: true
log_access: true
第三步:创建Agent主类
# src/agents/file_agent.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from humanlayer import HumanLayer, ApprovalRequest, ApprovalResponse
from .tools import FileManagerTools
@dataclass
class AgentResponse:
"""Agent响应数据类"""
success: bool
message: str
data: Optional[Dict[str, Any]] = None
approval_requested: bool = False
execution_id: Optional[str] = None
@dataclass
class AgentConfig:
"""Agent配置"""
agent_name: str = "FileManagerAgent"
max_iterations: int = 10
enable_logging: bool = True
log_path: str = "logs/agent.log"
class FileManagerAgent:
"""
企业级文件管理Agent
特性:
- 所有文件操作都经过审批网关
- 完整的操作审计日志
- 敏感文件访问保护
- 操作历史追踪
"""
def __init__(self, config: Optional[AgentConfig] = None):
self.config = config or AgentConfig()
self.tools = FileManagerTools()
self.human_layer = HumanLayer()
# 初始化日志
if self.config.enable_logging:
from loguru import logger
logger.add(
self.config.log_path,
rotation="10 MB",
retention="7 days",
level="INFO"
)
self.logger = logger
# 操作历史
self.execution_history: List[Dict[str, Any]] = []
def _log_operation(self, operation: str, params: Dict, result: Any):
"""记录操作日志"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"operation": operation,
"parameters": params,
"result": result
}
self.execution_history.append(log_entry)
if self.config.enable_logging:
self.logger.info(f"操作: {operation}, 参数: {params}")
def process_request(self, request: str) -> AgentResponse:
"""
处理用户请求
Args:
request: 自然语言请求
Returns:
AgentResponse: 处理结果
"""
# 意图识别
intent = self._identify_intent(request)
# 提取参数
params = self._extract_parameters(request, intent)
# 执行操作(会触发审批流程)
try:
result = self._execute_with_approval(intent, params)
self._log_operation(intent, params, result)
return result
except Exception as e:
return AgentResponse(
success=False,
message=f"执行出错: {str(e)}"
)
def _identify_intent(self, request: str) -> str:
"""识别用户意图"""
request_lower = request.lower()
if any(word in request_lower for word in ["读取", "查看", "打开", "read"]):
return "read_file"
elif any(word in request_lower for word in ["写入", "创建", "保存", "write"]):
return "write_file"
elif any(word in request_lower for word in ["删除", "remove", "delete"]):
return "delete_file"
elif any(word in request_lower for word in ["列出", "列表", "list", "ls"]):
return "list_directory"
else:
return "unknown"
def _extract_parameters(self, request: str, intent: str) -> Dict[str, Any]:
"""从请求中提取参数"""
params = {}
# 简单的参数提取逻辑
# 实际应用中可以使用更复杂的NLP处理
if intent == "read_file":
# 提取文件路径
import re
path_match = re.search(r'["\'](.+?)["\']|([\w./]+)', request)
if path_match:
params["path"] = path_match.group(1) or path_match.group(2)
elif intent == "write_file":
import re
path_match = re.search(r'["\'](.+?)["\']', request)
if path_match:
params["path"] = path_match.group(1)
# 提取内容
content_match = re.search(r'内容[::]\s*["\'](.+?)["\']', request)
if content_match:
params["content"] = content_match.group(1)
elif intent == "delete_file":
import re
path_match = re.search(r'["\'](.+?)["\']|([\w./]+)', request)
if path_match:
params["path"] = path_match.group(1) or path_match.group(2)
elif intent == "list_directory":
import re
path_match = re.search(r'目录[::]\s*["\']?([\w./]*?)["\']?', request)
if path_match:
params["path"] = path_match.group(1)
else:
params["path"] = "."
return params
def _execute_with_approval(self, intent: str, params: Dict) -> AgentResponse:
"""通过审批流程执行操作"""
# 获取工具方法
tool_method = getattr(self.tools, intent, None)
if not tool_method:
return AgentResponse(
success=False,
message=f"未知操作: {intent}"
)
# 通过HumanLayer执行,会自动处理审批流程
result = tool_method(**params)
if result.get("status") == "error":
return AgentResponse(
success=False,
message=result.get("message", "未知错误")
)
return AgentResponse(
success=True,
message="操作成功",
data=result,
approval_requested=result.get("_approval_requested", False)
)
def get_history(self) -> List[Dict[str, Any]]:
"""获取操作历史"""
return self.execution_history
第四步:创建命令行界面
# src/cli.py
import sys
from src.agents.file_agent import FileManagerAgent, AgentConfig
def main():
"""命令行入口"""
print("=" * 50)
print("🤖 企业级文件管理Agent")
print("=" * 50)
print("提示:所有文件操作都受HumanLayer保护")
print("输入 'help' 查看可用命令,输入 'quit' 退出")
print("-" * 50)
# 初始化Agent
config = AgentConfig(
agent_name="FileManagerAgent",
enable_logging=True,
log_path="logs/agent.log"
)
agent = FileManagerAgent(config)
while True:
try:
user_input = input("\n📝 请输入指令: ").strip()
if not user_input:
continue
if user_input.lower() in ["quit", "exit", "退出"]:
print("再见!👋")
break
if user_input.lower() == "help":
print_help()
continue
if user_input.lower() == "history":
show_history(agent)
continue
# 处理请求
response = agent.process_request(user_input)
# 输出结果
print_result(response)
except KeyboardInterrupt:
print("\n\n操作已取消")
break
except Exception as e:
print(f"❌ 错误: {e}")
def print_help():
"""打印帮助信息"""
help_text = """
可用命令:
---------
1. 读取文件: "读取文件 '/path/to/file.txt'"
2. 写入文件: "写入文件 '/path/to/file.txt' 内容: '文件内容'"
3. 删除文件: "删除文件 '/path/to/file.txt'"
4. 列出目录: "列出目录 '/path/to/dir'"
5. 查看历史: "history"
6. 获取帮助: "help"
7. 退出程序: "quit"
示例:
-----
读取文件 '/etc/hosts'
写入文件 'test.txt' 内容: 'Hello World'
列出目录 '.'
"""
print(help_text)
def show_history(agent):
"""显示操作历史"""
history = agent.get_history()
if not history:
print("暂无操作记录")
return
print("\n📋 操作历史:")
print("-" * 50)
for i, entry in enumerate(history, 1):
print(f"{i}. [{entry['timestamp']}]")
print(f" 操作: {entry['operation']}")
print(f" 参数: {entry['parameters']}")
print(f" 结果: {entry['result'].get('status', 'unknown')}")
print()
def print_result(response):
"""打印执行结果"""
if response.success:
print(f"✅ {response.message}")
if response.data:
if response.approval_requested:
print("⏳ 此操作需要人工审批")
print(f"📦 数据: {response.data}")
else:
print(f"❌ {response.message}")
if __name__ == "__main__":
main()
第五步:运行测试
# 运行Agent
python -m src.cli
测试场景:
# 测试1:列出目录(低风险,自动通过)
输入: 列出目录 '.'
# 测试2:写入文件(需要审批)
输入: 写入文件 'test.txt' 内容: '这是一次测试写入'
# 测试3:删除文件(高风险,需要双重审批)
输入: 删除文件 'test.txt'
项目二:构建安全的邮件发送Agent
完整的邮件Agent实现
# src/agents/email_agent.py
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from datetime import datetime
from humanlayer import human_approved, HumanLayer, ApprovalContext
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
@dataclass
class EmailConfig:
"""邮件配置"""
smtp_server: str
smtp_port: int
sender_email: str
sender_password: str
use_tls: bool = True
@dataclass
class Email:
"""邮件数据类"""
to: str
subject: str
body: str
cc: Optional[List[str]] = None
bcc: Optional[List[str]] = None
class EmailAgent:
"""
企业邮件发送Agent
安全特性:
- 所有外发邮件需要审批
- 限制可发送的收件人范围
- 自动记录所有邮件往来
- 敏感内容检测和警告
"""
# 允许发送的域名白名单
ALLOWED_DOMAINS = [
"company.com",
"partner.com",
"client.com"
]
# 禁止发送的关键词
BLOCKED_KEYWORDS = [
"机密", "confidential",
"password", "密码",
"secret", "密钥"
]
def __init__(self, config: EmailConfig):
self.config = config
self.human_layer = HumanLayer()
self.sent_emails: List[Dict[str, Any]] = []
@human_approved(
name="send_email",
description="发送电子邮件",
risk_level="high",
always_requires_approval=True,
approval_timeout_seconds=600
)
def send_email(self, email: Email) -> Dict[str, Any]:
"""
发送邮件(需要人工审批)
Args:
email: Email对象
Returns:
发送结果
"""
# 前置检查
validation_result = self._validate_email(email)
if not validation_result["valid"]:
return {
"status": "error",
"message": validation_result["message"]
}
# 执行发送
try:
result = self._send_via_smtp(email)
# 记录已发送邮件
self._log_sent_email(email, result)
return {
"status": "success",
"message": f"邮件已发送至 {email.to}",
"message_id": result.get("message_id")
}
except Exception as e:
return {
"status": "error",
"message": f"发送失败: {str(e)}"
}
def _validate_email(self, email: Email) -> Dict[str, Any]:
"""验证邮件内容"""
# 检查收件人域名
recipient_domain = email.to.split("@")[-1] if "@" in email.to else ""
if recipient_domain not in self.ALLOWED_DOMAINS:
return {
"valid": False,
"message": f"收件人域名不在白名单中: {recipient_domain}"
}
# 检查敏感关键词
combined_text = f"{email.subject} {email.body}".lower()
for keyword in self.BLOCKED_KEYWORDS:
if keyword.lower() in combined_text:
return {
"valid": False,
"message": f"邮件内容包含敏感词: {keyword}"
}
# 检查邮件内容长度
if len(email.body) > 50000:
return {
"valid": False,
"message": "邮件内容过长,请精简"
}
return {"valid": True}
def _send_via_smtp(self, email: Email) -> Dict[str, Any]:
"""通过SMTP发送邮件"""
msg = MIMEMultipart()
msg['From'] = self.config.sender_email
msg['To'] = email.to
msg['Subject'] = email.subject
if email.cc:
msg['Cc'] = ",".join(email.cc)
msg.attach(MIMEText(email.body, 'plain', 'utf-8'))
# 建立SMTP连接
server = smtplib.SMTP(self.config.smtp_server, self.config.smtp_port)
if self.config.use_tls:
server.starttls()
server.login(self.config.sender_email, self.config.sender_password)
# 发送邮件
recipients = [email.to]
if email.cc:
recipients.extend(email.cc)
server.send_message(msg)
server.quit()
return {
"message_id": f"{datetime.now().timestamp()}",
"sent_at": datetime.now().isoformat()
}
def _log_sent_email(self, email: Email, result: Dict):
"""记录已发送邮件"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"to": email.to,
"subject": email.subject,
"body_preview": email.body[:100] + "..." if len(email.body) > 100 else email.body,
"result": result
}
self.sent_emails.append(log_entry)
def get_sent_history(self) -> List[Dict[str, Any]]:
"""获取发送历史"""
return self.sent_emails
# 使用示例
if __name__ == "__main__":
# 配置
config = EmailConfig(
smtp_server="smtp.company.com",
smtp_port=587,
sender_email="agent@company.com",
sender_password="your-password-here",
use_tls=True
)
# 创建Agent
agent = EmailAgent(config)
# 准备邮件
email = Email(
to="colleague@company.com",
subject="周报总结",
body="本周工作进展顺利,已完成核心功能开发。"
)
# 发送邮件(会触发审批流程)
result = agent.send_email(email)
print(result)
常见使用场景与案例分析
场景一:企业客服Agent
需求:构建一个客服AI,能够回答客户问题,但在以下情况需要人工介入:
- 涉及退款、投诉升级
- 需要访问客户敏感信息
- 回复内容包含法律建议
# src/agents/customer_service_agent.py
from humanlayer import human_approved, HumanLayer
from enum import Enum
class IntentType(Enum):
GENERAL_INQUIRY = "general_inquiry"
REFUND_REQUEST = "refund_request"
COMPLAINT = "complaint"
SENSITIVE_ACCESS = "sensitive_access"
LEGAL_ADVICE = "legal_advice"
class CustomerServiceAgent:
"""
企业客服Agent
智能路由:
- 简单问题 → AI直接回复
- 敏感操作 → 转人工审批
- 高风险请求 → 升级处理
"""
def __init__(self):
self.human_layer = HumanLayer()
@human_approved(
name="process_refund",
description="处理退款请求",
risk_level="high",
always_requires_approval=True,
approver_role="supervisor"
)
def process_refund(self, order_id: str, amount: float, reason: str):
"""处理退款"""
# 退款逻辑
pass
@human_approved(
name="access_customer_pii",
description="访问客户个人信息",
risk_level="medium",
requires_approval_if={"amount": {">": 10000}}
)
def access_customer_pii(self, customer_id: str, fields: list):
"""访问客户敏感信息"""
# 数据访问逻辑
pass
@human_approved(
name="escalate_to_legal",
description="升级为法律咨询",
risk_level="high",
always_requires_approval=True
)
def escalate_to_legal(self, ticket_id: str, description: str):
"""转交法律部门"""
# 升级逻辑
pass
场景二:数据分析Agent
需求:构建数据分析Agent,能够:
- 执行SQL查询(需要审批高风险查询)
- 生成可视化图表
- 导出数据报告
# src/agents/data_analytics_agent.py
from humanlayer import human_approved
class DataAnalyticsAgent:
"""
数据分析Agent
安全策略:
- 只读查询自动通过
- JOIN多表需要审批
- 涉及PII字段需要审批
- 批量导出需要双重审批
"""
@human_approved(
name="execute_read_query",
description="执行只读数据查询",
risk_level="low"
)
def execute_read_query(self, query: str):
"""执行只读查询"""
# 确保是SELECT语句
assert query.strip().upper().startswith("SELECT")
# 执行查询
pass
@human_approved(
name="execute_join_query",
description="执行多表JOIN查询",
risk_level="medium",
always_requires_approval=True
)
def execute_join_query(self, query: str):
"""执行涉及多表的JOIN查询"""
pass
@human_approved(
name="export_data",
description="导出数据分析报告",
risk_level="high",
always_requires_approval=True,
require_two_approvals=True
)
def export_data(self, format: str, destination: str):
"""导出数据"""
pass
场景三:代码审查Agent
# src/agents/code_review_agent.py
from humanlayer import human_approved
class CodeReviewAgent:
"""
代码审查Agent
审查策略:
- 自动检查代码风格
- 安全漏洞自动标记
- 高风险修改需要人工确认
- 自动化重构需要双重审批
"""
@human_approved(
name="suggest_refactoring",
description="建议代码重构",
risk_level="medium",
always_requires_approval=True,
show_diff_preview=True
)
def suggest_refactoring(self, file_path: str, suggestion: str):
"""建议代码重构"""
pass
@human_approved(
name="auto_fix_security",
description="自动修复安全漏洞",
risk_level="high",
always_requires_approval=True,
require_two_approvals=True
)
def auto_fix_security(self, vulnerability_id: str):
"""自动修复安全漏洞"""
pass
@human_approved(
name="apply_code_change",
description="直接应用代码修改",
risk_level="critical",
always_requires_approval=True,
approver_role="senior_developer"
)
def apply_code_change(self, changes: list):
"""直接应用代码修改"""
pass
最佳实践与高级技巧
实践一:优雅的审批流程设计
# 高级审批配置示例
from humanlayer import ApprovalStrategy, RiskLevel
# 条件化审批规则
approval_config = {
# 基于金额的审批策略
"amount_threshold": {
"low": {"max": 1000, "auto_approve": True},
"medium": {"min": 1000, "max": 10000, "require_one": True},
"high": {"min": 10000, "require_two": True}
},
# 基于时间的审批策略
"time_based": {
"business_hours": {"auto_approve": True},
"after_hours": {"require_one": True},
"weekend": {"require_two": True}
},
# 基于用户角色的审批策略
"role_based": {
"new_user": {"require_one": True},
"verified_user": {"auto_approve_threshold": "medium"},
"premium_user": {"elevated_limits": True}
}
}
@human_approved(
name="process_payment",
description="处理支付",
dynamic_approval_strategy=approval_config
)
def process_payment(amount: float, recipient: str):
"""智能审批:根据金额、时间、用户角色动态决定审批流程"""
pass
实践二:批量操作的安全处理
# 批量操作审批策略
@human_approved(
name="batch_process",
description="批量处理数据",
risk_level="high",
batch_mode=True, # 启用批量模式
batch_preview_limit=10, # 预览前10项
batch_require_full_approval=True # 需要对全部项目审批
)
def batch_process(items: list):
"""批量操作需要完整预览和审批"""
pass
实践三:与LangChain集成
# 与LangChain的集成
from langchain.agents import Agent
from langchain.tools import Tool
from humanlayer import HumanLayer
class HumanInTheLoopAgent(Agent):
"""
集成HumanLayer的LangChain Agent
在执行Tool之前自动进行审批检查
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.human_layer = HumanLayer()
def _execute_tool(self, tool_name: str, tool_input: str):
"""在执行工具前进行审批"""
# 检查是否需要审批
needs_approval, reason = self.human_layer.check_approval_required(
tool_name,
tool_input
)
if needs_approval:
# 请求人工审批
approved = self.human_layer.request_approval(
tool_name=tool_name,
parameters=tool_input,
reason=reason
)
if not approved:
return {"error": "操作被拒绝"}
# 执行工具
return super()._execute_tool(tool_name, tool_input)
实践四:自定义审批UI
# 自定义审批界面组件
from humanlayer import ApprovalUI, ApprovalCard
class CustomApprovalUI(ApprovalUI):
"""自定义审批界面"""
def render_approval_request(self, request: ApprovalRequest):
"""渲染审批卡片"""
card = ApprovalCard(
title=f"⚠️ {request.operation_name} 需要审批",
sections=[
{
"操作类型": request.operation_name,
"描述": request.description,
"风险等级": self._render_risk_badge(request.risk_level),
"参数": request.parameters,
"上下文": request.context
}
],
actions=[
{"label": "✅ 批准", "style": "success"},
{"label": "❌ 拒绝", "style": "danger"},
{"label": "📝 带备注批准", "action": "approve_with_comment"},
{"label": "✏️ 修改参数", "action": "modify_params"}
]
)
return card.render()
def _render_risk_badge(self, level: str):
"""渲染风险等级徽章"""
colors = {
"low": "green",
"medium": "yellow",
"high": "orange",
"critical": "red"
}
return f"<span class='badge badge-{colors.get(level, 'gray')}'>{level.upper()}</span>"
实践五:监控与告警
# 设置监控告警
from humanlayer import Monitor, AlertChannel
monitor = Monitor(
alert_channels=[
AlertChannel(
type="slack",
webhook_url="https://hooks.slack.com/...",
events=["rejection", "escalation", "anomaly"]
),
AlertChannel(
type="email",
recipients=["security@company.com"],
events=["critical_operation", "pattern_detected"]
)
],
# 异常检测规则
anomaly_detection={
"rapid_rejections": {"threshold": 5, "window_minutes": 10},
"high_risk_cluster": {"min_high_risk_operations": 10, "window_hours": 1},
"approval_bypass_attempt": {"enabled": True}
}
)
# 启动监控
monitor.start()
# 注册Agent
monitor.register_agent(agent_id="file-agent", agent_type="FileManager")
monitor.register_agent(agent_id="email-agent", agent_type="EmailSender")
配置详解:深度定制HumanLayer
features.yaml 完整配置
# config/features.yaml
# 应用基本信息
application:
name: "MyAIApplication"
version: "1.0.0"
environment: "production"
# HumanLayer核心配置
humanlayer:
# 是否启用
enabled: true
# 默认审批模式
default_mode: "manual" # manual | auto | monitor
# 审批超时设置
timeout:
default_seconds: 300
max_seconds: 3600
escalation_enabled: true
escalation_after_seconds: 600
# 审批结果缓存
approval_cache:
enabled: true
ttl_seconds: 3600
cache_by_user: true
# 功能开关
features:
# 文件系统操作
file_system:
enabled: true
root_path: "/app/uploads"
max_file_size_mb: 100
allowed_operations:
- read
- write
- list
blocked_operations:
- delete
- execute
require_approval:
- write
- delete
- external_access
# 网络请求
network:
enabled: true
allowed_domains:
- "*.company.com"
- "api.trusted-partner.com"
blocked_domains:
- "*.banned.com"
require_approval:
- post_requests
- external_calls
# 数据库操作
database:
enabled: true
allowed_queries:
- SELECT
- INSERT
blocked_queries:
- DROP
- DELETE
- TRUNCATE
require_approval:
- bulk_operations
- multi_table_joins
# 邮件发送
email:
enabled: true
allowed_recipients:
- "@company.com"
- "@partner.com"
require_approval_for_external: true
max_recipients_per_email: 50
# 风险评估规则
risk_assessment:
# 基于参数的规则
parameter_rules:
- name: "sensitive_data_detection"
check: "contains_pattern"
patterns:
- "password"
- "api_key"
- "ssn"
- "credit_card"
risk_adjustment: "increase_by_one_level"
- name: "large_value_check"
parameter: "amount"
condition: ">10000"
risk_adjustment: "increase_to_high"
# 基于上下文的规则
context_rules:
- name: "after_hours"
check: "time_outside_business_hours"
business_hours:
start: "09:00"
end: "18:00"
timezone: "Asia/Shanghai"
risk_adjustment: "increase_by_one_level"
- name: "new_user"
check: "account_age_days_less_than"
threshold: 7
risk_adjustment: "require_secondary_approval"
# 审计日志配置
audit:
enabled: true
storage: "database" # database | file | cloud
retention_days: 365
# 日志详情级别
detail_level: "full" # minimal | standard | full
# 包含字段
include_fields:
- timestamp
- user_id
- operation
- parameters
- approval_result
- execution_result
- execution_time
- risk_score
# 敏感数据处理
sensitive_data_handling:
mode: "mask" # mask | hash | exclude
mask_character: "*"
fields_to_mask:
- password
- api_key
- credit_card
# 通知配置
notifications:
channels:
- type: "slack"
enabled: true
webhook_url: "${SLACK_WEBHOOK_URL}"
- type: "email"
enabled: true
smtp_config:
host: "${SMTP_HOST}"
port: 587
- type: "webhook"
enabled: true
endpoints:
- "https://internal-api.company.com/audit"
# 通知触发条件
triggers:
on_approval_request: true
on_approval_completed: true
on_rejection: true
on_escalation: true
on_anomaly: true
总结与资源
核心要点回顾
┌─────────────────────────────────────────────────────────────┐
│ 12-Factor-Agents 核心要点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1️⃣ Human-in-the-Loop │
│ → 关键操作必须经过人工审批 │
│ → 风险越高,审批越严格 │
│ │
│ 2️⃣ 配置与代码分离 │
│ → 使用 YAML/JSON 配置文件管理审批规则 │
│ → 同一套代码支持不同环境(dev/staging/prod) │
│ │
│ 3️⃣ 完整审计日志 │
│ → 记录每一次操作的完整上下文 │
│ → 支持事后追溯和合规审计 │
│ │
│ 4️⃣ 风险分级管理 │
│ → 低风险:自动通过或仅通知 │
│ → 中风险:单人多级审批 │
│ → 高风险:双人审批或更高层级审批 │
│ │
│ 5️⃣ 灵活的集成能力 │
│ → 支持 LangChain、AutoGen 等主流框架 │
│ → 易于扩展新的工具和功能 │
│ │
└─────────────────────────────────────────────────────────────┘
相关资源链接
官方资源:
- GitHub 仓库:https://github.com/humanlayer/12-factor-agents
- 官方文档:https://humanlayer.dev/docs
- 示例代码:https://github.com/humanlayer/12-factor-agents/tree/main/examples
延伸阅读:
- 12-Factor App 方法论:https://12factor.net/zh_cn/
- LangChain 文档:https://docs.langchain.com/
- Pydantic 数据验证:https://docs.pydantic.dev/
适用场景总结
适合使用 12-Factor-Agents 的场景:
✓ 企业级AI应用,需要合规审计
✓ 需要人工审批关键业务操作
✓ 对数据安全和隐私有严格要求
✓ 需要完整操作记录和追溯能力
✓ 多用户协作的AI Agent系统
不太适合的场景:
✗ 完全自动化的无监督场景
✗ 实时性要求极高(毫秒级响应)
✗ 操作频率极高且审批会成为瓶颈
构建安全可控的AI Agent,从理解HumanLayer开始。
这个框架为AI应用开发提供了一套实用的安全治理方案,让我们在充分发挥AI能力的同时,也能保持对系统的有效控制。希望这篇教程能帮助你快速上手构建企业级的AI Agent应用!
评论区