别再用 LangChain 手动编排了,微软 Agent Framework 才是 AI Agent 开发的最优解

别再用 LangChain 手动编排了,微软 Agent Framework 才是 AI Agent 开发的最优解

别再用 LangChain 手动编排了,微软 Agent Framework 才是 AI Agent 开发的最优解

一、为什么微软 Agent Framework 值得关注

在 AI Agent 开发领域,开发者们长期面临着复杂的编排困境。当我们需要让大语言模型调用工具、访问记忆、执行多步骤任务时,往往需要编写大量胶水代码,处理各种边界情况,维护成本极高。微软 Agent Framework 的出现,正是为了解决这一核心痛点。

这个项目基于微软 Semantic Kernel 核心引擎构建,提供了一套优雅的抽象层,让 AI Agent 的开发变得模块化、可组合、易维护。与其他框架相比,它的优势在于:对企业级应用场景的深度支持、与现有 .NET 生态的天然融合、以及经过微软产品团队实际验证的稳定性。

更重要的是,Agent Framework 不仅仅是另一个 LLM 编排工具,它代表了一种全新的开发范式:通过将 AI 能力封装为可复用的插件,让开发者可以像搭积木一样构建复杂的智能应用。这种设计思路极大地降低了 AI 应用的开发门槛,同时保证了代码的可测试性和可维护性。

二、环境搭建:从零开始配置开发环境

在开始之前,我们需要确保开发环境满足基本要求。推荐使用 Python 3.10 或更高版本,因为新版本的类型提示和异步支持能让代码更加清晰。以下是完整的环境配置步骤。

首先,创建一个独立的虚拟环境是最佳实践,这样可以避免依赖冲突。使用 conda 或 venv 都可以,根据个人偏好选择即可。

# 创建并激活虚拟环境(使用 conda)
conda create -n agent-framework python=3.11
conda activate agent-framework

# 或者使用 venv
python -m venv agent-env
source agent-env/bin/activate  # Linux/Mac
agent-env\Scripts\activate     # Windows

环境激活后,接下来安装 Agent Framework 的核心依赖包。通过 pip install 命令可以快速完成安装,注意要同时安装 semantic-kernel 以获得完整功能。

# 安装核心框架
pip install semantic-kernel==0.9.1b1

# 安装可选依赖(根据需要选择)
pip install semantic-kernel-plugins      # 官方插件集合
pip install azure-identity              # Azure 认证支持
pip install openai                      # OpenAI API 客户端

安装完成后,创建一个配置文件来管理 API 密钥和连接参数。这不仅便于管理敏感信息,也方便在不同环境间切换。

# config.py - 配置文件示例
import os

class Settings:
    """应用配置类"""

    # Azure OpenAI 配置
    AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY", "your-api-key")
    AZURE_OPENAI_ENDPOINT = os.getenv(
        "AZURE_OPENAI_ENDPOINT", 
        "https://your-resource.openai.azure.com"
    )
    AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv(
        "AZURE_OPENAI_DEPLOYMENT_NAME",
        "gpt-4"
    )
    AZURE_OPENAI_API_VERSION = "2024-02-15-preview"

    # 或者使用 OpenAI 直接配置
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key")
    OPENAI_MODEL_ID = "gpt-4-turbo"

    @classmethod
    def validate(cls) -> bool:
        """验证配置是否完整"""
        has_azure = bool(cls.AZURE_OPENAI_API_KEY and cls.AZURE_OPENAI_ENDPOINT)
        has_openai = bool(cls.OPENAI_API_KEY)
        return has_azure or has_openai

环境验证脚本可以帮助我们确认所有组件是否正确安装。这个步骤虽然简单,但能避免后续很多麻烦。

# verify_setup.py - 环境验证脚本
import semantic_kernel as sk

def verify_installation():
    """验证框架安装是否成功"""
    print("=" * 50)
    print("Agent Framework 环境验证")
    print("=" * 50)

    try:
        # 验证核心模块
        kernel = sk.Kernel()
        print("[✓] 核心 Kernel 模块加载成功")

        # 验证版本信息
        print(f"[✓] Semantic Kernel 版本: {sk.__version__}")

        # 验证插件系统
        from semantic_kernel.plugins import PluginCollection
        plugins = PluginCollection(kernel)
        print("[✓] 插件系统初始化成功")

        # 验证内存服务
        from semantic_kernel.memory import SemanticTextMemory
        print("[✓] 内存服务加载成功")

        print("\n" + "=" * 50)
        print("所有组件验证通过!可以开始开发。")
        print("=" * 50)
        return True

    except ImportError as e:
        print(f"[✗] 导入错误: {e}")
        print("请重新安装依赖: pip install semantic-kernel")
        return False
    except Exception as e:
        print(f"[✗] 验证过程出错: {e}")
        return False

if __name__ == "__main__":
    verify_installation()

三、核心概念详解:理解 Agent Framework 的设计哲学

理解 Agent Framework 的核心概念是掌握这个框架的关键。微软的设计理念是将复杂的 AI 能力分解为几个核心抽象,每个抽象都有明确的职责边界,通过组合这些抽象来完成复杂任务。

3.1 Kernel:智能编排的核心引擎

Kernel 是整个框架的心脏,它负责管理所有 AI 相关的资源和调度。你可以把它想象成一个操作系统内核,它管理着插件、内存、聊天历史,以及与各种 AI 服务的连接。一个应用中通常只需要一个 Kernel 实例,但根据架构设计,你也可以创建多个 Kernel 来处理不同的业务域。

# kernel_basics.py - Kernel 的基本使用
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion

def create_basic_kernel():
    """创建一个基础配置的 Kernel"""

    # 初始化 Kernel
    kernel = sk.Kernel()

    # 添加 AI 服务,这里以 OpenAI 为例
    # Azure OpenAI 的配置方式类似,只是连接器不同
    kernel.add_service(
        OpenAIChatCompletion(
            service_id="default-gpt",
            ai_model_id="gpt-4-turbo",
            api_key="your-api-key"
        )
    )

    # 配置日志级别(对于调试非常有用)
    kernel.Log = sk.NullLogger()

    return kernel

def create_azure_kernel():
    """创建 Azure OpenAI 配置的 Kernel"""

    kernel = sk.Kernel()

    # Azure OpenAI 需要使用特定的连接器
    from semantic_kernel.connectors.ai.azure_open_ai import (
        AzureChatCompletion,
        AzureTextCompletion
    )

    kernel.add_service(
        AzureChatCompletion(
            service_id="azure-gpt",
            deployment_name="gpt-4",
            endpoint="https://your-resource.openai.azure.com",
            api_key="your-azure-key"
        )
    )

    return kernel

Kernel 的配置非常灵活,支持多种 AI 服务的混合使用。例如,你可以同时连接 GPT-4 和 Claude,让不同的插件使用不同的模型,实现更复杂的功能。

3.2 Plugins:可复用的能力单元

插件是 Agent Framework 中最重要的概念之一。它将各种能力封装成可复用的单元,AI 模型可以像调用函数一样调用这些能力。插件的设计遵循了开放封闭原则:你不需要修改插件代码来添加新功能,只需要扩展即可。

一个插件本质上是一个类或函数的集合,每个可被 AI 调用的方法称为原生函数(Native Function)。与传统的函数调用不同,这些函数需要声明详细的功能描述,AI 模型会根据这些描述来决定何时、如何调用它们。

# plugins_example.py - 插件的定义和使用
from semantic_kernel import Kernel
from semantic_kernel.skill_definition import (
    sk_function,
    sk_function_context_parameter
)
from semantic_kernel.planning.basic_planner import BasicPlanner

class MathPlugin:
    """数学计算插件示例"""

    def __init__(self):
        self._precision = 2

    @sk_function(
        description="计算两个数的和,支持整数和浮点数",
        name="add",
        input_description="需要相加的两个数字,用逗号分隔"
    )
    def add(self, numbers: str) -> str:
        """执行加法运算"""
        try:
            a, b = map(float, numbers.split(","))
            result = a + b
            return str(round(result, self._precision))
        except ValueError as e:
            return f"输入格式错误: {e}"

    @sk_function(
        description="计算两个数的差",
        name="subtract",
        input_description="被减数和减数,用逗号分隔"
    )
    def subtract(self, numbers: str) -> str:
        """执行减法运算"""
        try:
            a, b = map(float, numbers.split(","))
            result = a - b
            return str(round(result, self._precision))
        except ValueError as e:
            return f"输入格式错误: {e}"

    @sk_function(
        description="计算两个数的乘积",
        name="multiply",
        input_description="需要相乘的两个数字,用逗号分隔"
    )
    def multiply(self, numbers: str) -> str:
        """执行乘法运算"""
        try:
            a, b = map(float, numbers.split(","))
            result = a * b
            return str(round(result, self._precision))
        except ValueError as e:
            return f"输入格式错误: {e}"

    @sk_function(
        description="计算两个数的商,结果保留两位小数",
        name="divide",
        input_description="被除数和除数,用逗号分隔"
    )
    def divide(self, numbers: str) -> str:
        """执行除法运算"""
        try:
            a, b = map(float, numbers.split(","))
            if b == 0:
                return "错误:除数不能为零"
            result = a / b
            return str(round(result, self._precision))
        except ValueError as e:
            return f"输入格式错误: {e}"

