别再用 LangChain 手动编排了,微软 Agent Framework 才是 AI Agent 开发的最优解
一、为什么微软 Agent Framework 值得关注
在 AI Agent 开发领域,开发者们长期面临着复杂的编排困境。当我们需要让大语言模型调用工具、访问记忆、执行多步骤任务时,往往需要编写大量胶水代码,处理各种边界情况,维护成本极高。微软 Agent Framework 的出现,正是为了解决这一核心痛点。
这个项目基于微软 Semantic Kernel 核心引擎构建,提供了一套优雅的抽象层,让 AI Agent 的开发变得模块化、可组合、易维护。与其他框架相比,它的优势在于:对企业级应用场景的深度支持、与现有 .NET 生态的天然融合、以及经过微软产品团队实际验证的稳定性。
更重要的是,Agent Framework 不仅仅是另一个 LLM 编排工具,它代表了一种全新的开发范式:通过将 AI 能力封装为可复用的插件,让开发者可以像搭积木一样构建复杂的智能应用。这种设计思路极大地降低了 AI 应用的开发门槛,同时保证了代码的可测试性和可维护性。
二、环境搭建:从零开始配置开发环境
在开始之前,我们需要确保开发环境满足基本要求。推荐使用 Python 3.10 或更高版本,因为新版本的类型提示和异步支持能让代码更加清晰。以下是完整的环境配置步骤。
首先,创建一个独立的虚拟环境是最佳实践,这样可以避免依赖冲突。使用 conda 或 venv 都可以,根据个人偏好选择即可。
# 创建并激活虚拟环境(使用 conda)
conda create -n agent-framework python=3.11
conda activate agent-framework
# 或者使用 venv
python -m venv agent-env
source agent-env/bin/activate # Linux/Mac
agent-env\Scripts\activate # Windows
环境激活后,接下来安装 Agent Framework 的核心依赖包。通过 pip install 命令可以快速完成安装,注意要同时安装 semantic-kernel 以获得完整功能。
# 安装核心框架
pip install semantic-kernel==0.9.1b1
# 安装可选依赖(根据需要选择)
pip install semantic-kernel-plugins # 官方插件集合
pip install azure-identity # Azure 认证支持
pip install openai # OpenAI API 客户端
安装完成后,创建一个配置文件来管理 API 密钥和连接参数。这不仅便于管理敏感信息,也方便在不同环境间切换。
# config.py - 配置文件示例
import os
class Settings:
"""应用配置类"""
# Azure OpenAI 配置
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY", "your-api-key")
AZURE_OPENAI_ENDPOINT = os.getenv(
"AZURE_OPENAI_ENDPOINT",
"https://your-resource.openai.azure.com"
)
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv(
"AZURE_OPENAI_DEPLOYMENT_NAME",
"gpt-4"
)
AZURE_OPENAI_API_VERSION = "2024-02-15-preview"
# 或者使用 OpenAI 直接配置
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key")
OPENAI_MODEL_ID = "gpt-4-turbo"
@classmethod
def validate(cls) -> bool:
"""验证配置是否完整"""
has_azure = bool(cls.AZURE_OPENAI_API_KEY and cls.AZURE_OPENAI_ENDPOINT)
has_openai = bool(cls.OPENAI_API_KEY)
return has_azure or has_openai
环境验证脚本可以帮助我们确认所有组件是否正确安装。这个步骤虽然简单,但能避免后续很多麻烦。
# verify_setup.py - 环境验证脚本
import semantic_kernel as sk
def verify_installation():
"""验证框架安装是否成功"""
print("=" * 50)
print("Agent Framework 环境验证")
print("=" * 50)
try:
# 验证核心模块
kernel = sk.Kernel()
print("[✓] 核心 Kernel 模块加载成功")
# 验证版本信息
print(f"[✓] Semantic Kernel 版本: {sk.__version__}")
# 验证插件系统
from semantic_kernel.plugins import PluginCollection
plugins = PluginCollection(kernel)
print("[✓] 插件系统初始化成功")
# 验证内存服务
from semantic_kernel.memory import SemanticTextMemory
print("[✓] 内存服务加载成功")
print("\n" + "=" * 50)
print("所有组件验证通过!可以开始开发。")
print("=" * 50)
return True
except ImportError as e:
print(f"[✗] 导入错误: {e}")
print("请重新安装依赖: pip install semantic-kernel")
return False
except Exception as e:
print(f"[✗] 验证过程出错: {e}")
return False
if __name__ == "__main__":
verify_installation()
三、核心概念详解:理解 Agent Framework 的设计哲学
理解 Agent Framework 的核心概念是掌握这个框架的关键。微软的设计理念是将复杂的 AI 能力分解为几个核心抽象,每个抽象都有明确的职责边界,通过组合这些抽象来完成复杂任务。
3.1 Kernel:智能编排的核心引擎
Kernel 是整个框架的心脏,它负责管理所有 AI 相关的资源和调度。你可以把它想象成一个操作系统内核,它管理着插件、内存、聊天历史,以及与各种 AI 服务的连接。一个应用中通常只需要一个 Kernel 实例,但根据架构设计,你也可以创建多个 Kernel 来处理不同的业务域。
# kernel_basics.py - Kernel 的基本使用
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
def create_basic_kernel():
"""创建一个基础配置的 Kernel"""
# 初始化 Kernel
kernel = sk.Kernel()
# 添加 AI 服务,这里以 OpenAI 为例
# Azure OpenAI 的配置方式类似,只是连接器不同
kernel.add_service(
OpenAIChatCompletion(
service_id="default-gpt",
ai_model_id="gpt-4-turbo",
api_key="your-api-key"
)
)
# 配置日志级别(对于调试非常有用)
kernel.Log = sk.NullLogger()
return kernel
def create_azure_kernel():
"""创建 Azure OpenAI 配置的 Kernel"""
kernel = sk.Kernel()
# Azure OpenAI 需要使用特定的连接器
from semantic_kernel.connectors.ai.azure_open_ai import (
AzureChatCompletion,
AzureTextCompletion
)
kernel.add_service(
AzureChatCompletion(
service_id="azure-gpt",
deployment_name="gpt-4",
endpoint="https://your-resource.openai.azure.com",
api_key="your-azure-key"
)
)
return kernel
Kernel 的配置非常灵活,支持多种 AI 服务的混合使用。例如,你可以同时连接 GPT-4 和 Claude,让不同的插件使用不同的模型,实现更复杂的功能。
3.2 Plugins:可复用的能力单元
插件是 Agent Framework 中最重要的概念之一。它将各种能力封装成可复用的单元,AI 模型可以像调用函数一样调用这些能力。插件的设计遵循了开放封闭原则:你不需要修改插件代码来添加新功能,只需要扩展即可。
一个插件本质上是一个类或函数的集合,每个可被 AI 调用的方法称为原生函数(Native Function)。与传统的函数调用不同,这些函数需要声明详细的功能描述,AI 模型会根据这些描述来决定何时、如何调用它们。
# plugins_example.py - 插件的定义和使用
from semantic_kernel import Kernel
from semantic_kernel.skill_definition import (
sk_function,
sk_function_context_parameter
)
from semantic_kernel.planning.basic_planner import BasicPlanner
class MathPlugin:
"""数学计算插件示例"""
def __init__(self):
self._precision = 2
@sk_function(
description="计算两个数的和,支持整数和浮点数",
name="add",
input_description="需要相加的两个数字,用逗号分隔"
)
def add(self, numbers: str) -> str:
"""执行加法运算"""
try:
a, b = map(float, numbers.split(","))
result = a + b
return str(round(result, self._precision))
except ValueError as e:
return f"输入格式错误: {e}"
@sk_function(
description="计算两个数的差",
name="subtract",
input_description="被减数和减数,用逗号分隔"
)
def subtract(self, numbers: str) -> str:
"""执行减法运算"""
try:
a, b = map(float, numbers.split(","))
result = a - b
return str(round(result, self._precision))
except ValueError as e:
return f"输入格式错误: {e}"
@sk_function(
description="计算两个数的乘积",
name="multiply",
input_description="需要相乘的两个数字,用逗号分隔"
)
def multiply(self, numbers: str) -> str:
"""执行乘法运算"""
try:
a, b = map(float, numbers.split(","))
result = a * b
return str(round(result, self._precision))
except ValueError as e:
return f"输入格式错误: {e}"
@sk_function(
description="计算两个数的商,结果保留两位小数",
name="divide",
input_description="被除数和除数,用逗号分隔"
)
def divide(self, numbers: str) -> str:
"""执行除法运算"""
try:
a, b = map(float, numbers.split(","))
if b == 0:
return "错误:除数不能为零"
result = a / b
return str(round(result, self._precision))
except ValueError as e:
return f"输入格式错误: {e}"
class DateTimePlugin:
"""日期时间处理插件"""
@sk_function(
description="获取当前日期和时间",
name="get_current_datetime",
input_description="不需要输入参数"
)
def get_current_datetime(self) -> str:
"""返回当前日期时间"""
from datetime import datetime
now = datetime.now()
return now.strftime("%Y年%m月%d日 %H:%M:%S")
@sk_function(
description="计算两个日期之间的天数差",
name="days_between",
input_description="两个日期,格式为 YYYY-MM-DD,用逗号分隔"
)
def days_between(self, dates: str) -> str:
"""计算日期差"""
from datetime import datetime
try:
date1_str, date2_str = dates.split(",")
date1 = datetime.strptime(date1_str.strip(), "%Y-%m-%d")
date2 = datetime.strptime(date2_str.strip(), "%Y-%m-%d")
delta = abs((date2 - date1).days)
return f"相差 {delta} 天"
except ValueError as e:
return f"日期格式错误: {e}"
3.3 语义函数:自然语言与代码的桥梁
除了原生函数,Agent Framework 还支持语义函数(Semantic Function)。与原生函数不同,语义函数的核心是一个 Prompt 模板,AI 模型会结合输入和上下文来生成结果。这种方式特别适合需要灵活性或创造性输出的场景。
# semantic_functions.py - 语义函数的定义和使用
import semantic_kernel as sk
def create_semantic_functions(kernel: sk.Kernel):
"""创建语义函数集合"""
# 创建提示模板目录
prompts_dir = "./prompts"
# 注册一个翻译助手函数
# 这是通过目录结构定义的语义函数
# prompts_dir/translator/skprompt.txt - 提示模板
# prompts_dir/translator/config.json - 配置文件
kernel.import_semantic_skill_from_directory(
prompts_dir,
"translator"
)
return kernel
# 目录结构示例:
# prompts/
# └── translator/
# ├── skprompt.txt
# └── config.json
# skprompt.txt 内容示例:
"""
你是一个专业的翻译助手。
任务:将用户输入的文本翻译成目标语言。
目标语言:{{$target_language}}
翻译风格:{{$style}}
原文:{{$input}}
请提供准确、自然的翻译结果。
"""
# config.json 内容示例:
"""
{
"description": "多语言翻译助手",
"completion": {
"max_tokens": 1000,
"temperature": 0.3,
"top_p": 0.8
}
}
语义函数的优势在于它的灵活性。由于核心是 Prompt 模板,你可以随时调整 AI 的行为而无需修改代码。这对于需要频繁优化的场景特别有用。
3.4 记忆系统:赋予 Agent 持久化上下文的能力
Agent Framework 的记忆系统是其核心能力之一。它允许 AI 模型在对话之间保持状态,记住用户偏好、历史交互等信息。这对于构建真正智能的助手至关重要。
# memory_system.py - 记忆系统的使用
from semantic_kernel import Kernel
from semantic_kernel.memory import (
SemanticTextMemory,
VolatileMemoryStore,
MemoryQueryResult
)
from semantic_kernel.connectors.memory.azure_cognitive_search import (
AzureCognitiveSearchMemoryStore
)
class AgentMemory:
"""Agent 记忆管理类"""
def __init__(self, kernel: Kernel, use_azure: bool = False):
self.kernel = kernel
self.use_azure = use_azure
self.memory = self._setup_memory()
def _setup_memory(self) -> SemanticTextMemory:
"""配置记忆存储"""
# 方式一:使用内存存储(仅当前会话有效)
if not self.use_azure:
memory_store = VolatileMemoryStore()
memory = SemanticTextMemory(storage=memory_store)
# 方式二:使用 Azure Cognitive Search(持久化存储)
else:
memory_store = AzureCognitiveSearchMemoryStore(
search_service_endpoint="your-endpoint",
api_key="your-key",
index_name="agent-memories"
)
memory = SemanticTextMemory(storage=memory_store)
# 将记忆系统注册到 Kernel
self.kernel.register_memory(memory)
return memory
async def save_memory(self, collection: str, key: str, text: str,
description: str = "") -> None:
"""保存信息到记忆库"""
await self.memory.save_information(
collection=collection,
id=key,
text=text,
description=description
)
print(f"已保存记忆: [{key}] {description}")
async def search_memory(self, collection: str, query: str,
limit: int = 5) -> list[MemoryQueryResult]:
"""从记忆库搜索相关信息"""
results = await self.memory.search(
collection=collection,
query=query,
limit=limit
)
return results
async def save_user_preference(self, user_id: str, preference: dict) -> None:
"""保存用户偏好设置"""
import json
preference_text = json.dumps(preference, ensure_ascii=False)
await self.save_memory(
collection="user_preferences",
key=f"user_{user_id}",
text=preference_text,
description=f"用户 {user_id} 的偏好设置"
)
async def get_user_context(self, user_id: str, query: str) -> str:
"""获取与用户相关的上下文信息"""
results = await self.search_memory(
collection="user_preferences",
query=query
)
context_parts = []
for result in results:
context_parts.append(result.text)
return "\n".join(context_parts) if context_parts else "未找到相关记忆"
四、实战教程:从零构建一个多功能 AI 助手
现在我们已经理解了核心概念,接下来通过一个完整的实战项目来巩固所学。我们将构建一个多功能 AI 助手,它具备以下能力:回答用户问题、执行计算任务、管理日程、搜索网络信息,以及记住用户偏好。
4.1 项目架构设计
在开始编码之前,先规划一下整体架构。良好的架构能让代码更容易维护和扩展。
# project_structure.py - 项目结构定义
"""
项目架构:
assistant/
├── config/
│ ├── __init__.py
│ ├── settings.py # 配置管理
│ └── prompts.py # Prompt 模板
├── plugins/
│ ├── __init__.py
│ ├── math_plugin.py # 数学计算插件
│ ├── calendar_plugin.py # 日历管理插件
│ ├── search_plugin.py # 搜索功能插件
│ └── memory_plugin.py # 记忆管理插件
├── skills/
│ └── semantic/ # 语义函数目录
├── core/
│ ├── __init__.py
│ ├── kernel_builder.py # Kernel 构建器
│ └── agent.py # Agent 基类
├── services/
│ ├── __init__.py
│ └── memory_service.py # 记忆服务
├── main.py # 入口文件
└── requirements.txt # 依赖清单
"""
# 这个示例展示了推荐的目录结构组织方式
4.2 配置管理模块
配置管理是任何项目的基础。我们创建一个灵活的配置类,支持从环境变量、配置文件和代码默认值多层次配置。
# config/settings.py - 配置管理
import os
import json
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field
@dataclass
class AIConfig:
"""AI 服务配置"""
provider: str = "azure" # azure 或 openai
api_key: str = ""
endpoint: str = ""
deployment_name: str = "gpt-4"
api_version: str = "2024-02-15-preview"
model_id: str = "gpt-4-turbo"
temperature: float = 0.7
max_tokens: int = 2000
@dataclass
class MemoryConfig:
"""记忆存储配置"""
use_azure: bool = False
connection_string: str = ""
index_name: str = "agent-memories"
@dataclass
class AppConfig:
"""应用整体配置"""
ai: AIConfig = field(default_factory=AIConfig)
memory: MemoryConfig = field(default_factory=MemoryConfig)
log_level: str = "INFO"
enable_streaming: bool = False
@classmethod
def from_env(cls) -> "AppConfig":
"""从环境变量加载配置"""
return cls(
ai=AIConfig(
provider=os.getenv("AI_PROVIDER", "azure"),
api_key=os.getenv("API_KEY", ""),
endpoint=os.getenv("AZURE_ENDPOINT", ""),
deployment_name=os.getenv("DEPLOYMENT_NAME", "gpt-4")
),
memory=MemoryConfig(
use_azure=os.getenv("USE_AZURE_MEMORY", "false").lower() == "true",
connection_string=os.getenv("AZURE_COSMOS_CONNECTION", ""),
index_name=os.getenv("MEMORY_INDEX", "agent-memories")
),
log_level=os.getenv("LOG_LEVEL", "INFO")
)
@classmethod
def from_file(cls, path: str) -> "AppConfig":
"""从配置文件加载"""
config_path = Path(path)
if not config_path.exists():
raise FileNotFoundError(f"配置文件不存在: {path}")
with open(config_path, "r", encoding="utf-8") as f:
data = json.load(f)
return cls(
ai=AIConfig(**data.get("ai", {})),
memory=MemoryConfig(**data.get("memory", {})),
log_level=data.get("log_level", "INFO")
)
def validate(self) -> tuple[bool, list[str]]:
"""验证配置完整性"""
errors = []
if not self.ai.api_key:
errors.append("API Key 未设置")
if self.ai.provider == "azure" and not self.ai.endpoint:
errors.append("Azure Endpoint 未设置")
return len(errors) == 0, errors
# 配置使用示例
def main():
# 方式一:从环境变量加载
# config = AppConfig.from_env()
# 方式二:从配置文件加载
# config = AppConfig.from_file("config.json")
# 方式三:直接创建
config = AppConfig(
ai=AIConfig(
provider="openai",
api_key="sk-xxx",
model_id="gpt-4-turbo"
)
)
is_valid, errors = config.validate()
if not is_valid:
for error in errors:
print(f"配置错误: {error}")
return
print("配置验证通过")
print(f"AI Provider: {config.ai.provider}")
print(f"Model: {config.ai.model_id}")
if __name__ == "__main__":
main()
4.3 核心 Kernel 构建器
Kernel 构建器封装了 Kernel 的创建逻辑,提供了统一且可配置的初始化方式。
# core/kernel_builder.py - Kernel 构建器
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.connectors.ai.azure_open_ai import AzureChatCompletion
from typing import Optional, Dict, Any
class KernelBuilder:
"""Kernel 构建器,提供链式配置接口"""
def __init__(self):
self._kernel = sk.Kernel()
self._services: Dict[str, Any] = {}
self._plugins: Dict[str, Any] = {}
def with_azure_openai(
self,
endpoint: str,
api_key: str,
deployment_name: str,
api_version: str = "2024-02-15-preview",
service_id: str = "azure-gpt"
) -> "KernelBuilder":
"""添加 Azure OpenAI 服务"""
service = AzureChatCompletion(
service_id=service_id,
deployment_name=deployment_name,
endpoint=endpoint,
api_key=api_key,
api_version=api_version
)
self._kernel.add_service(service)
self._services["azure"] = service
return self
def with_openai(
self,
api_key: str,
model_id: str = "gpt-4-turbo",
org_id: Optional[str] = None,
service_id: str = "openai-gpt"
) -> "KernelBuilder":
"""添加 OpenAI 服务"""
service = OpenAIChatCompletion(
service_id=service_id,
ai_model_id=model_id,
api_key=api_key,
org_id=org_id
)
self._kernel.add_service(service)
self._services["openai"] = service
return self
def with_plugin(self, plugin: Any, skill_name: str) -> "KernelBuilder":
"""添加原生插件"""
self._kernel.import_skill(plugin, skill_name)
self._plugins[skill_name] = plugin
return self
def with_memory(
self,
store: Any,
collection_name: str = "generic_memory"
) -> "KernelBuilder":
"""添加记忆系统"""
from semantic_kernel.memory import SemanticTextMemory
memory = SemanticTextMemory(storage=store)
self._kernel.register_memory(memory)
return self
def with_configuration(
self,
temperature: float = 0.7,
max_tokens: int = 2000,
top_p: float = 1.0,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0
) -> "KernelBuilder":
"""设置 AI 生成参数"""
request_settings = {
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
"presence_penalty": presence_penalty,
"frequency_penalty": frequency_penalty
}
# 应用到所有已添加的服务
for service in self._services.values():
service.request_settings = request_settings
return self
def build(self) -> sk.Kernel:
"""构建并返回配置好的 Kernel"""
if not self._services:
raise ValueError("必须至少添加一个 AI 服务")
return self._kernel
def get_plugin_count(self) -> int:
"""获取已添加的插件数量"""
return len(self._plugins)
def get_service_names(self) -> list[str]:
"""获取已添加的服务名称"""
return list(self._services.keys())
# 使用示例
def create_production_kernel() -> sk.Kernel:
"""创建生产环境 Kernel"""
kernel = (
KernelBuilder()
.with_azure_openai(
endpoint="https://your-resource.openai.azure.com",
api_key="your-key",
deployment_name="gpt-4",
api_version="2024-02-15-preview"
)
.with_configuration(
temperature=0.7,
max_tokens=2000
)
.build()
)
return kernel
4.4 日历管理插件
这是一个功能完整的日历管理插件,展示了如何创建实用的原生函数。
# plugins/calendar_plugin.py - 日历管理插件
from datetime import datetime, timedelta
from typing import Optional, List
from semantic_kernel.skill_definition import (
sk_function,
sk_function_context_parameter
)
class CalendarPlugin:
"""日历管理插件 - 管理日程安排"""
def __init__(self):
# 使用内存存储日程,实际应用中可用数据库
self._events: dict[str, list[dict]] = {
"default": []
}
@sk_function(
description="创建新的日程事件",
name="create_event",
input_description="日程标题"
)
@sk_function_context_parameter(
name="date",
description="日期,格式为 YYYY-MM-DD",
default_value=""
)
@sk_function_context_parameter(
name="time",
description="时间,格式为 HH:MM",
default_value=""
)
@sk_function_context_parameter(
name="duration",
description="持续时间(分钟)",
default_value="60"
)
@sk_function_context_parameter(
name="description",
description="日程描述",
default_value=""
)
def create_event(
self,
context: sk.KernelContext
) -> str:
"""创建新日程"""
title = context.input
date = context.variables.get("date", "")
time = context.variables.get("time", "")
duration = int(context.variables.get("duration", "60"))
description = context.variables.get("description", "")
# 验证输入
if not title:
return "错误:日程标题不能为空"
if not date:
date = datetime.now().strftime("%Y-%m-%d")
if not time:
time = datetime.now().strftime("%H:%M")
# 解析时间
try:
start_dt = datetime.strptime(f"{date} {time}", "%Y-%m-%d %H:%M")
end_dt = start_dt + timedelta(minutes=duration)
except ValueError:
return "错误:日期或时间格式不正确"
# 创建事件
event = {
"id": f"evt_{len(self._events['default']) + 1}",
"title": title,
"start": start_dt.isoformat(),
"end": end_dt.isoformat(),
"description": description
}
self._events["default"].append(event)
return (
f"日程已创建:\n"
f"标题:{title}\n"
f"时间:{start_dt.strftime('%Y-%m-%d %H:%M')} - {end_dt.strftime('%H:%M')}\n"
f"时长:{duration} 分钟\n"
f"事件ID:{event['id']}"
)
@sk_function(
description="查询指定日期的日程",
name="list_events",
input_description="查询日期,格式为 YYYY-MM-DD,不填则查询今天"
)
def list_events(self, date: str) -> str:
"""列出指定日期的日程"""
if not date:
date = datetime.now().strftime("%Y-%m-%d")
events_today = []
for event in self._events["default"]:
event_date = event["start"].split("T")[0]
if event_date == date:
events_today.append(event)
if not events_today:
return f"{date} 没有安排的日程"
# 按时间排序
events_today.sort(key=lambda x: x["start"])
result_lines = [f"{date} 的日程安排:", ""]
for i, event in enumerate(events_today, 1):
start_dt = datetime.fromisoformat(event["start"])
end_dt = datetime.fromisoformat(event["end"])
result_lines.append(f"{i}. {event['title']}")
result_lines.append(f" 时间:{start_dt.strftime('%H:%M')} - {end_dt.strftime('%H:%M')}")
if event["description"]:
result_lines.append(f" 描述:{event['description']}")
result_lines.append("")
return "\n".join(result_lines).strip()
@sk_function(
description="删除指定事件",
name="delete_event",
input_description="要删除的事件ID"
)
def delete_event(self, event_id: str) -> str:
"""删除日程事件"""
for i, event in enumerate(self._events["default"]):
if event["id"] == event_id:
deleted = self._events["default"].pop(i)
return f"已删除日程:{deleted['title']}"
return f"未找到事件ID:{event_id}"
@sk_function(
description="查找包含关键词的日程",
name="search_events",
input_description="搜索关键词"
)
def search_events(self, keyword: str) -> str:
"""搜索日程"""
if not keyword:
return "请提供搜索关键词"
results = []
for event in self._events["default"]:
if keyword.lower() in event["title"].lower():
results.append(event)
elif event["description"] and keyword.lower() in event["description"].lower():
results.append(event)
if not results:
return f"没有找到包含「{keyword}」的日程"
result_lines = [f"找到 {len(results)} 个相关日程:", ""]
for event in results:
start_dt = datetime.fromisoformat(event["start"])
result_lines.append(f"- {event['title']} ({start_dt.strftime('%Y-%m-%d %H:%M')})")
return "\n".join(result_lines)
4.5 搜索功能插件
这个插件展示了如何集成外部 API,让 AI Agent 能够获取实时信息。
# plugins/search_plugin.py - 搜索功能插件
import json
import httpx
from typing import Optional
from semantic_kernel.skill_definition import (
sk_function,
sk_function_context_parameter
)
class SearchPlugin:
"""搜索功能插件 - 集成搜索 API"""
def __init__(self, api_key: Optional[str] = None):
self._api_key = api_key or ""
self._http_client = httpx.AsyncClient(timeout=30.0)
async def _close(self):
"""关闭 HTTP 客户端"""
await self._http_client.aclose()
@sk_function(
description="使用必应搜索获取网络信息",
name="bing_search",
input_description="搜索查询词"
)
@sk_function_context_parameter(
name="num_results",
description="返回结果数量(1-10)",
default_value="5"
)
async def bing_search(self, context: sk.KernelContext) -> str:
"""
执行必应搜索
注意:实际使用需要配置 BING API Key
"""
query = context.input
num_results = int(context.variables.get("num_results", "5"))
if not query:
return "请提供搜索关键词"
# 如果没有配置 API key,返回示例数据
if not self._api_key:
return await self._mock_search(query, num_results)
# 实际调用 Bing Search API
try:
headers = {
"Ocp-Apim-Subscription-Key": self._api_key
}
params = {
"q": query,
"count": num_results,
"mkt": "zh-CN"
}
response = await self._http_client.get(
"https://api.bing.microsoft.com/v7.0/search",
headers=headers,
params=params
)
response.raise_for_status()
data = response.json()
return self._format_bing_results(data)
except httpx.HTTPError as e:
return f"搜索请求失败: {str(e)}"
except Exception as e:
return f"搜索出错: {str(e)}"
async def _mock_search(self, query: str, num_results: int) -> str:
"""
模拟搜索结果(用于测试和演示)
实际使用时替换为真实的 API 调用
"""
mock_data = {
"webPages": {
"value": [
{
"name": f"关于「{query}」的相关文章 1",
"url": f"https://example.com/article1",
"snippet": f"这是关于{query}的详细内容摘要..."
},
{
"name": f"关于「{query}」的相关文章 2",
"url": f"https://example.com/article2",
"snippet": f"深入探讨{query}的核心概念和应用场景..."
},
{
"name": f"「{query}」官方文档",
"url": f"https://example.com/docs",
"snippet": f"官方文档提供了{query}的完整指南..."
}
]
}
}
return self._format_bing_results(mock_data)
def _format_bing_results(self, data: dict) -> str:
"""格式化搜索结果"""
results = data.get("webPages", {}).get("value", [])
if not results:
return "未找到相关结果"
lines = ["搜索结果:", ""]
for i, result in enumerate(results[:5], 1):
lines.append(f"{i}. {result['name']}")
lines.append(f" {result['snippet']}")
lines.append(f" 链接:{result['url']}")
lines.append("")
return "\n".join(lines)
@sk_function(
description="获取网页内容的摘要",
name="get_page_summary",
input_description="网页 URL"
)
async def get_page_summary(self, url: str) -> str:
"""
获取网页内容摘要
这是一个简化版本,实际应用中可能需要更复杂的 HTML 解析
"""
if not url:
return "请提供有效的 URL"
try:
response = await self._http_client.get(url)
response.raise_for_status()
# 简化处理:只获取前 500 个字符
content = response.text[:500]
# 移除 HTML 标签(简化版)
import re
content = re.sub(r'<[^>]+>', '', content)
return (
f"页面内容摘要:\n"
f"{content.strip()}...\n"
f"(完整内容需要进一步处理)"
)
except httpx.HTTPError as e:
return f"获取页面失败: {str(e)}"
4.6 主 Agent 实现
现在我们将所有组件整合起来,创建一个完整的 AI Agent。
# core/agent.py - AI Agent 核心实现
import asyncio
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
import semantic_kernel as sk
from semantic_kernel.planning.basic_planner import BasicPlanner
from semantic_kernel.planning.stepwise_planner import StepwisePlanner
from plugins.math_plugin import MathPlugin
from plugins.calendar_plugin import CalendarPlugin
from plugins.search_plugin import SearchPlugin
@dataclass
class AgentConfig:
"""Agent 配置"""
name: str = "助手"
description: str = "一个多功能 AI 助手"
use_stepwise_planner: bool = True # 是否使用渐进式规划器
max_retry: int = 3 # 最大重试次数
class AssistantAgent:
"""
多功能 AI 助手
整合各种插件,提供智能化的问答和服务能力
"""
def __init__(self, kernel: sk.Kernel, config: Optional[AgentConfig] = None):
self.kernel = kernel
self.config = config or AgentConfig()
self.planner = None
# 初始化插件
self._initialize_plugins()
# 初始化规划器
self._initialize_planner()
# 注册提示模板
self._register_prompts()
def _initialize_plugins(self) -> None:
"""初始化所有插件"""
# 数学计算插件
math_plugin = MathPlugin()
self.kernel.import_skill(math_plugin, "math")
# 日历管理插件
calendar_plugin = CalendarPlugin()
self.kernel.import_skill(calendar_plugin, "calendar")
# 搜索插件
search_plugin = SearchPlugin()
self.kernel.import_skill(search_plugin, "search")
print("=" * 50)
print("插件加载完成")
print("- math: 数学计算")
print("- calendar: 日程管理")
print("- search: 网络搜索")
print("=" * 50)
def _initialize_planner(self) -> None:
"""初始化规划器"""
if self.config.use_stepwise_planner:
self.planner = StepwisePlanner(
self.kernel,
max_retry=self.config.max_retry
)
else:
self.planner = BasicPlanner()
print(f"规划器已就绪: {type(self.planner).__name__}")
def _register_prompts(self) -> None:
"""注册语义函数提示模板"""
# 系统提示词
self._system_prompt = """
你是一个智能助手,名字叫「{name}」。
你的能力包括:
1. 数学计算:可以进行加减乘除等运算
2. 日程管理:创建、查询、删除日程
3. 网络搜索:获取实时信息
当你需要执行具体操作时,请使用可用的函数。
回答时要简洁、有帮助,并且符合用户的语言习惯。
""".format(name=self.config.name)
async def chat(self, user_input: str, context: Optional[Dict] = None) -> str:
"""
处理用户对话
Args:
user_input: 用户输入
context: 可选的上下文信息
Returns:
Agent 的回复
"""
# 构建聊天历史
chat_history = context.get("chat_history", []) if context else []
# 使用规划器自动分解任务
if self.planner:
try:
plan = await self.planner.create_plan(user_input)
# 执行计划
result = await plan.execute()
return result
except Exception as e:
print(f"规划执行出错: {e}")
# 降级为直接调用
return await self._direct_chat(user_input)
else:
return await self._direct_chat(user_input)
async def _direct_chat(self, user_input: str) -> str:
"""直接对话模式(不使用规划器)"""
try:
# 获取 AI 服务
service = self.kernel.get_service()
# 构建完整的提示
full_prompt = self._system_prompt + f"\n\n用户:{user_input}\n\n助手:"
# 调用 AI
response = await service.complete_async(
prompt=full_prompt,
request_settings={
"temperature": 0.7,
"max_tokens": 2000
}
)
return response.text
except Exception as e:
return f"处理请求时出错: {str(e)}"
async def process_stream(self, user_input: str):
"""
流式响应模式
适用于需要实时显示响应的场景
"""
service = self.kernel.get_service()
full_prompt = self._system_prompt + f"\n\n用户:{user_input}\n\n助手:"
# 流式生成
async for token in service.complete_stream_async(
prompt=full_prompt,
request_settings={"max_tokens": 2000}
):
yield token
async def run_task(self, task: str) -> str:
"""
运行特定任务
适用于需要执行一系列操作的复杂任务
"""
if "日程" in task or "calendar" in task.lower():
return await self._handle_calendar_task(task)
if "搜索" in task or "search" in task.lower():
return await self._handle_search_task(task)
if any(op in task for op in ["加", "减", "乘", "除", "+", "-", "*", "/"]):
return await self._handle_math_task(task)
# 默认走对话流程
return await self.chat(task)
async def _handle_calendar_task(self, task: str) -> str:
"""处理日历相关任务"""
context = self.kernel.create_new_context()
if "创建" in task or "添加" in task:
context.variables.set("date", "")
context.variables.set("time", "")
context.variables.set("duration", "60")
context.variables.set("description", "")
return await self.kernel.run_async(
self.kernel.skills["calendar"]["create_event"],
input_str=task,
context=context
)
if "查询" in task or "列表" in task:
# 提取日期
import re
date_match = re.search(r'\d{4}-\d{2}-\d{2}', task)
date = date_match.group(0) if date_match else ""
return await self.kernel.run_async(
self.kernel.skills["calendar"]["list_events"],
input_str=date
)
return await self.chat(task)
async def _handle_search_task(self, task: str) -> str:
"""处理搜索任务"""
# 提取搜索关键词
keywords = task.replace("搜索", "").replace("查找", "").strip()
context = self.kernel.create_new_context()
context.variables.set("num_results", "5")
return await self.kernel.run_async(
self.kernel.skills["search"]["bing_search"],
input_str=keywords,
context=context
)
async def _handle_math_task(self, task: str) -> str:
"""处理数学计算任务"""
# 简单的数学表达式解析
import re
# 尝试匹配加法
add_match = re.search(r'(\d+)\s*\+\s*(\d+)', task)
if add_match:
a, b = add_match.groups()
return await self.kernel.run_async(
self.kernel.skills["math"]["add"],
input_str=f"{a},{b}"
)
# 尝试匹配减法
sub_match = re.search(r'(\d+)\s*-\s*(\d+)', task)
if sub_match:
a, b = sub_match.groups()
return await self.kernel.run_async(
self.kernel.skills["math"]["subtract"],
input_str=f"{a},{b}"
)
# 尝试匹配乘法
mul_match = re.search(r'(\d+)\s*\*\s*(\d+)', task)
if mul_match:
a, b = mul_match.groups()
return await self.kernel.run_async(
self.kernel.skills["math"]["multiply"],
input_str=f"{a},{b}"
)
# 尝试匹配除法
div_match = re.search(r'(\d+)\s*/\s*(\d+)', task)
if div_match:
a, b = div_match.groups()
return await self.kernel.run_async(
self.kernel.skills["math"]["divide"],
input_str=f"{a},{b}"
)
return await self.chat(task)
# 使用示例和测试
async def main():
"""主函数 - 演示 Agent 的使用"""
print("初始化 AI Agent...")
print()
# 创建 Kernel
kernel = (
KernelBuilder()
.with_azure_openai(
endpoint="https://your-resource.openai.azure.com",
api_key="your-key",
deployment_name="gpt-4"
)
.build()
)
# 创建 Agent
agent = AssistantAgent(
kernel,
config=AgentConfig(name="小助手")
)
print()
print("=" * 50)
print("Agent 已就绪!开始对话测试")
print("=" * 50)
# 测试对话
test_inputs = [
"你好,请介绍一下你自己",
"帮我计算 123 + 456",
"帮我创建一个明天上午10点的会议,主题是周例会",
"搜索一下 AI Agent 的最新发展动态"
]
for user_input in test_inputs:
print()
print(f"用户: {user_input}")
print("-" * 40)
response = await agent.chat(user_input)
print(f"助手: {response}")
print()
print("-" * 40)
if __name__ == "__main__":
asyncio.run(main())
五、常见使用场景与最佳实践
在实际应用中,Agent Framework 可以解决很多典型问题。下面我们通过几个场景来展示如何最佳实践。
5.1 场景一:构建客服机器人
客服机器人是最常见的应用场景之一。Agent Framework 的插件系统让这变得非常简单。
# scenarios/customer_service.py - 客服机器人实现
from typing import Optional, Dict, List
from enum import Enum
import semantic_kernel as sk
class Intent(Enum):
"""用户意图枚举"""
GREETING = "greeting"
PRODUCT_INQUIRY = "product_inquiry"
ORDER_STATUS = "order_status"
COMPLAINT = "complaint"
REFUND = "refund"
GOODBYE = "goodbye"
UNKNOWN = "unknown"
class ProductDatabase:
"""产品数据库(模拟)"""
def __init__(self):
self._products = {
"笔记本": {
"price": "5999",
"stock": "50",
"description": "高性能商务笔记本,搭载最新处理器"
},
"无线鼠标": {
"price": "199",
"stock": "200",
"description": "人体工学设计,支持蓝牙和USB双模连接"
},
"机械键盘": {
"price": "399",
"stock": "100",
"description": "青轴手感,支持RGB背光"
}
}
def search(self, keyword: str) -> Optional[Dict]:
"""搜索产品"""
for name, info in self._products.items():
if keyword in name:
return {"name": name, **info}
return None
def get_all(self) -> List[Dict]:
"""获取所有产品"""
return [{"name": k, **v} for k, v in self._products.items()]
class CustomerServicePlugin:
"""客服插件"""
def __init__(self):
self._products = ProductDatabase()
self._order_db = {} # 简化订单存储
@sk_function(
description="识别用户意图",
name="identify_intent",
input_description="用户的消息内容"
)
def identify_intent(self, message: str) -> str:
"""识别用户意图"""
message = message.lower()
# 意图识别逻辑
if any(word in message for word in ["你好", "hi", "hello", "嗨"]):
return Intent.GREETING.value
if any(word in message for word in ["产品", "商品", "有什么", "卖什么"]):
return Intent.PRODUCT_INQUIRY.value
if any(word in message for word in ["订单", "物流", "发货", "到了没"]):
return Intent.ORDER_STATUS.value
if any(word in message for word in ["投诉", "不满", "差", "垃圾"]):
return Intent.COMPLAINT.value
if any(word in message for word in ["退款", "退货", "取消订单"]):
return Intent.REFUND.value
if any(word in message for word in ["再见", "bye", "结束"]):
return Intent.GOODBYE.value
return Intent.UNKNOWN.value
@sk_function(
description="查询产品信息",
name="search_product",
input_description="产品名称或关键词"
)
def search_product(self, keyword: str) -> str:
"""搜索产品"""
if not keyword or keyword in ["全部", "所有", "list"]:
products = self._products.get_all()
if not products:
return "目前没有在售产品"
lines = ["目前在售的产品:"]
for p in products:
lines.append(f"- {p['name']}: ¥{p['price']}")
lines.append(f" {p['description']}")
lines.append("")
return "\n".join(lines)
product = self._products.search(keyword)
if not product:
return f"未找到「{keyword}」相关产品"
return (
f"产品:{product['name']}\n"
f"价格:¥{product['price']}\n"
f"库存:{product['stock']}件\n"
f"描述:{product['description']}"
)
@sk_function(
description="查询订单状态",
name="check_order",
input_description="订单号"
)
def check_order(self, order_id: str) -> str:
"""查询订单状态"""
# 模拟订单数据
mock_orders = {
"ORD001": {"status": "已发货", "express": "SF1234567890"},
"ORD002": {"status": "处理中", "express": ""},
"ORD003": {"status": "已完成", "express": ""}
}
if order_id in mock_orders:
order = mock_orders[order_id]
result = f"订单 {order_id} 状态:{order['status']}"
if order['express']:
result += f"\n快递单号:{order['express']}"
return result
return f"未找到订单 {order_id},请确认订单号是否正确"
@sk_function(
description="提交投诉工单",
name="submit_complaint",
input_description="投诉内容"
)
def submit_complaint(self, content: str) -> str:
"""提交投诉"""
# 生成工单号
ticket_id = f"COMP{len(self._order_db) + 1:04d}"
return (
f"您的投诉已受理\n"
f"工单号:{ticket_id}\n"
f"我们的客服人员将在24小时内与您联系\n"
f"感谢您的反馈!"
)
@sk_function(
description="处理退款请求",
name="process_refund",
input_description="订单号和退款原因"
)
def process_refund(self, request: str) -> str:
"""处理退款"""
return (
f"退款申请已提交\n"
f"订单号将在系统中核实\n"
f"退款预计在3-5个工作日内原路返回\n"
f"如有疑问,请联系人工客服"
)
class CustomerServiceAgent:
"""客服 Agent"""
def __init__(self, kernel: sk.Kernel):
self.kernel = kernel
self.plugin = CustomerServicePlugin()
self.kernel.import_skill(self.plugin, "customer_service")
self._system_prompt = """
你是一个专业、友好的在线客服助手。
回复原则:
1. 热情礼貌,用语专业
2. 站在用户角度思考问题
3. 复杂问题及时转人工
4. 禁止承诺无法兑现的事项
5. 涉及金钱、隐私等问题要谨慎处理
根据识别的用户意图,调用相应的函数来处理请求。
"""
async def handle_message(self, message: str) -> str:
"""处理用户消息"""
# 识别意图
intent = self.plugin.identify_intent(message)
# 根据意图处理
if intent == Intent.GREETING.value:
return self._handle_greeting()
elif intent == Intent.PRODUCT_INQUIRY.value:
return self._handle_product_inquiry(message)
elif intent == Intent.ORDER_STATUS.value:
return self._handle_order_inquiry(message)
elif intent == Intent.COMPLAINT.value:
return self._handle_complaint(message)
elif intent == Intent.REFUND.value:
return self._handle_refund(message)
elif intent == Intent.GOODBYE.value:
return "感谢您的咨询,祝您生活愉快!再见!"
else:
return await self._handle_general(message)
def _handle_greeting(self) -> str:
"""处理问候"""
return (
"您好!欢迎光临!😊\n\n"
"我可以帮您:\n"
"- 查询产品信息\n"
"- 查看订单状态\n"
"- 处理投诉建议\n"
"- 办理退款退货\n\n"
"请告诉我您的需求~"
)
def _handle_product_inquiry(self, message: str) -> str:
"""处理产品咨询"""
# 提取产品关键词
keywords = message.replace("产品", "").replace("商品", "")
keywords = keywords.replace("有什么", "").replace("卖什么", "")
keywords = keywords.strip()
return self.plugin.search_product(keywords)
def _handle_order_inquiry(self, message: str) -> str:
"""处理订单查询"""
import re
# 尝试提取订单号
match = re.search(r'ORD\d+', message)
if match:
order_id = match.group(0)
else:
return "请提供您的订单号,格式如:ORD001"
return self.plugin.check_order(order_id)
def _handle_complaint(self, message: str) -> str:
"""处理投诉"""
return self.plugin.submit_complaint(message)
def _handle_refund(self, message: str) -> str:
"""处理退款"""
return self.plugin.process_refund(message)
async def _handle_general(self, message: str) -> str:
"""处理通用问题"""
return (
"抱歉,我没能理解您的问题。\n"
"您可以尝试:\n"
"- 询问我们的产品\n"
"- 查询订单状态\n"
"- 提交投诉建议\n\n"
"或者直接告诉我您想做什么~"
)
5.2 场景二:文档分析与处理
Agent Framework 同样适合处理文档相关的任务,比如自动提取信息、生成摘要等。
# scenarios/document_processor.py - 文档处理示例
from typing import Optional, List, Dict
from dataclasses import dataclass
import re
@dataclass
class Document:
"""文档数据类"""
title: str
content: str
metadata: Optional[Dict] = None
@dataclass
class ExtractedInfo:
"""提取的信息"""
emails: List[str]
phone_numbers: List[str]
dates: List[str]
urls: List[str]
key_points: List[str]
class DocumentProcessor:
"""文档处理器"""
def __init__(self):
self._email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
self._phone_pattern = r'\b1[3-9]\d{9}\b'
self._date_pattern = r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}日?'
self._url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
@sk_function(
description="从文档中提取联系信息",
name="extract_contacts",
input_description="待处理的文档内容"
)
def extract_contacts(self, text: str) -> str:
"""提取联系方式"""
emails = re.findall(self._email_pattern, text)
phones = re.findall(self._phone_pattern, text)
result = []
if emails:
result.append("邮箱地址:")
for email in set(emails):
result.append(f" - {email}")
if phones:
result.append("电话号码:")
for phone in set(phones):
result.append(f" - {phone}")
if not result:
return "未在文档中找到联系信息"
return "\n".join(result)
@sk_function(
description="从文档中提取日期信息",
name="extract_dates",
input_description="待处理的文档内容"
)
def extract_dates(self, text: str) -> str:
"""提取日期信息"""
dates = re.findall(self._date_pattern, text)
if not dates:
return "未在文档中找到日期信息"
# 标准化日期格式
normalized = []
for date in set(dates):
normalized_date = date.replace("年", "-").replace("月", "-").replace("日", "")
normalized.append(normalized_date)
result = ["找到的日期信息:"]
for date in normalized:
result.append(f" - {date}")
return "\n".join(result)
@sk_function(
description="提取文档中的超链接",
name="extract_urls",
input_description="待处理的文档内容"
)
def extract_urls(self, text: str) -> str:
"""提取 URL"""
urls = re.findall(self._url_pattern, text)
if not urls:
return "未在文档中找到链接"
result = ["找到的链接:"]
for url in set(urls):
result.append(f" - {url}")
return "\n".join(result)
@sk_function(
description="提取文档中的关键信息",
name="extract_key_info",
input_description="待处理的文档内容"
)
def extract_key_info(self, text: str) -> str:
"""
提取关键信息
注意:这是一个基于规则的简化版本
实际应用中可以使用 AI 模型进行更智能的提取
"""
# 提取所有联系信息
contacts = self.extract_contacts(text)
# 提取日期
dates = self.extract_dates(text)
# 提取链接
urls = self.extract_urls(text)
# 简单统计
lines = text.split("\n")
non_empty_lines = [l for l in lines if l.strip()]
summary = f"""
文档统计:
- 总行数:{len(lines)}
- 非空行数:{len(non_empty_lines)}
- 字符数:{len(text)}
提取的信息:
{contacts}
{dates}
{urls}
"""
return summary.strip()
@sk_function(
description="检测文档语言",
name="detect_language",
input_description="待检测的文本内容"
)
def detect_language(self, text: str) -> str:
"""
检测语言
简化版本:基于中文字符比例判断
实际应用中可使用专门的语言检测库
"""
if not text:
return "无法检测空文档的语言"
# 统计中文字符
chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
total_chars = len(text.strip())
chinese_ratio = chinese_chars / total_chars if total_chars > 0 else 0
if chinese_ratio > 0.3:
return f"中文(简体),置信度约 {int(chinese_ratio * 100)}%"
else:
return f"英文或其他语言,中文占比约 {int(chinese_ratio * 100)}%"
5.3 场景三:数据分析和报告生成
这个场景展示了如何构建一个数据分析助手,它可以处理数据并生成结构化的报告。
# scenarios/data_analyst.py - 数据分析助手
from typing import List, Dict, Optional
from dataclasses import dataclass
import statistics
@dataclass
class SalesRecord:
"""销售记录"""
date: str
product: str
quantity: int
unit_price: float
region: str
class DataAnalysisPlugin:
"""数据分析插件"""
def __init__(self):
# 模拟销售数据
self._data: List[SalesRecord] = [
SalesRecord("2024-01-01", "笔记本", 10, 5999, "华东"),
SalesRecord("2024-01-01", "鼠标", 50, 199, "华北"),
SalesRecord("2024-01-02", "键盘", 30, 399, "华东"),
SalesRecord("2024-01-02", "笔记本", 8, 5999, "华南"),
SalesRecord("2024-01-03", "显示器", 15, 1999, "华北"),
]
@sk_function(
description="获取销售数据概览",
name="get_overview",
input_description="无需输入,填写'概览'即可"
)
def get_overview(self, _: str) -> str:
"""获取数据概览"""
total_revenue = sum(r.quantity * r.unit_price for r in self._data)
total_orders = len(self._data)
avg_order_value = total_revenue / total_orders if total_orders > 0 else 0
return f"""
销售数据概览
========================================
总订单数:{total_orders}
总收入:¥{total_revenue:,.2f}
平均订单金额:¥{avg_order_value:,.2f}
========================================
"""
@sk_function(
description="按产品分组统计",
name="sales_by_product",
input_description="填写'产品统计'即可"
)
def sales_by_product(self, _: str) -> str:
"""按产品统计"""
product_stats: Dict[str, Dict] = {}
for record in self._data:
if record.product not in product_stats:
product_stats[record.product] = {
"quantity": 0,
"revenue": 0
}
product_stats[record.product]["quantity"] += record.quantity
product_stats[record.product]["revenue"] += record.quantity * record.unit_price
lines = ["按产品销售统计", "=" * 40, ""]
for product, stats in sorted(product_stats.items(),
key=lambda x: x[1]["revenue"],
reverse=True):
lines.append(f"{product}:")
lines.append(f" 销量:{stats['quantity']}件")
lines.append(f" 收入:¥{stats['revenue']:,.2f}")
lines.append("")
return "\n".join(lines)
@sk_function(
description="按地区分组统计",
name="sales_by_region",
input_description="填写'地区统计'即可"
)
def sales_by_region(self, _: str) -> str:
"""按地区统计"""
region_stats: Dict[str, Dict] = {}
for record in self._data:
if record.region not in region_stats:
region_stats[record.region] = {
"quantity": 0,
"revenue": 0
}
region_stats[record.region]["quantity"] += record.quantity
region_stats[record.region]["revenue"] += record.quantity * record.unit_price
lines = ["按地区销售统计", "=" * 40, ""]
for region, stats in sorted(region_stats.items(),
key=lambda x: x[1]["revenue"],
reverse=True):
lines.append(f"{region}:")
lines.append(f" 销量:{stats['quantity']}件")
lines.append(f" 收入:¥{stats['revenue']:,.2f}")
lines.append("")
return "\n".join(lines)
@sk_function(
description="查询特定产品的销售",
name="query_product",
input_description="产品名称"
)
def query_product(self, product_name: str) -> str:
"""查询产品销售情况"""
matching = [r for r in self._data
if product_name.lower() in r.product.lower()]
if not matching:
return f"未找到「{product_name}」的销售记录"
total_quantity = sum(r.quantity for r in matching)
total_revenue = sum(r.quantity * r.unit_price for r in matching)
lines = [f"「{product_name}」销售情况", "=" * 40, ""]
lines.append(f"销售笔数:{len(matching)}")
lines.append(f"总销量:{total_quantity}件")
lines.append(f"总收入:¥{total_revenue:,.2f}")
lines.append("")
lines.append("明细:")
for record in matching:
revenue = record.quantity * record.unit_price
lines.append(f" {record.date} | {record.region} | "
f"{record.quantity}件 | ¥{revenue:,.2f}")
return "\n".join(lines)
@sk_function(
description="生成销售报告",
name="generate_report",
input_description="报告标题"
)
def generate_report(self, title: str) -> str:
"""生成完整销售报告"""
# 收集数据
total_revenue = sum(r.quantity * r.unit_price for r in self._data)
total_orders = len(self._data)
# 产品排名
product_stats: Dict[str, float] = {}
for record in self._data:
product_stats[record.product] = (
product_stats.get(record.product, 0) +
record.quantity * record.unit_price
)
top_products = sorted(product_stats.items(),
key=lambda x: x[1],
reverse=True)[:3]
# 地区排名
region_stats: Dict[str, float] = {}
for record in self._data:
region_stats[record.region] = (
region_stats.get(record.region, 0) +
record.quantity * record.unit_price
)
top_regions = sorted(region_stats.items(),
key=lambda x: x[1],
reverse=True)
# 生成报告
report = f"""
{'=' * 50}
{title}
{'=' * 50}
一、整体概览
----------------------------------------
总订单数:{total_orders}
总收入:¥{total_revenue:,.2f}
二、热销产品 TOP 3
----------------------------------------
"""
for i, (product, revenue) in enumerate(top_products, 1):
percentage = (revenue / total_revenue * 100) if total_revenue > 0 else 0
report += f"{i}. {product}: ¥{revenue:,.2f} ({percentage:.1f}%)\n"
report += "\n三、区域销售分布\n----------------------------------------\n"
for region, revenue in top_regions:
percentage = (revenue / total_revenue * 100) if total_revenue > 0 else 0
report += f"- {region}: ¥{revenue:,.2f} ({percentage:.1f}%)\n"
report += f"\n{'=' * 50}\n报告生成时间:自动生成\n{'=' * 50}\n"
return report
六、高级技巧与最佳实践
掌握这些高级技巧可以让你的 Agent 开发更加得心应手。
6.1 异步编程模式
Agent Framework 全面支持异步操作,这在处理 I/O 密集型任务时至关重要。
# advanced/async_patterns.py - 异步编程模式
import asyncio
from typing import List, Callable, Any
import semantic_kernel as sk
class AsyncAgentPatterns:
"""异步 Agent 模式示例"""
def __init__(self, kernel: sk.Kernel):
self.kernel = kernel
async def process_batch(self, items: List[str],
processor: Callable) -> List[str]:
"""
批量处理任务
使用信号量控制并发数量,避免 API 限流
"""
semaphore = asyncio.Semaphore(3) # 最多同时3个任务
async def limited_process(item: str) -> str:
async with semaphore:
return await processor(item)
tasks = [limited_process(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理可能的异常
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append(f"处理失败: {str(result)}")
else:
processed_results.append(result)
return processed_results
async def stream_response(self, prompt: str):
"""
流式响应处理
适用于需要实时显示 AI 回复的场景
"""
service = self.kernel.get_service()
full_text = []
async for chunk in service.complete_stream_async(prompt):
full_text.append(chunk)
yield chunk
return "".join(full_text)
async def with_timeout(self, coro, timeout: float = 30.0) -> Any:
"""
带超时的异步调用
防止无限等待
"""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
return f"操作超时({timeout}秒)"
async def with_retry(
self,
coro,
max_retries: int = 3,
delay: float = 1.0
) -> Any:
"""
带重试的异步调用
指数退避策略
"""
last_exception = None
for attempt in range(max_retries):
try:
return await coro
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
wait_time = delay * (2 ** attempt)
await asyncio.sleep(wait_time)
print(f"重试 {attempt + 1}/{max_retries},等待 {wait_time}秒...")
raise last_exception
# 使用示例
async def main():
kernel = sk.Kernel()
patterns = AsyncAgentPatterns(kernel)
# 批量处理示例
items = ["任务1", "任务2", "任务3", "任务4", "任务5"]
async def process_item(item: str) -> str:
await asyncio.sleep(1) # 模拟处理
return f"已处理: {item}"
print("开始批量处理...")
results = await patterns.process_batch(items, process_item)
for result in results:
print(result)
6.2 错误处理与日志记录
健壮的错误处理是生产级应用的基础。
# advanced/error_handling.py - 错误处理与日志
import logging
from typing import Optional, Callable, Any
from functools import wraps
import semantic_kernel as sk
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("AgentFramework")
class AgentError(Exception):
"""Agent 基础异常"""
pass
class PluginError(AgentError):
"""插件执行错误"""
pass
class PlanExecutionError(AgentError):
"""计划执行错误"""
pass
class ErrorHandler:
"""错误处理器"""
def __init__(self):
self._error_counts: dict[str, int] = {}
def record_error(self, error_type: str, error: Exception):
"""记录错误"""
self._error_counts[error_type] = self._error_counts.get(error_type, 0) + 1
logger.error(
f"错误类型: {error_type}\n"
f"错误信息: {str(error)}\n"
f"累计次数: {self._error_counts[error_type]}"
)
def get_error_stats(self) -> dict[str, int]:
"""获取错误统计"""
return self._error_counts.copy()
def should_retry(self, error_type: str, max_retries: int = 3) -> bool:
"""判断是否应该重试"""
return self._error_counts.get(error_type, 0) < max_retries
def with_error_handling(func: Callable) -> Callable:
"""
错误处理装饰器
自动捕获异常并记录日志
"""
@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"执行 {func.__name__} 时出错: {e}")
raise
@wraps(func)
def sync_wrapper(*args, **kwargs) -> Any:
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"执行 {func.__name__} 时出错: {e}")
raise
import asyncio
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
class SafeAgent:
"""带错误处理的 Agent"""
def __init__(self, kernel: sk.Kernel):
self.kernel = kernel
self.error_handler = ErrorHandler()
@with_error_handling
async def execute_safely(self, task: str) -> str:
"""
安全执行任务
捕获所有异常,返回友好的错误信息
"""
try:
# 尝试执行
result = await self._execute_task(task)
return result
except PluginError as e:
self.error_handler.record_error("plugin", e)
return f"插件执行出错,请稍后重试"
except PlanExecutionError as e:
self.error_handler.record_error("plan", e)
return f"任务规划失败,尝试简化请求"
except Exception as e:
self.error_handler.record_error("unknown", e)
logger.exception("未知错误")
return "发生未知错误,请联系管理员"
async def _execute_task(self, task: str) -> str:
"""实际执行任务的内部方法"""
# 这里是实际的任务执行逻辑
pass
def get_health_status(self) -> dict:
"""获取 Agent 健康状态"""
stats = self.error_handler.get_error_stats()
return {
"status": "healthy" if sum(stats.values()) == 0 else "degraded",
"total_errors": sum(stats.values()),
"error_breakdown": stats
}
6.3 性能优化建议
针对大规模部署场景的性能优化技巧。
# advanced/performance.py - 性能优化
import time
from functools import wraps
from typing import Optional, Dict, Any
import semantic_kernel as sk
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self._metrics: Dict[str, list[float]] = {}
def record(self, operation: str, duration: float):
"""记录操作耗时"""
if operation not in self._metrics:
self._metrics[operation] = []
self._metrics[operation].append(duration)
def get_stats(self, operation: str) -> Optional[Dict[str, float]]:
"""获取操作统计"""
if operation not in self._metrics:
return None
durations = self._metrics[operation]
return {
"count": len(durations),
"total": sum(durations),
"avg": sum(durations) / len(durations),
"min": min(durations),
"max": max(durations)
}
def get_all_stats(self) -> Dict[str, Dict[str, float]]:
"""获取所有统计"""
return {op: self.get_stats(op) for op in self._metrics.keys()}
def timed(func: Callable) -> Callable:
"""计时装饰器"""
@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
start = time.time()
try:
return await func(*args, **kwargs)
finally:
duration = time.time() - start
# 可以在这里记录到监控器
print(f"{func.__name__} 耗时: {duration:.3f}s")
@wraps(func)
def sync_wrapper(*args, **kwargs) -> Any:
start = time.time()
try:
return func(*args, **kwargs)
finally:
duration = time.time() - start
print(f"{func.__name__} 耗时: {duration:.3f}s")
import asyncio
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
class CachedKernel:
"""
带缓存的 Kernel 包装器
避免重复创建 Kernel 实例
"""
_instance: Optional[sk.Kernel] = None
_last_created: float = 0
_ttl: float = 3600 # 1小时过期
@classmethod
def get_instance(cls, recreate: bool = False) -> sk.Kernel:
"""
获取单例 Kernel
Args:
recreate: 是否强制重新创建
"""
now = time.time()
if cls._instance is None or recreate or (now - cls._last_created) > cls._ttl:
cls._instance = sk.Kernel()
cls._last_created = now
return cls._instance
class PromptTemplateCache:
"""Prompt 模板缓存"""
def __init__(self, max_size: int = 100):
self._cache: Dict[str, Any] = {}
self._max_size = max_size
self._access_count: Dict[str, int] = {}
def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
if key in self._cache:
self._access_count[key] = self._access_count.get(key, 0) + 1
return self._cache[key]
return None
def set(self, key: str, value: Any):
"""设置缓存"""
# 如果缓存已满,删除最少使用的项
if len(self._cache) >= self._max_size:
min_key = min(self._access_count, key=self._access_count.get)
del self._cache[min_key]
del self._access_count[min_key]
self._cache[key] = value
self._access_count[key] = 0
def clear(self):
"""清空缓存"""
self._cache.clear()
self._access_count.clear()
七、总结与资源链接
微软 Agent Framework 代表了 AI 应用开发的新范式。通过将复杂的 AI 能力封装为可组合的插件,它让开发者能够专注于业务逻辑而非底层实现。无论是构建智能客服、数据分析工具,还是更复杂的 AI Agent 系统,这个框架都提供了优雅且高效的解决方案。
框架的核心优势总结:
首先,统一的抽象层大大简化了 AI 应用的开发复杂度。Kernel、Plugins、Memory 三个核心概念相互配合,形成了一个完整的开发体系。其次,对多 AI 服务的支持让你可以在同一个应用中灵活切换不同的 AI 提供商,无论是 Azure OpenAI、OpenAI 还是其他服务。最后,强大的规划能力让复杂任务的自动化处理成为可能,这正是 AI Agent 的核心价值所在。
推荐学习路径:
对于初学者,建议从本教程的基础部分开始,首先理解 Kernel 和 Plugins 的概念,然后逐步尝试构建自己的第一个 AI 应用。对于有经验的开发者,可以深入研究规划器(Planner)和记忆系统的实现,这些高级特性能够解锁更多应用场景。对于企业用户,推荐关注框架的安全特性、监控能力和生产环境部署的最佳实践。
相关资源链接:
Semantic Kernel 官方文档提供了完整的 API 参考和更多示例代码。微软 Learn 上的 AI Engineer 路径包含系统化的学习资源。GitHub 社区中有大量的开源插件和工具可以参考。Azure AI Studio 提供了云端开发和部署的完整工具链。
展望未来:
随着大语言模型能力的不断提升,Agent Framework 也在持续演进。未来版本中将支持更多智能规划能力、更好的多模态支持,以及更丰富的企业级特性。微软正在积极开发新一代的 Autonomous Agents 功能,这将让 AI Agent 能够自主完成更复杂的任务。
掌握 Agent Framework 不仅能帮助你构建当下最先进的 AI 应用,更能为迎接 AI 技术下一波浪潮做好准备。希望这份教程能成为你探索 AI Agent 开发之旅的良好起点。祝你编码愉快!
评论区