class DateTimePlugin:
    """日期时间处理插件"""

    @sk_function(
        description="获取当前日期和时间",
        name="get_current_datetime",
        input_description="不需要输入参数"
    )
    def get_current_datetime(self) -> str:
        """返回当前日期时间"""
        from datetime import datetime
        now = datetime.now()
        return now.strftime("%Y年%m月%d日 %H:%M:%S")

    @sk_function(
        description="计算两个日期之间的天数差",
        name="days_between",
        input_description="两个日期,格式为 YYYY-MM-DD,用逗号分隔"
    )
    def days_between(self, dates: str) -> str:
        """计算日期差"""
        from datetime import datetime
        try:
            date1_str, date2_str = dates.split(",")
            date1 = datetime.strptime(date1_str.strip(), "%Y-%m-%d")
            date2 = datetime.strptime(date2_str.strip(), "%Y-%m-%d")
            delta = abs((date2 - date1).days)
            return f"相差 {delta} 天"
        except ValueError as e:
            return f"日期格式错误: {e}"

3.3 语义函数:自然语言与代码的桥梁

除了原生函数,Agent Framework 还支持语义函数(Semantic Function)。与原生函数不同,语义函数的核心是一个 Prompt 模板,AI 模型会结合输入和上下文来生成结果。这种方式特别适合需要灵活性或创造性输出的场景。

# semantic_functions.py - 语义函数的定义和使用
import semantic_kernel as sk

def create_semantic_functions(kernel: sk.Kernel):
    """创建语义函数集合"""

    # 创建提示模板目录
    prompts_dir = "./prompts"

    # 注册一个翻译助手函数
    # 这是通过目录结构定义的语义函数
    # prompts_dir/translator/skprompt.txt - 提示模板
    # prompts_dir/translator/config.json - 配置文件

    kernel.import_semantic_skill_from_directory(
        prompts_dir,
        "translator"
    )

    return kernel

# 目录结构示例:
# prompts/
# └── translator/
#     ├── skprompt.txt
#     └── config.json

# skprompt.txt 内容示例:
"""
你是一个专业的翻译助手。

任务:将用户输入的文本翻译成目标语言。

目标语言:{{$target_language}}
翻译风格:{{$style}}

原文:{{$input}}

请提供准确、自然的翻译结果。
"""

# config.json 内容示例:
"""
{
    "description": "多语言翻译助手",
    "completion": {
        "max_tokens": 1000,
        "temperature": 0.3,
        "top_p": 0.8
    }
}

语义函数的优势在于它的灵活性。由于核心是 Prompt 模板,你可以随时调整 AI 的行为而无需修改代码。这对于需要频繁优化的场景特别有用。

3.4 记忆系统:赋予 Agent 持久化上下文的能力

Agent Framework 的记忆系统是其核心能力之一。它允许 AI 模型在对话之间保持状态,记住用户偏好、历史交互等信息。这对于构建真正智能的助手至关重要。

# memory_system.py - 记忆系统的使用
from semantic_kernel import Kernel
from semantic_kernel.memory import (
    SemanticTextMemory,
    VolatileMemoryStore,
    MemoryQueryResult
)
from semantic_kernel.connectors.memory.azure_cognitive_search import (
    AzureCognitiveSearchMemoryStore
)

class AgentMemory:
    """Agent 记忆管理类"""

    def __init__(self, kernel: Kernel, use_azure: bool = False):
        self.kernel = kernel
        self.use_azure = use_azure
        self.memory = self._setup_memory()

    def _setup_memory(self) -> SemanticTextMemory:
        """配置记忆存储"""

        # 方式一:使用内存存储(仅当前会话有效)
        if not self.use_azure:
            memory_store = VolatileMemoryStore()
            memory = SemanticTextMemory(storage=memory_store)

        # 方式二:使用 Azure Cognitive Search(持久化存储)
        else:
            memory_store = AzureCognitiveSearchMemoryStore(
                search_service_endpoint="your-endpoint",
                api_key="your-key",
                index_name="agent-memories"
            )
            memory = SemanticTextMemory(storage=memory_store)

        # 将记忆系统注册到 Kernel
        self.kernel.register_memory(memory)

        return memory

    async def save_memory(self, collection: str, key: str, text: str, 
                          description: str = "") -> None:
        """保存信息到记忆库"""
        await self.memory.save_information(
            collection=collection,
            id=key,
            text=text,
            description=description
        )
        print(f"已保存记忆: [{key}] {description}")

    async def search_memory(self, collection: str, query: str, 
                           limit: int = 5) -> list[MemoryQueryResult]:
        """从记忆库搜索相关信息"""
        results = await self.memory.search(
            collection=collection,
            query=query,
            limit=limit
        )
        return results

    async def save_user_preference(self, user_id: str, preference: dict) -> None:
        """保存用户偏好设置"""
        import json

        preference_text = json.dumps(preference, ensure_ascii=False)
        await self.save_memory(
            collection="user_preferences",
            key=f"user_{user_id}",
            text=preference_text,
            description=f"用户 {user_id} 的偏好设置"
        )

    async def get_user_context(self, user_id: str, query: str) -> str:
        """获取与用户相关的上下文信息"""
        results = await self.search_memory(
            collection="user_preferences",
            query=query
        )

        context_parts = []
        for result in results:
            context_parts.append(result.text)

        return "\n".join(context_parts) if context_parts else "未找到相关记忆"

四、实战教程:从零构建一个多功能 AI 助手

现在我们已经理解了核心概念,接下来通过一个完整的实战项目来巩固所学。我们将构建一个多功能 AI 助手,它具备以下能力:回答用户问题、执行计算任务、管理日程、搜索网络信息,以及记住用户偏好。

4.1 项目架构设计

在开始编码之前,先规划一下整体架构。良好的架构能让代码更容易维护和扩展。

# project_structure.py - 项目结构定义
"""
项目架构:

assistant/
├── config/
│   ├── __init__.py
│   ├── settings.py        # 配置管理
│   └── prompts.py         # Prompt 模板
├── plugins/
│   ├── __init__.py
│   ├── math_plugin.py     # 数学计算插件
│   ├── calendar_plugin.py # 日历管理插件
│   ├── search_plugin.py   # 搜索功能插件
│   └── memory_plugin.py   # 记忆管理插件
├── skills/
│   └── semantic/          # 语义函数目录
├── core/
│   ├── __init__.py
│   ├── kernel_builder.py  # Kernel 构建器
│   └── agent.py           # Agent 基类
├── services/
│   ├── __init__.py
│   └── memory_service.py  # 记忆服务
├── main.py                # 入口文件
└── requirements.txt       # 依赖清单
"""

# 这个示例展示了推荐的目录结构组织方式

4.2 配置管理模块

配置管理是任何项目的基础。我们创建一个灵活的配置类,支持从环境变量、配置文件和代码默认值多层次配置。

# config/settings.py - 配置管理
import os
import json
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field

@dataclass
class AIConfig:
    """AI 服务配置"""
    provider: str = "azure"  # azure 或 openai
    api_key: str = ""
    endpoint: str = ""
    deployment_name: str = "gpt-4"
    api_version: str = "2024-02-15-preview"
    model_id: str = "gpt-4-turbo"
    temperature: float = 0.7
    max_tokens: int = 2000

@dataclass
class MemoryConfig:
    """记忆存储配置"""
    use_azure: bool = False
    connection_string: str = ""
    index_name: str = "agent-memories"

@dataclass
class AppConfig:
    """应用整体配置"""
    ai: AIConfig = field(default_factory=AIConfig)
    memory: MemoryConfig = field(default_factory=MemoryConfig)
    log_level: str = "INFO"
    enable_streaming: bool = False

    @classmethod
    def from_env(cls) -> "AppConfig":
        """从环境变量加载配置"""
        return cls(
            ai=AIConfig(
                provider=os.getenv("AI_PROVIDER", "azure"),
                api_key=os.getenv("API_KEY", ""),
                endpoint=os.getenv("AZURE_ENDPOINT", ""),
                deployment_name=os.getenv("DEPLOYMENT_NAME", "gpt-4")
            ),
            memory=MemoryConfig(
                use_azure=os.getenv("USE_AZURE_MEMORY", "false").lower() == "true",
                connection_string=os.getenv("AZURE_COSMOS_CONNECTION", ""),
                index_name=os.getenv("MEMORY_INDEX", "agent-memories")
            ),
            log_level=os.getenv("LOG_LEVEL", "INFO")
        )

    @classmethod
    def from_file(cls, path: str) -> "AppConfig":
        """从配置文件加载"""
        config_path = Path(path)
        if not config_path.exists():
            raise FileNotFoundError(f"配置文件不存在: {path}")

        with open(config_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        return cls(
            ai=AIConfig(**data.get("ai", {})),
            memory=MemoryConfig(**data.get("memory", {})),
            log_level=data.get("log_level", "INFO")
        )

    def validate(self) -> tuple[bool, list[str]]:
        """验证配置完整性"""
        errors = []

        if not self.ai.api_key:
            errors.append("API Key 未设置")

        if self.ai.provider == "azure" and not self.ai.endpoint:
            errors.append("Azure Endpoint 未设置")

        return len(errors) == 0, errors

# 配置使用示例
def main():
    # 方式一:从环境变量加载
    # config = AppConfig.from_env()

    # 方式二:从配置文件加载
    # config = AppConfig.from_file("config.json")

    # 方式三:直接创建
    config = AppConfig(
        ai=AIConfig(
            provider="openai",
            api_key="sk-xxx",
            model_id="gpt-4-turbo"
        )
    )

    is_valid, errors = config.validate()
    if not is_valid:
        for error in errors:
            print(f"配置错误: {error}")
        return

    print("配置验证通过")
    print(f"AI Provider: {config.ai.provider}")
    print(f"Model: {config.ai.model_id}")

if __name__ == "__main__":
    main()

4.3 核心 Kernel 构建器

Kernel 构建器封装了 Kernel 的创建逻辑,提供了统一且可配置的初始化方式。

# core/kernel_builder.py - Kernel 构建器
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.connectors.ai.azure_open_ai import AzureChatCompletion
from typing import Optional, Dict, Any

class KernelBuilder:
    """Kernel 构建器,提供链式配置接口"""

    def __init__(self):
        self._kernel = sk.Kernel()
        self._services: Dict[str, Any] = {}
        self._plugins: Dict[str, Any] = {}

    def with_azure_openai(
        self,
        endpoint: str,
        api_key: str,
        deployment_name: str,
        api_version: str = "2024-02-15-preview",
        service_id: str = "azure-gpt"
    ) -> "KernelBuilder":
        """添加 Azure OpenAI 服务"""

        service = AzureChatCompletion(
            service_id=service_id,
            deployment_name=deployment_name,
            endpoint=endpoint,
            api_key=api_key,
            api_version=api_version
        )

        self._kernel.add_service(service)
        self._services["azure"] = service

        return self

    def with_openai(
        self,
        api_key: str,
        model_id: str = "gpt-4-turbo",
        org_id: Optional[str] = None,
        service_id: str = "openai-gpt"
    ) -> "KernelBuilder":
        """添加 OpenAI 服务"""

        service = OpenAIChatCompletion(
            service_id=service_id,
            ai_model_id=model_id,
            api_key=api_key,
            org_id=org_id
        )

        self._kernel.add_service(service)
        self._services["openai"] = service

        return self

    def with_plugin(self, plugin: Any, skill_name: str) -> "KernelBuilder":
        """添加原生插件"""

        self._kernel.import_skill(plugin, skill_name)
        self._plugins[skill_name] = plugin

        return self

    def with_memory(
        self,
        store: Any,
        collection_name: str = "generic_memory"
    ) -> "KernelBuilder":
        """添加记忆系统"""

        from semantic_kernel.memory import SemanticTextMemory

        memory = SemanticTextMemory(storage=store)
        self._kernel.register_memory(memory)

        return self

    def with_configuration(
        self,
        temperature: float = 0.7,
        max_tokens: int = 2000,
        top_p: float = 1.0,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0
    ) -> "KernelBuilder":
        """设置 AI 生成参数"""

        request_settings = {
            "temperature": temperature,
            "max_tokens": max_tokens,
            "top_p": top_p,
            "presence_penalty": presence_penalty,
            "frequency_penalty": frequency_penalty
        }

        # 应用到所有已添加的服务
        for service in self._services.values():
            service.request_settings = request_settings

        return self

    def build(self) -> sk.Kernel:
        """构建并返回配置好的 Kernel"""

        if not self._services:
            raise ValueError("必须至少添加一个 AI 服务")

        return self._kernel

    def get_plugin_count(self) -> int:
        """获取已添加的插件数量"""
        return len(self._plugins)

    def get_service_names(self) -> list[str]:
        """获取已添加的服务名称"""
        return list(self._services.keys())


# 使用示例
def create_production_kernel() -> sk.Kernel:
    """创建生产环境 Kernel"""

    kernel = (
        KernelBuilder()
        .with_azure_openai(
            endpoint="https://your-resource.openai.azure.com",
            api_key="your-key",
            deployment_name="gpt-4",
            api_version="2024-02-15-preview"
        )
        .with_configuration(
            temperature=0.7,
            max_tokens=2000
        )
        .build()
    )

    return kernel

4.4 日历管理插件

这是一个功能完整的日历管理插件,展示了如何创建实用的原生函数。

# plugins/calendar_plugin.py - 日历管理插件
from datetime import datetime, timedelta
from typing import Optional, List
from semantic_kernel.skill_definition import (
    sk_function,
    sk_function_context_parameter
)

class CalendarPlugin:
    """日历管理插件 - 管理日程安排"""

    def __init__(self):
        # 使用内存存储日程,实际应用中可用数据库
        self._events: dict[str, list[dict]] = {
            "default": []
        }

    @sk_function(
        description="创建新的日程事件",
        name="create_event",
        input_description="日程标题"
    )
    @sk_function_context_parameter(
        name="date",
        description="日期,格式为 YYYY-MM-DD",
        default_value=""
    )
    @sk_function_context_parameter(
        name="time",
        description="时间,格式为 HH:MM",
        default_value=""
    )
    @sk_function_context_parameter(
        name="duration",
        description="持续时间(分钟)",
        default_value="60"
    )
    @sk_function_context_parameter(
        name="description",
        description="日程描述",
        default_value=""
    )
    def create_event(
        self,
        context: sk.KernelContext
    ) -> str:
        """创建新日程"""

        title = context.input
        date = context.variables.get("date", "")
        time = context.variables.get("time", "")
        duration = int(context.variables.get("duration", "60"))
        description = context.variables.get("description", "")

        # 验证输入
        if not title:
            return "错误:日程标题不能为空"

        if not date:
            date = datetime.now().strftime("%Y-%m-%d")

        if not time:
            time = datetime.now().strftime("%H:%M")

        # 解析时间
        try:
            start_dt = datetime.strptime(f"{date} {time}", "%Y-%m-%d %H:%M")
            end_dt = start_dt + timedelta(minutes=duration)
        except ValueError:
            return "错误:日期或时间格式不正确"

        # 创建事件
        event = {
            "id": f"evt_{len(self._events['default']) + 1}",
            "title": title,
            "start": start_dt.isoformat(),
            "end": end_dt.isoformat(),
            "description": description
        }

        self._events["default"].append(event)

        return (
            f"日程已创建:\n"
            f"标题:{title}\n"
            f"时间:{start_dt.strftime('%Y-%m-%d %H:%M')} - {end_dt.strftime('%H:%M')}\n"
            f"时长:{duration} 分钟\n"
            f"事件ID:{event['id']}"
        )

    @sk_function(
        description="查询指定日期的日程",
        name="list_events",
        input_description="查询日期,格式为 YYYY-MM-DD,不填则查询今天"
    )
    def list_events(self, date: str) -> str:
        """列出指定日期的日程"""

        if not date:
            date = datetime.now().strftime("%Y-%m-%d")

        events_today = []
        for event in self._events["default"]:
            event_date = event["start"].split("T")[0]
            if event_date == date:
                events_today.append(event)

        if not events_today:
            return f"{date} 没有安排的日程"

        # 按时间排序
        events_today.sort(key=lambda x: x["start"])

        result_lines = [f"{date} 的日程安排:", ""]
        for i, event in enumerate(events_today, 1):
            start_dt = datetime.fromisoformat(event["start"])
            end_dt = datetime.fromisoformat(event["end"])

            result_lines.append(f"{i}. {event['title']}")
            result_lines.append(f"   时间:{start_dt.strftime('%H:%M')} - {end_dt.strftime('%H:%M')}")

            if event["description"]:
                result_lines.append(f"   描述:{event['description']}")

            result_lines.append("")

        return "\n".join(result_lines).strip()

    @sk_function(
        description="删除指定事件",
        name="delete_event",
        input_description="要删除的事件ID"
    )
    def delete_event(self, event_id: str) -> str:
        """删除日程事件"""

        for i, event in enumerate(self._events["default"]):
            if event["id"] == event_id:
                deleted = self._events["default"].pop(i)
                return f"已删除日程:{deleted['title']}"

        return f"未找到事件ID:{event_id}"

    @sk_function(
        description="查找包含关键词的日程",
        name="search_events",
        input_description="搜索关键词"
    )
    def search_events(self, keyword: str) -> str:
        """搜索日程"""

        if not keyword:
            return "请提供搜索关键词"

        results = []
        for event in self._events["default"]:
            if keyword.lower() in event["title"].lower():
                results.append(event)
            elif event["description"] and keyword.lower() in event["description"].lower():
                results.append(event)

        if not results:
            return f"没有找到包含「{keyword}」的日程"

        result_lines = [f"找到 {len(results)} 个相关日程:", ""]
        for event in results:
            start_dt = datetime.fromisoformat(event["start"])
            result_lines.append(f"- {event['title']} ({start_dt.strftime('%Y-%m-%d %H:%M')})")

        return "\n".join(result_lines)

4.5 搜索功能插件

这个插件展示了如何集成外部 API,让 AI Agent 能够获取实时信息。

# plugins/search_plugin.py - 搜索功能插件
import json
import httpx
from typing import Optional
from semantic_kernel.skill_definition import (
    sk_function,
    sk_function_context_parameter
)

class SearchPlugin:
    """搜索功能插件 - 集成搜索 API"""

    def __init__(self, api_key: Optional[str] = None):
        self._api_key = api_key or ""
        self._http_client = httpx.AsyncClient(timeout=30.0)

    async def _close(self):
        """关闭 HTTP 客户端"""
        await self._http_client.aclose()

    @sk_function(
        description="使用必应搜索获取网络信息",
        name="bing_search",
        input_description="搜索查询词"
    )
    @sk_function_context_parameter(
        name="num_results",
        description="返回结果数量(1-10)",
        default_value="5"
    )
    async def bing_search(self, context: sk.KernelContext) -> str:
        """
        执行必应搜索

        注意:实际使用需要配置 BING API Key
        """

        query = context.input
        num_results = int(context.variables.get("num_results", "5"))

        if not query:
            return "请提供搜索关键词"

        # 如果没有配置 API key,返回示例数据
        if not self._api_key:
            return await self._mock_search(query, num_results)

        # 实际调用 Bing Search API
        try:
            headers = {
                "Ocp-Apim-Subscription-Key": self._api_key
            }

            params = {
                "q": query,
                "count": num_results,
                "mkt": "zh-CN"
            }

            response = await self._http_client.get(
                "https://api.bing.microsoft.com/v7.0/search",
                headers=headers,
                params=params
            )

            response.raise_for_status()
            data = response.json()

            return self._format_bing_results(data)

        except httpx.HTTPError as e:
            return f"搜索请求失败: {str(e)}"
        except Exception as e:
            return f"搜索出错: {str(e)}"

    async def _mock_search(self, query: str, num_results: int) -> str:
        """
        模拟搜索结果(用于测试和演示)
        实际使用时替换为真实的 API 调用
        """

        mock_data = {
            "webPages": {
                "value": [
                    {
                        "name": f"关于「{query}」的相关文章 1",
                        "url": f"https://example.com/article1",
                        "snippet": f"这是关于{query}的详细内容摘要..."
                    },
                    {
                        "name": f"关于「{query}」的相关文章 2",
                        "url": f"https://example.com/article2",
                        "snippet": f"深入探讨{query}的核心概念和应用场景..."
                    },
                    {
                        "name": f"「{query}」官方文档",
                        "url": f"https://example.com/docs",
                        "snippet": f"官方文档提供了{query}的完整指南..."
                    }
                ]
            }
        }

        return self._format_bing_results(mock_data)

    def _format_bing_results(self, data: dict) -> str:
        """格式化搜索结果"""

        results = data.get("webPages", {}).get("value", [])

        if not results:
            return "未找到相关结果"

        lines = ["搜索结果:", ""]

        for i, result in enumerate(results[:5], 1):
            lines.append(f"{i}. {result['name']}")
            lines.append(f"   {result['snippet']}")
            lines.append(f"   链接:{result['url']}")
            lines.append("")

        return "\n".join(lines)

    @sk_function(
        description="获取网页内容的摘要",
        name="get_page_summary",
        input_description="网页 URL"
    )
    async def get_page_summary(self, url: str) -> str:
        """
        获取网页内容摘要

        这是一个简化版本,实际应用中可能需要更复杂的 HTML 解析
        """

        if not url:
            return "请提供有效的 URL"

        try:
            response = await self._http_client.get(url)
            response.raise_for_status()

            # 简化处理:只获取前 500 个字符
            content = response.text[:500]

            # 移除 HTML 标签(简化版)
            import re
            content = re.sub(r'<[^>]+>', '', content)

            return (
                f"页面内容摘要:\n"
                f"{content.strip()}...\n"
                f"(完整内容需要进一步处理)"
            )

        except httpx.HTTPError as e:
            return f"获取页面失败: {str(e)}"

4.6 主 Agent 实现

现在我们将所有组件整合起来,创建一个完整的 AI Agent。

# core/agent.py - AI Agent 核心实现
import asyncio
from typing import Optional, List, Dict, Any
from dataclasses import dataclass

import semantic_kernel as sk
from semantic_kernel.planning.basic_planner import BasicPlanner
from semantic_kernel.planning.stepwise_planner import StepwisePlanner

from plugins.math_plugin import MathPlugin
from plugins.calendar_plugin import CalendarPlugin
from plugins.search_plugin import SearchPlugin

@dataclass
class AgentConfig:
    """Agent 配置"""
    name: str = "助手"
    description: str = "一个多功能 AI 助手"
    use_stepwise_planner: bool = True  # 是否使用渐进式规划器
    max_retry: int = 3  # 最大重试次数

class AssistantAgent:
    """
    多功能 AI 助手

    整合各种插件,提供智能化的问答和服务能力
    """

    def __init__(self, kernel: sk.Kernel, config: Optional[AgentConfig] = None):
        self.kernel = kernel
        self.config = config or AgentConfig()
        self.planner = None

        # 初始化插件
        self._initialize_plugins()

        # 初始化规划器
        self._initialize_planner()

        # 注册提示模板
        self._register_prompts()

    def _initialize_plugins(self) -> None:
        """初始化所有插件"""

        # 数学计算插件
        math_plugin = MathPlugin()
        self.kernel.import_skill(math_plugin, "math")

        # 日历管理插件
        calendar_plugin = CalendarPlugin()
        self.kernel.import_skill(calendar_plugin, "calendar")

        # 搜索插件
        search_plugin = SearchPlugin()
        self.kernel.import_skill(search_plugin, "search")

        print("=" * 50)
        print("插件加载完成")
        print("- math: 数学计算")
        print("- calendar: 日程管理")
        print("- search: 网络搜索")
        print("=" * 50)

    def _initialize_planner(self) -> None:
        """初始化规划器"""

        if self.config.use_stepwise_planner:
            self.planner = StepwisePlanner(
                self.kernel,
                max_retry=self.config.max_retry
            )
        else:
            self.planner = BasicPlanner()

        print(f"规划器已就绪: {type(self.planner).__name__}")

    def _register_prompts(self) -> None:
        """注册语义函数提示模板"""

        # 系统提示词
        self._system_prompt = """
你是一个智能助手,名字叫「{name}」。

你的能力包括:
1. 数学计算:可以进行加减乘除等运算
2. 日程管理:创建、查询、删除日程
3. 网络搜索:获取实时信息

当你需要执行具体操作时,请使用可用的函数。
回答时要简洁、有帮助,并且符合用户的语言习惯。
""".format(name=self.config.name)

    async def chat(self, user_input: str, context: Optional[Dict] = None) -> str:
        """
        处理用户对话

        Args:
            user_input: 用户输入
            context: 可选的上下文信息

        Returns:
            Agent 的回复
        """

        # 构建聊天历史
        chat_history = context.get("chat_history", []) if context else []

        # 使用规划器自动分解任务
        if self.planner:
            try:
                plan = await self.planner.create_plan(user_input)

                # 执行计划
                result = await plan.execute()

                return result

            except Exception as e:
                print(f"规划执行出错: {e}")
                # 降级为直接调用
                return await self._direct_chat(user_input)
        else:
            return await self._direct_chat(user_input)

    async def _direct_chat(self, user_input: str) -> str:
        """直接对话模式(不使用规划器)"""

        try:
            # 获取 AI 服务
            service = self.kernel.get_service()

            # 构建完整的提示
            full_prompt = self._system_prompt + f"\n\n用户:{user_input}\n\n助手:"

            # 调用 AI
            response = await service.complete_async(
                prompt=full_prompt,
                request_settings={
                    "temperature": 0.7,
                    "max_tokens": 2000
                }
            )

            return response.text

        except Exception as e:
            return f"处理请求时出错: {str(e)}"

    async def process_stream(self, user_input: str):
        """
        流式响应模式

        适用于需要实时显示响应的场景
        """

        service = self.kernel.get_service()

        full_prompt = self._system_prompt + f"\n\n用户:{user_input}\n\n助手:"

        # 流式生成
        async for token in service.complete_stream_async(
            prompt=full_prompt,
            request_settings={"max_tokens": 2000}
        ):
            yield token

    async def run_task(self, task: str) -> str:
        """
        运行特定任务

        适用于需要执行一系列操作的复杂任务
        """

        if "日程" in task or "calendar" in task.lower():
            return await self._handle_calendar_task(task)

        if "搜索" in task or "search" in task.lower():
            return await self._handle_search_task(task)

        if any(op in task for op in ["加", "减", "乘", "除", "+", "-", "*", "/"]):
            return await self._handle_math_task(task)

        # 默认走对话流程
        return await self.chat(task)

    async def _handle_calendar_task(self, task: str) -> str:
        """处理日历相关任务"""

        context = self.kernel.create_new_context()

        if "创建" in task or "添加" in task:
            context.variables.set("date", "")
            context.variables.set("time", "")
            context.variables.set("duration", "60")
            context.variables.set("description", "")
            return await self.kernel.run_async(
                self.kernel.skills["calendar"]["create_event"],
                input_str=task,
                context=context
            )

        if "查询" in task or "列表" in task:
            # 提取日期
            import re
            date_match = re.search(r'\d{4}-\d{2}-\d{2}', task)
            date = date_match.group(0) if date_match else ""

            return await self.kernel.run_async(
                self.kernel.skills["calendar"]["list_events"],
                input_str=date
            )

        return await self.chat(task)

    async def _handle_search_task(self, task: str) -> str:
        """处理搜索任务"""

        # 提取搜索关键词
        keywords = task.replace("搜索", "").replace("查找", "").strip()

        context = self.kernel.create_new_context()
        context.variables.set("num_results", "5")

        return await self.kernel.run_async(
            self.kernel.skills["search"]["bing_search"],
            input_str=keywords,
            context=context
        )

    async def _handle_math_task(self, task: str) -> str:
        """处理数学计算任务"""

        # 简单的数学表达式解析
        import re

        # 尝试匹配加法
        add_match = re.search(r'(\d+)\s*\+\s*(\d+)', task)
        if add_match:
            a, b = add_match.groups()
            return await self.kernel.run_async(
                self.kernel.skills["math"]["add"],
                input_str=f"{a},{b}"
            )

        # 尝试匹配减法
        sub_match = re.search(r'(\d+)\s*-\s*(\d+)', task)
        if sub_match:
            a, b = sub_match.groups()
            return await self.kernel.run_async(
                self.kernel.skills["math"]["subtract"],
                input_str=f"{a},{b}"
            )

        # 尝试匹配乘法
        mul_match = re.search(r'(\d+)\s*\*\s*(\d+)', task)
        if mul_match:
            a, b = mul_match.groups()
            return await self.kernel.run_async(
                self.kernel.skills["math"]["multiply"],
                input_str=f"{a},{b}"
            )

        # 尝试匹配除法
        div_match = re.search(r'(\d+)\s*/\s*(\d+)', task)
        if div_match:
            a, b = div_match.groups()
            return await self.kernel.run_async(
                self.kernel.skills["math"]["divide"],
                input_str=f"{a},{b}"
            )

        return await self.chat(task)


# 使用示例和测试
async def main():
    """主函数 - 演示 Agent 的使用"""

    print("初始化 AI Agent...")
    print()

    # 创建 Kernel
    kernel = (
        KernelBuilder()
        .with_azure_openai(
            endpoint="https://your-resource.openai.azure.com",
            api_key="your-key",
            deployment_name="gpt-4"
        )
        .build()
    )

    # 创建 Agent
    agent = AssistantAgent(
        kernel,
        config=AgentConfig(name="小助手")
    )

    print()
    print("=" * 50)
    print("Agent 已就绪!开始对话测试")
    print("=" * 50)

    # 测试对话
    test_inputs = [
        "你好,请介绍一下你自己",
        "帮我计算 123 + 456",
        "帮我创建一个明天上午10点的会议,主题是周例会",
        "搜索一下 AI Agent 的最新发展动态"
    ]

    for user_input in test_inputs:
        print()
        print(f"用户: {user_input}")
        print("-" * 40)

        response = await agent.chat(user_input)
        print(f"助手: {response}")

        print()
        print("-" * 40)

if __name__ == "__main__":
    asyncio.run(main())

五、常见使用场景与最佳实践

在实际应用中,Agent Framework 可以解决很多典型问题。下面我们通过几个场景来展示如何最佳实践。

5.1 场景一:构建客服机器人

客服机器人是最常见的应用场景之一。Agent Framework 的插件系统让这变得非常简单。

# scenarios/customer_service.py - 客服机器人实现
from typing import Optional, Dict, List
from enum import Enum
import semantic_kernel as sk

class Intent(Enum):
    """用户意图枚举"""
    GREETING = "greeting"
    PRODUCT_INQUIRY = "product_inquiry"
    ORDER_STATUS = "order_status"
    COMPLAINT = "complaint"
    REFUND = "refund"
    GOODBYE = "goodbye"
    UNKNOWN = "unknown"

class ProductDatabase:
    """产品数据库(模拟)"""

    def __init__(self):
        self._products = {
            "笔记本": {
                "price": "5999",
                "stock": "50",
                "description": "高性能商务笔记本,搭载最新处理器"
            },
            "无线鼠标": {
                "price": "199",
                "stock": "200",
                "description": "人体工学设计,支持蓝牙和USB双模连接"
            },
            "机械键盘": {
                "price": "399",
                "stock": "100",
                "description": "青轴手感,支持RGB背光"
            }
        }

    def search(self, keyword: str) -> Optional[Dict]:
        """搜索产品"""
        for name, info in self._products.items():
            if keyword in name:
                return {"name": name, **info}
        return None

    def get_all(self) -> List[Dict]:
        """获取所有产品"""
        return [{"name": k, **v} for k, v in self._products.items()]

class CustomerServicePlugin:
    """客服插件"""

    def __init__(self):
        self._products = ProductDatabase()
        self._order_db = {}  # 简化订单存储

    @sk_function(
        description="识别用户意图",
        name="identify_intent",
        input_description="用户的消息内容"
    )
    def identify_intent(self, message: str) -> str:
        """识别用户意图"""

        message = message.lower()

        # 意图识别逻辑
        if any(word in message for word in ["你好", "hi", "hello", "嗨"]):
            return Intent.GREETING.value

        if any(word in message for word in ["产品", "商品", "有什么", "卖什么"]):
            return Intent.PRODUCT_INQUIRY.value

        if any(word in message for word in ["订单", "物流", "发货", "到了没"]):
            return Intent.ORDER_STATUS.value

        if any(word in message for word in ["投诉", "不满", "差", "垃圾"]):
            return Intent.COMPLAINT.value

        if any(word in message for word in ["退款", "退货", "取消订单"]):
            return Intent.REFUND.value

        if any(word in message for word in ["再见", "bye", "结束"]):
            return Intent.GOODBYE.value

        return Intent.UNKNOWN.value

    @sk_function(
        description="查询产品信息",
        name="search_product",
        input_description="产品名称或关键词"
    )
    def search_product(self, keyword: str) -> str:
        """搜索产品"""

        if not keyword or keyword in ["全部", "所有", "list"]:
            products = self._products.get_all()
            if not products:
                return "目前没有在售产品"

            lines = ["目前在售的产品:"]
            for p in products:
                lines.append(f"- {p['name']}: ¥{p['price']}")
                lines.append(f"  {p['description']}")
                lines.append("")
            return "\n".join(lines)

        product = self._products.search(keyword)
        if not product:
            return f"未找到「{keyword}」相关产品"

        return (
            f"产品:{product['name']}\n"
            f"价格:¥{product['price']}\n"
            f"库存:{product['stock']}\n"
            f"描述:{product['description']}"
        )

    @sk_function(
        description="查询订单状态",
        name="check_order",
        input_description="订单号"
    )
    def check_order(self, order_id: str) -> str:
        """查询订单状态"""

        # 模拟订单数据
        mock_orders = {
            "ORD001": {"status": "已发货", "express": "SF1234567890"},
            "ORD002": {"status": "处理中", "express": ""},
            "ORD003": {"status": "已完成", "express": ""}
        }

        if order_id in mock_orders:
            order = mock_orders[order_id]
            result = f"订单 {order_id} 状态:{order['status']}"
            if order['express']:
                result += f"\n快递单号:{order['express']}"
            return result

        return f"未找到订单 {order_id},请确认订单号是否正确"

    @sk_function(
        description="提交投诉工单",
        name="submit_complaint",
        input_description="投诉内容"
    )
    def submit_complaint(self, content: str) -> str:
        """提交投诉"""

        # 生成工单号
        ticket_id = f"COMP{len(self._order_db) + 1:04d}"

        return (
            f"您的投诉已受理\n"
            f"工单号:{ticket_id}\n"
            f"我们的客服人员将在24小时内与您联系\n"
            f"感谢您的反馈!"
        )

    @sk_function(
        description="处理退款请求",
        name="process_refund",
        input_description="订单号和退款原因"
    )
    def process_refund(self, request: str) -> str:
        """处理退款"""

        return (
            f"退款申请已提交\n"
            f"订单号将在系统中核实\n"
            f"退款预计在3-5个工作日内原路返回\n"
            f"如有疑问,请联系人工客服"
        )

class CustomerServiceAgent:
    """客服 Agent"""

    def __init__(self, kernel: sk.Kernel):
        self.kernel = kernel
        self.plugin = CustomerServicePlugin()
        self.kernel.import_skill(self.plugin, "customer_service")

        self._system_prompt = """
你是一个专业、友好的在线客服助手。

回复原则:
1. 热情礼貌,用语专业
2. 站在用户角度思考问题
3. 复杂问题及时转人工
4. 禁止承诺无法兑现的事项
5. 涉及金钱、隐私等问题要谨慎处理

根据识别的用户意图,调用相应的函数来处理请求。
"""

    async def handle_message(self, message: str) -> str:
        """处理用户消息"""

        # 识别意图
        intent = self.plugin.identify_intent(message)

        # 根据意图处理
        if intent == Intent.GREETING.value:
            return self._handle_greeting()

        elif intent == Intent.PRODUCT_INQUIRY.value:
            return self._handle_product_inquiry(message)

        elif intent == Intent.ORDER_STATUS.value:
            return self._handle_order_inquiry(message)

        elif intent == Intent.COMPLAINT.value:
            return self._handle_complaint(message)

        elif intent == Intent.REFUND.value:
            return self._handle_refund(message)

        elif intent == Intent.GOODBYE.value:
            return "感谢您的咨询,祝您生活愉快!再见!"

        else:
            return await self._handle_general(message)

    def _handle_greeting(self) -> str:
        """处理问候"""
        return (
            "您好!欢迎光临!😊\n\n"
            "我可以帮您:\n"
            "- 查询产品信息\n"
            "- 查看订单状态\n"
            "- 处理投诉建议\n"
            "- 办理退款退货\n\n"
            "请告诉我您的需求~"
        )

    def _handle_product_inquiry(self, message: str) -> str:
        """处理产品咨询"""
        # 提取产品关键词
        keywords = message.replace("产品", "").replace("商品", "")
        keywords = keywords.replace("有什么", "").replace("卖什么", "")
        keywords = keywords.strip()

        return self.plugin.search_product(keywords)

    def _handle_order_inquiry(self, message: str) -> str:
        """处理订单查询"""
        import re
        # 尝试提取订单号
        match = re.search(r'ORD\d+', message)
        if match:
            order_id = match.group(0)
        else:
            return "请提供您的订单号,格式如:ORD001"

        return self.plugin.check_order(order_id)

    def _handle_complaint(self, message: str) -> str:
        """处理投诉"""
        return self.plugin.submit_complaint(message)

    def _handle_refund(self, message: str) -> str:
        """处理退款"""
        return self.plugin.process_refund(message)

    async def _handle_general(self, message: str) -> str:
        """处理通用问题"""
        return (
            "抱歉,我没能理解您的问题。\n"
            "您可以尝试:\n"
            "- 询问我们的产品\n"
            "- 查询订单状态\n"
            "- 提交投诉建议\n\n"
            "或者直接告诉我您想做什么~"
        )

5.2 场景二:文档分析与处理

Agent Framework 同样适合处理文档相关的任务,比如自动提取信息、生成摘要等。

# scenarios/document_processor.py - 文档处理示例
from typing import Optional, List, Dict
from dataclasses import dataclass
import re

@dataclass
class Document:
    """文档数据类"""
    title: str
    content: str
    metadata: Optional[Dict] = None

@dataclass
class ExtractedInfo:
    """提取的信息"""
    emails: List[str]
    phone_numbers: List[str]
    dates: List[str]
    urls: List[str]
    key_points: List[str]

class DocumentProcessor:
    """文档处理器"""

    def __init__(self):
        self._email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        self._phone_pattern = r'\b1[3-9]\d{9}\b'
        self._date_pattern = r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}日?'
        self._url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'

    @sk_function(
        description="从文档中提取联系信息",
        name="extract_contacts",
        input_description="待处理的文档内容"
    )
    def extract_contacts(self, text: str) -> str:
        """提取联系方式"""

        emails = re.findall(self._email_pattern, text)
        phones = re.findall(self._phone_pattern, text)

        result = []

        if emails:
            result.append("邮箱地址:")
            for email in set(emails):
                result.append(f"  - {email}")

        if phones:
            result.append("电话号码:")
            for phone in set(phones):
                result.append(f"  - {phone}")

        if not result:
            return "未在文档中找到联系信息"

        return "\n".join(result)

    @sk_function(
        description="从文档中提取日期信息",
        name="extract_dates",
        input_description="待处理的文档内容"
    )
    def extract_dates(self, text: str) -> str:
        """提取日期信息"""

        dates = re.findall(self._date_pattern, text)

        if not dates:
            return "未在文档中找到日期信息"

        # 标准化日期格式
        normalized = []
        for date in set(dates):
            normalized_date = date.replace("年", "-").replace("月", "-").replace("日", "")
            normalized.append(normalized_date)

        result = ["找到的日期信息:"]
        for date in normalized:
            result.append(f"  - {date}")

        return "\n".join(result)

    @sk_function(
        description="提取文档中的超链接",
        name="extract_urls",
        input_description="待处理的文档内容"
    )
    def extract_urls(self, text: str) -> str:
        """提取 URL"""

        urls = re.findall(self._url_pattern, text)

        if not urls:
            return "未在文档中找到链接"

        result = ["找到的链接:"]
        for url in set(urls):
            result.append(f"  - {url}")

        return "\n".join(result)

    @sk_function(
        description="提取文档中的关键信息",
        name="extract_key_info",
        input_description="待处理的文档内容"
    )
    def extract_key_info(self, text: str) -> str:
        """
        提取关键信息

        注意:这是一个基于规则的简化版本
        实际应用中可以使用 AI 模型进行更智能的提取
        """

        # 提取所有联系信息
        contacts = self.extract_contacts(text)

        # 提取日期
        dates = self.extract_dates(text)

        # 提取链接
        urls = self.extract_urls(text)

        # 简单统计
        lines = text.split("\n")
        non_empty_lines = [l for l in lines if l.strip()]

        summary = f"""
文档统计:
- 总行数:{len(lines)}
- 非空行数:{len(non_empty_lines)}
- 字符数:{len(text)}

提取的信息:
{contacts}

{dates}

{urls}
"""
        return summary.strip()

    @sk_function(
        description="检测文档语言",
        name="detect_language",
        input_description="待检测的文本内容"
    )
    def detect_language(self, text: str) -> str:
        """
        检测语言

        简化版本:基于中文字符比例判断
        实际应用中可使用专门的语言检测库
        """

        if not text:
            return "无法检测空文档的语言"

        # 统计中文字符
        chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
        total_chars = len(text.strip())

        chinese_ratio = chinese_chars / total_chars if total_chars > 0 else 0

        if chinese_ratio > 0.3:
            return f"中文(简体),置信度约 {int(chinese_ratio * 100)}%"
        else:
            return f"英文或其他语言,中文占比约 {int(chinese_ratio * 100)}%"

5.3 场景三:数据分析和报告生成

这个场景展示了如何构建一个数据分析助手,它可以处理数据并生成结构化的报告。

# scenarios/data_analyst.py - 数据分析助手
from typing import List, Dict, Optional
from dataclasses import dataclass
import statistics

@dataclass
class SalesRecord:
    """销售记录"""
    date: str
    product: str
    quantity: int
    unit_price: float
    region: str

class DataAnalysisPlugin:
    """数据分析插件"""

    def __init__(self):
        # 模拟销售数据
        self._data: List[SalesRecord] = [
            SalesRecord("2024-01-01", "笔记本", 10, 5999, "华东"),
            SalesRecord("2024-01-01", "鼠标", 50, 199, "华北"),
            SalesRecord("2024-01-02", "键盘", 30, 399, "华东"),
            SalesRecord("2024-01-02", "笔记本", 8, 5999, "华南"),
            SalesRecord("2024-01-03", "显示器", 15, 1999, "华北"),
        ]

    @sk_function(
        description="获取销售数据概览",
        name="get_overview",
        input_description="无需输入,填写'概览'即可"
    )
    def get_overview(self, _: str) -> str:
        """获取数据概览"""

        total_revenue = sum(r.quantity * r.unit_price for r in self._data)
        total_orders = len(self._data)
        avg_order_value = total_revenue / total_orders if total_orders > 0 else 0

        return f"""
销售数据概览
========================================
总订单数:{total_orders}
总收入:¥{total_revenue:,.2f}
平均订单金额:¥{avg_order_value:,.2f}
========================================
"""

    @sk_function(
        description="按产品分组统计",
        name="sales_by_product",
        input_description="填写'产品统计'即可"
    )
    def sales_by_product(self, _: str) -> str:
        """按产品统计"""

        product_stats: Dict[str, Dict] = {}

        for record in self._data:
            if record.product not in product_stats:
                product_stats[record.product] = {
                    "quantity": 0,
                    "revenue": 0
                }

            product_stats[record.product]["quantity"] += record.quantity
            product_stats[record.product]["revenue"] += record.quantity * record.unit_price

        lines = ["按产品销售统计", "=" * 40, ""]

        for product, stats in sorted(product_stats.items(), 
                                      key=lambda x: x[1]["revenue"], 
                                      reverse=True):
            lines.append(f"{product}:")
            lines.append(f"  销量:{stats['quantity']}件")
            lines.append(f"  收入:¥{stats['revenue']:,.2f}")
            lines.append("")

        return "\n".join(lines)

    @sk_function(
        description="按地区分组统计",
        name="sales_by_region",
        input_description="填写'地区统计'即可"
    )
    def sales_by_region(self, _: str) -> str:
        """按地区统计"""

        region_stats: Dict[str, Dict] = {}

        for record in self._data:
            if record.region not in region_stats:
                region_stats[record.region] = {
                    "quantity": 0,
                    "revenue": 0
                }

            region_stats[record.region]["quantity"] += record.quantity
            region_stats[record.region]["revenue"] += record.quantity * record.unit_price

        lines = ["按地区销售统计", "=" * 40, ""]

        for region, stats in sorted(region_stats.items(), 
                                     key=lambda x: x[1]["revenue"], 
                                     reverse=True):
            lines.append(f"{region}:")
            lines.append(f"  销量:{stats['quantity']}件")
            lines.append(f"  收入:¥{stats['revenue']:,.2f}")
            lines.append("")

        return "\n".join(lines)

    @sk_function(
        description="查询特定产品的销售",
        name="query_product",
        input_description="产品名称"
    )
    def query_product(self, product_name: str) -> str:
        """查询产品销售情况"""

        matching = [r for r in self._data 
                   if product_name.lower() in r.product.lower()]

        if not matching:
            return f"未找到「{product_name}」的销售记录"

        total_quantity = sum(r.quantity for r in matching)
        total_revenue = sum(r.quantity * r.unit_price for r in matching)

        lines = [f"「{product_name}」销售情况", "=" * 40, ""]
        lines.append(f"销售笔数:{len(matching)}")
        lines.append(f"总销量:{total_quantity}件")
        lines.append(f"总收入:¥{total_revenue:,.2f}")
        lines.append("")
        lines.append("明细:")

        for record in matching:
            revenue = record.quantity * record.unit_price
            lines.append(f"  {record.date} | {record.region} | "
                        f"{record.quantity}件 | ¥{revenue:,.2f}")

        return "\n".join(lines)

    @sk_function(
        description="生成销售报告",
        name="generate_report",
        input_description="报告标题"
    )
    def generate_report(self, title: str) -> str:
        """生成完整销售报告"""

        # 收集数据
        total_revenue = sum(r.quantity * r.unit_price for r in self._data)
        total_orders = len(self._data)

        # 产品排名
        product_stats: Dict[str, float] = {}
        for record in self._data:
            product_stats[record.product] = (
                product_stats.get(record.product, 0) + 
                record.quantity * record.unit_price
            )

        top_products = sorted(product_stats.items(), 
                              key=lambda x: x[1], 
                              reverse=True)[:3]

        # 地区排名
        region_stats: Dict[str, float] = {}
        for record in self._data:
            region_stats[record.region] = (
                region_stats.get(record.region, 0) + 
                record.quantity * record.unit_price
            )

        top_regions = sorted(region_stats.items(), 
                            key=lambda x: x[1], 
                            reverse=True)

        # 生成报告
        report = f"""
{'=' * 50}
{title}
{'=' * 50}

一、整体概览
----------------------------------------
总订单数:{total_orders}
总收入:¥{total_revenue:,.2f}

二、热销产品 TOP 3
----------------------------------------
"""

        for i, (product, revenue) in enumerate(top_products, 1):
            percentage = (revenue / total_revenue * 100) if total_revenue > 0 else 0
            report += f"{i}. {product}: ¥{revenue:,.2f} ({percentage:.1f}%)\n"

        report += "\n三、区域销售分布\n----------------------------------------\n"
        for region, revenue in top_regions:
            percentage = (revenue / total_revenue * 100) if total_revenue > 0 else 0
            report += f"- {region}: ¥{revenue:,.2f} ({percentage:.1f}%)\n"

        report += f"\n{'=' * 50}\n报告生成时间:自动生成\n{'=' * 50}\n"

        return report

六、高级技巧与最佳实践

掌握这些高级技巧可以让你的 Agent 开发更加得心应手。

6.1 异步编程模式

Agent Framework 全面支持异步操作,这在处理 I/O 密集型任务时至关重要。

# advanced/async_patterns.py - 异步编程模式
import asyncio
from typing import List, Callable, Any
import semantic_kernel as sk

class AsyncAgentPatterns:
    """异步 Agent 模式示例"""

    def __init__(self, kernel: sk.Kernel):
        self.kernel = kernel

    async def process_batch(self, items: List[str], 
                           processor: Callable) -> List[str]:
        """
        批量处理任务

        使用信号量控制并发数量,避免 API 限流
        """

        semaphore = asyncio.Semaphore(3)  # 最多同时3个任务

        async def limited_process(item: str) -> str:
            async with semaphore:
                return await processor(item)

        tasks = [limited_process(item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 处理可能的异常
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(f"处理失败: {str(result)}")
            else:
                processed_results.append(result)

        return processed_results

    async def stream_response(self, prompt: str):
        """
        流式响应处理

        适用于需要实时显示 AI 回复的场景
        """

        service = self.kernel.get_service()

        full_text = []

        async for chunk in service.complete_stream_async(prompt):
            full_text.append(chunk)
            yield chunk

        return "".join(full_text)

    async def with_timeout(self, coro, timeout: float = 30.0) -> Any:
        """
        带超时的异步调用

        防止无限等待
        """

        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            return f"操作超时({timeout}秒)"

    async def with_retry(
        self, 
        coro, 
        max_retries: int = 3, 
        delay: float = 1.0
    ) -> Any:
        """
        带重试的异步调用

        指数退避策略
        """

        last_exception = None

        for attempt in range(max_retries):
            try:
                return await coro
            except Exception as e:
                last_exception = e

                if attempt < max_retries - 1:
                    wait_time = delay * (2 ** attempt)
                    await asyncio.sleep(wait_time)
                    print(f"重试 {attempt + 1}/{max_retries},等待 {wait_time}秒...")

        raise last_exception

# 使用示例
async def main():
    kernel = sk.Kernel()
    patterns = AsyncAgentPatterns(kernel)

    # 批量处理示例
    items = ["任务1", "任务2", "任务3", "任务4", "任务5"]

    async def process_item(item: str) -> str:
        await asyncio.sleep(1)  # 模拟处理
        return f"已处理: {item}"

    print("开始批量处理...")
    results = await patterns.process_batch(items, process_item)

    for result in results:
        print(result)

6.2 错误处理与日志记录

健壮的错误处理是生产级应用的基础。

# advanced/error_handling.py - 错误处理与日志
import logging
from typing import Optional, Callable, Any
from functools import wraps
import semantic_kernel as sk

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("AgentFramework")

class AgentError(Exception):
    """Agent 基础异常"""
    pass

class PluginError(AgentError):
    """插件执行错误"""
    pass

class PlanExecutionError(AgentError):
    """计划执行错误"""
    pass

class ErrorHandler:
    """错误处理器"""

    def __init__(self):
        self._error_counts: dict[str, int] = {}

    def record_error(self, error_type: str, error: Exception):
        """记录错误"""

        self._error_counts[error_type] = self._error_counts.get(error_type, 0) + 1

        logger.error(
            f"错误类型: {error_type}\n"
            f"错误信息: {str(error)}\n"
            f"累计次数: {self._error_counts[error_type]}"
        )

    def get_error_stats(self) -> dict[str, int]:
        """获取错误统计"""
        return self._error_counts.copy()

    def should_retry(self, error_type: str, max_retries: int = 3) -> bool:
        """判断是否应该重试"""
        return self._error_counts.get(error_type, 0) < max_retries

def with_error_handling(func: Callable) -> Callable:
    """
    错误处理装饰器

    自动捕获异常并记录日志
    """

    @wraps(func)
    async def async_wrapper(*args, **kwargs) -> Any:
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.error(f"执行 {func.__name__} 时出错: {e}")
            raise

    @wraps(func)
    def sync_wrapper(*args, **kwargs) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"执行 {func.__name__} 时出错: {e}")
            raise

    import asyncio
    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    else:
        return sync_wrapper

class SafeAgent:
    """带错误处理的 Agent"""

    def __init__(self, kernel: sk.Kernel):
        self.kernel = kernel
        self.error_handler = ErrorHandler()

    @with_error_handling
    async def execute_safely(self, task: str) -> str:
        """
        安全执行任务

        捕获所有异常,返回友好的错误信息
        """

        try:
            # 尝试执行
            result = await self._execute_task(task)
            return result

        except PluginError as e:
            self.error_handler.record_error("plugin", e)
            return f"插件执行出错,请稍后重试"

        except PlanExecutionError as e:
            self.error_handler.record_error("plan", e)
            return f"任务规划失败,尝试简化请求"

        except Exception as e:
            self.error_handler.record_error("unknown", e)
            logger.exception("未知错误")
            return "发生未知错误,请联系管理员"

    async def _execute_task(self, task: str) -> str:
        """实际执行任务的内部方法"""

        # 这里是实际的任务执行逻辑
        pass

    def get_health_status(self) -> dict:
        """获取 Agent 健康状态"""

        stats = self.error_handler.get_error_stats()

        return {
            "status": "healthy" if sum(stats.values()) == 0 else "degraded",
            "total_errors": sum(stats.values()),
            "error_breakdown": stats
        }

6.3 性能优化建议

针对大规模部署场景的性能优化技巧。

# advanced/performance.py - 性能优化
import time
from functools import wraps
from typing import Optional, Dict, Any
import semantic_kernel as sk

class PerformanceMonitor:
    """性能监控器"""

    def __init__(self):
        self._metrics: Dict[str, list[float]] = {}

    def record(self, operation: str, duration: float):
        """记录操作耗时"""
        if operation not in self._metrics:
            self._metrics[operation] = []
        self._metrics[operation].append(duration)

    def get_stats(self, operation: str) -> Optional[Dict[str, float]]:
        """获取操作统计"""

        if operation not in self._metrics:
            return None

        durations = self._metrics[operation]

        return {
            "count": len(durations),
            "total": sum(durations),
            "avg": sum(durations) / len(durations),
            "min": min(durations),
            "max": max(durations)
        }

    def get_all_stats(self) -> Dict[str, Dict[str, float]]:
        """获取所有统计"""
        return {op: self.get_stats(op) for op in self._metrics.keys()}

def timed(func: Callable) -> Callable:
    """计时装饰器"""

    @wraps(func)
    async def async_wrapper(*args, **kwargs) -> Any:
        start = time.time()
        try:
            return await func(*args, **kwargs)
        finally:
            duration = time.time() - start
            # 可以在这里记录到监控器
            print(f"{func.__name__} 耗时: {duration:.3f}s")

    @wraps(func)
    def sync_wrapper(*args, **kwargs) -> Any:
        start = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            duration = time.time() - start
            print(f"{func.__name__} 耗时: {duration:.3f}s")

    import asyncio
    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    else:
        return sync_wrapper

class CachedKernel:
    """
    带缓存的 Kernel 包装器

    避免重复创建 Kernel 实例
    """

    _instance: Optional[sk.Kernel] = None
    _last_created: float = 0
    _ttl: float = 3600  # 1小时过期

    @classmethod
    def get_instance(cls, recreate: bool = False) -> sk.Kernel:
        """
        获取单例 Kernel

        Args:
            recreate: 是否强制重新创建
        """

        now = time.time()

        if cls._instance is None or recreate or (now - cls._last_created) > cls._ttl:
            cls._instance = sk.Kernel()
            cls._last_created = now

        return cls._instance

class PromptTemplateCache:
    """Prompt 模板缓存"""

    def __init__(self, max_size: int = 100):
        self._cache: Dict[str, Any] = {}
        self._max_size = max_size
        self._access_count: Dict[str, int] = {}

    def get(self, key: str) -> Optional[Any]:
        """获取缓存"""

        if key in self._cache:
            self._access_count[key] = self._access_count.get(key, 0) + 1
            return self._cache[key]

        return None

    def set(self, key: str, value: Any):
        """设置缓存"""

        # 如果缓存已满,删除最少使用的项
        if len(self._cache) >= self._max_size:
            min_key = min(self._access_count, key=self._access_count.get)
            del self._cache[min_key]
            del self._access_count[min_key]

        self._cache[key] = value
        self._access_count[key] = 0

    def clear(self):
        """清空缓存"""
        self._cache.clear()
        self._access_count.clear()

七、总结与资源链接

微软 Agent Framework 代表了 AI 应用开发的新范式。通过将复杂的 AI 能力封装为可组合的插件,它让开发者能够专注于业务逻辑而非底层实现。无论是构建智能客服、数据分析工具,还是更复杂的 AI Agent 系统,这个框架都提供了优雅且高效的解决方案。

框架的核心优势总结

首先,统一的抽象层大大简化了 AI 应用的开发复杂度。Kernel、Plugins、Memory 三个核心概念相互配合,形成了一个完整的开发体系。其次,对多 AI 服务的支持让你可以在同一个应用中灵活切换不同的 AI 提供商,无论是 Azure OpenAI、OpenAI 还是其他服务。最后,强大的规划能力让复杂任务的自动化处理成为可能,这正是 AI Agent 的核心价值所在。

推荐学习路径

对于初学者,建议从本教程的基础部分开始,首先理解 Kernel 和 Plugins 的概念,然后逐步尝试构建自己的第一个 AI 应用。对于有经验的开发者,可以深入研究规划器(Planner)和记忆系统的实现,这些高级特性能够解锁更多应用场景。对于企业用户,推荐关注框架的安全特性、监控能力和生产环境部署的最佳实践。

相关资源链接

Semantic Kernel 官方文档提供了完整的 API 参考和更多示例代码。微软 Learn 上的 AI Engineer 路径包含系统化的学习资源。GitHub 社区中有大量的开源插件和工具可以参考。Azure AI Studio 提供了云端开发和部署的完整工具链。

展望未来

随着大语言模型能力的不断提升,Agent Framework 也在持续演进。未来版本中将支持更多智能规划能力、更好的多模态支持,以及更丰富的企业级特性。微软正在积极开发新一代的 Autonomous Agents 功能,这将让 AI Agent 能够自主完成更复杂的任务。

掌握 Agent Framework 不仅能帮助你构建当下最先进的 AI 应用,更能为迎接 AI 技术下一波浪潮做好准备。希望这份教程能成为你探索 AI Agent 开发之旅的良好起点。祝你编码愉快!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注