# MCP Server 开发指南:从协议原理到生产级实现
前言
Model Context Protocol(MCP)正在成为 AI 工具生态的核心接口标准。它由 Anthropic 在 2024 年末提出,旨在解决一个根本问题:每个 AI 应用和每个数据源之间不应该需要单独编写适配代码。一套协议、无数工具、即插即用。
MCP 目前的生态已经相当丰富:官方提供文件系统、GitHub、Slack、Puppeteer 等十余种预置服务器,社区更是贡献了几百个覆盖各领域的实现。然而,当现成服务器无法满足需求时,如何自己动手开发一个?本文将深入 MCP 协议的设计细节,详细讲解如何使用 Python SDK 从零构建一个生产级的 MCP Server,涵盖 stdio 和 HTTP 两种传输方式、完整的错误处理、以及调试排障技巧。
一、协议架构解析
1.1 MCP 在 AI 工具链中的定位
传统 AI 工具集成采用点对点模式:LangChain 有自己的 Tool 接口、OpenAI 有 Function Calling、Claude 有 Tool Use。每个平台各有一套,互不兼容。当你想让同一个工具同时服务于多个 AI 平台时,需要为每个平台单独编写适配层。
MCP 试图成为这个问题的终结者。它的设计目标单一而明确:为 AI 模型与外部工具之间提供一个统一的双向通信协议。无论你用 Claude、GPT 还是 Gemini,只要它们支持 MCP,就能直接调用任何 MCP Server 暴露的工具,而不需要额外的桥接代码。
传统模式:
Claude Tool Use → 自定义 JSON Schema → 你的 API
GPT Function Calling → 不同格式 → 你的 API
Gemini Tools → 又一种格式 → 你的 API
❌ 每个 AI 平台都要单独适配
MCP 模式:
MCP Server → MCP 协议 ← Claude / GPT / Gemini / 任何兼容 AI
✅ 一次实现,所有 AI 平台通用
1.2 三层结构
MCP 协议分为三层,每层职责清晰:
传输层(Transport Layer):负责客户端与服务端之间的实际数据传递。目前支持两种传输方式:
– Stdio Transport:通过标准输入/输出进行进程间通信。AI 应用启动 MCP Server 作为子进程,通过 stdin 发送请求、stdout 接收响应。适用于本地工具集成,延迟最低。
– HTTP/SSE Transport:通过 HTTP REST 接口通信,支持 Server-Sent Events(SSE)进行服务端推送。适用于远程工具服务,可以跨网络调用。
JSON-RPC 层(Message Layer):所有 MCP 消息都遵循 JSON-RPC 2.0 规范。请求有 id、method、params;响应有 id、result 或 error。这使得协议完全无状态,易于调试和代理。
// MCP 请求示例
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "filesystem_read_file",
"arguments": {"path": "/tmp/test.txt"}
}
}
// MCP 响应示例
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{"type": "text", "text": "Hello, MCP!"}
]
}
}
应用层(Capability Layer):MCP 定义了四类核心能力,Server 可以声明自己支持哪些:
| 能力 | 描述 | 典型用途 |
roots |
文件系统根目录告知 | 限定工具可访问的目录范围 |
sampling |
服务端主动请求 LLM | 让工具调用触发 AI 推理 |
resources |
暴露可读取的数据 | 数据库内容、API 响应、文件 |
tools |
AI 可调用的函数 | 执行代码、查询数据、操作文件 |
prompts |
预定义的提示模板 | 封装复杂的多轮对话模式 |
1.3 生命周期与握手流程
MCP 客户端与服务端建立连接后,首先交换初始化握手:
客户端 → 发送 initialize 请求(包含客户端能力)
服务端 → 响应 initialized 确认(包含服务端能力)
客户端 → 发送 initialized 通知(握手完成)
双方 → 开始正常的请求/响应循环
这个设计保证了能力协商的原子性:双方在开始通信前就知道对方支持哪些功能,不会在运行时因为能力不匹配而出现静默失败。
二、Python SDK 核心 API
2.1 SDK 概览
MCP 官方提供 Python 和 TypeScript 两种 SDK。Python SDK 通过 mcp 包提供,核心类是 Server——所有 MCP Server 都是它的子类或实例。
# 安装 MCP Python SDK
pip install mcp
SDK 的设计哲学是声明式的:你通过装饰器和配置声明 Server 的能力,SDK 负责处理协议序列化和传输逻辑,你只需要专注于业务实现。
2.2 Server 的创建与配置
from mcp.server import Server
from mcp.types import Tool, Resource
# 创建 Server 实例,name 必须与传输配置中的名称一致
app = Server("my-filesystem-server")
# Server 可以声明自己的能力(可选,SDK 会自动推断)
async def list_tools():
return [
Tool(
name="read_file",
description="读取文件内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
)
]
2.3 工具(Tools)的定义与注册
Tools 是 MCP Server 最核心的能力。它们让 AI 模型能够执行实际操作。
from mcp.server import Server
from mcp.types import Tool, CallToolResult, TextContent
from mcp.server.stdio import stdio_server
import asyncio
app = Server("weather-server")
@app.list_tools()
async def list_tools():
"""声明服务器提供的所有工具"""
return [
Tool(
name="get_weather",
description="查询指定城市的实时天气",
inputSchema={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称(中文或英文)"
},
"units": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位",
"default": "celsius"
}
},
"required": ["city"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> CallToolResult:
"""处理工具调用请求"""
if name == "get_weather":
city = arguments["city"]
units = arguments.get("units", "celsius")
# 调用天气 API(实际项目中替换为真实 API)
temperature = await fetch_weather(city, units)
return CallToolResult(
content=[TextContent(type="text", text=f"{city} 当前温度: {temperature}°")]
)
else:
raise ValueError(f"Unknown tool: {name}")
inputSchema 字段遵循 JSON Schema 规范,这是 MCP 能够进行参数验证和 AI 自动补全的关键。Schema 定义得越精确,AI 调用时的类型安全性越高。
2.4 资源(Resources)的暴露
Resources 允许 Server 主动暴露数据供 AI 读取,与 Tools 的主动调用形成互补:
from mcp.types import Resource
@app.list_resources()
async def list_resources():
return [
Resource(
uri="weather://cities",
name="支持的城市列表",
description="所有支持实时天气的城市",
mimeType="application/json"
),
Resource(
uri="weather://api-status",
name="API 状态",
description="天气 API 的当前可用性状态",
mimeType="text/plain"
)
]
@app.read_resource()
async def read_resource(uri: str) -> str:
if uri == "weather://cities":
return json.dumps(["北京", "上海", "深圳", "广州", "杭州"])
elif uri == "weather://api-status":
status = await check_api_status()
return f"API Status: {status}"
else:
raise ValueError(f"Unknown resource: {uri}")
Resources 的一个重要特性是 AI 可以在需要时主动读取,而不需要通过 Tool 调用。这对于数据库内容查询、配置读取等只读操作特别有用。
三、从零构建:完整 MCP Server 示例
3.1 项目结构
我们构建一个支持文件搜索和内容读取的文件系统工具 Server,用于演示完整开发流程:
mcp-file-tools/
├── pyproject.toml
├── src/
│ └── file_tools/
│ ├── __init__.py
│ └── server.py
└── README.md
3.2 项目配置(pyproject.toml)
[project]
name = "mcp-file-tools"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
"mcp>=1.0.0",
"aiofiles>=23.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"httpx>=0.27.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.mcp]
# MCP SDK 配置,可被 MCP 客户端识别
name = "file-tools"
version = "0.1.0"
3.3 核心实现
# src/file_tools/server.py
"""
MCP File Tools Server — 支持文件搜索和内容读取的示例 MCP Server
"""
from __future__ import annotations
import os
import asyncio
import aiofiles
from pathlib import Path
from typing import Any
from mcp.server import Server
from mcp.types import (
Tool,
Resource,
CallToolResult,
TextContent,
ListToolsResult,
ListResourcesResult,
ReadResourceResult,
)
from mcp.server.stdio import stdio_server
# 创建 Server 实例
app = Server("file-tools")
# ==================== Tools 实现 ====================
@app.list_tools()
async def list_tools() -> ListToolsResult:
"""声明可用的工具"""
return [
Tool(
name="search_files",
description="在指定目录下递归搜索包含关键词的文件",
inputSchema={
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "搜索的根目录路径"
},
"pattern": {
"type": "string",
"description": "文件名匹配模式(如 .py, .md)"
},
"content_match": {
"type": "string",
"description": "文件内容必须包含的关键词(可选)"
},
"max_results": {
"type": "integer",
"description": "最多返回多少个文件",
"default": 20
}
},
"required": ["directory", "pattern"]
}
),
Tool(
name="read_file",
description="安全地读取文件内容(限制文件大小和访问路径)",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "要读取的文件路径"
},
"max_size": {
"type": "integer",
"description": "最大读取字节数",
"default": 1048576 # 1MB
},
"line_start": {
"type": "integer",
"description": "从第几行开始(1-indexed)",
"default": 1
},
"line_end": {
"type": "integer",
"description": "到第几行结束"
}
},
"required": ["path"]
}
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> CallToolResult:
"""处理工具调用"""
if name == "search_files":
return await _search_files(**arguments)
elif name == "read_file":
return await _read_file(**arguments)
else:
return CallToolResult(
content=[TextContent(type="text", text=f"Unknown tool: {name}")],
isError=True
)
async def _search_files(
directory: str,
pattern: str,
content_match: str | None = None,
max_results: int = 20,
) -> CallToolResult:
"""搜索文件的核心实现"""
# 安全检查:防止路径遍历
base_path = Path(directory).resolve()
if not base_path.exists() or not base_path.is_dir():
return CallToolResult(
content=[TextContent(type="text", text=f"目录不存在或不是有效目录: {directory}")],
isError=True
)
# 限制搜索深度,防止恶意扫描
max_depth = 5
results = []
pattern_glob = pattern # 直接作为 glob pattern 使用
try:
for path in base_path.rglob(pattern_glob):
if len(results) >= max_results:
break
# 检查深度
depth = len(path.relative_to(base_path).parts)
if depth > max_depth:
continue
# 内容匹配检查
if content_match:
try:
async with aiofiles.open(path, 'r', encoding='utf-8', errors='ignore') as f:
content = await f.read(10240) # 只读前 10KB
if content_match.lower() not in content.lower():
continue
except PermissionError:
continue
results.append(str(path))
except PermissionError:
return CallToolResult(
content=[TextContent(type="text", text=f"权限不足,无法访问目录: {directory}")],
isError=True
)
if not results:
return CallToolResult(
content=[TextContent(type="text", text=f"未找到匹配 {pattern} 的文件")]
)
output = f"找到 {len(results)} 个匹配文件:\n\n"
output += "\n".join(f"- {p}" for p in results)
return CallToolResult(content=[TextContent(type="text", text=output)])
async def _read_file(
path: str,
max_size: int = 1048576,
line_start: int = 1,
line_end: int | None = None,
) -> CallToolResult:
"""读取文件的核心实现"""
file_path = Path(path).resolve()
# 安全检查:防止路径遍历
if not file_path.exists():
return CallToolResult(
content=[TextContent(type="text", text=f"文件不存在: {path}")],
isError=True
)
if not file_path.is_file():
return CallToolResult(
content=[TextContent(type="text", text=f"路径不是文件: {path}")],
isError=True
)
# 文件大小检查
file_size = file_path.stat().st_size
if file_size > max_size:
return CallToolResult(
content=[TextContent(
type="text",
text=f"文件大小 ({file_size} bytes) 超过限制 ({max_size} bytes)"
)],
isError=True
)
try:
async with aiofiles.open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = await f.read()
lines = content.split('\n')
if line_end:
selected = lines[line_start - 1:line_end]
else:
selected = lines[line_start - 1:]
selected_content = '\n'.join(selected)
preview = selected_content[:5000] # 限制返回大小
if len(selected_content) > 5000:
preview += f"\n... (内容过长,已截断前 5000 字符)"
return CallToolResult(
content=[TextContent(
type="text",
text=f"文件: {path}\n行数: {len(lines)}\n---\n{preview}"
)]
)
except PermissionError:
return CallToolResult(
content=[TextContent(type="text", text=f"权限不足,无法读取文件: {path}")],
isError=True
)
# ==================== 主入口 ====================
async def main():
"""启动 stdio 传输的 MCP Server"""
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
3.4 运行与测试
# 直接运行(stdio 模式,输出到 stdout)
python -m file_tools.server
# 使用 mcp CLI 测试
# 首先安装 MCP CLI
npm install -g @modelcontextprotocol/cli
# 测试工具列表
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}' | python -m file_tools.server
# 测试工具调用
echo '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_file","arguments":{"path":"/tmp/test.txt"}}}' | python -m file_tools.server
3.5 客户端调用示例
在其他 MCP 客户端(如 Claude Desktop、Cursor、Cline)中,配置 mcp_servers:
# Claude Desktop 或其他 MCP 客户端配置
mcp_servers:
file-tools:
command: "python"
args: ["-m", "file_tools.server"]
env:
# 可以传递环境变量
FILE_TOOLS_ROOT: "/home/user/projects"
四、HTTP/SSE 传输方式
4.1 适用场景
Stdio 传输非常适合本地集成,但当你需要跨机器调用、或者希望多个 AI 应用共享同一个 Server 实例时,HTTP/SSE 传输是更好的选择。
HTTP 传输的核心区别:
– Server 作为长期运行的网络服务启动
– 客户端通过 HTTP POST 发送请求
– 服务端通过 Server-Sent Events(SSE)推送响应
– 支持身份认证和流量控制
4.2 实现 HTTP 传输
# src/file_tools/http_server.py
from mcp.server import Server
from mcp.server.fastmcp import FastMCP
from mcp.server.http import HttpServer
import asyncio
# FastMCP 是 Server 的高级封装,简化了常见模式
mcp = FastMCP("file-tools-http")
@mcp.tool()
def read_file(path: str, max_size: int = 1048576) -> str:
"""通过 HTTP 暴露的工具"""
with open(path, 'r') as f:
return f.read(max_size)
async def main():
# 在指定端口启动 HTTP 服务
server = HttpServer(
app=mcp,
host="0.0.0.0",
port=8080,
debug=True,
)
await server.run()
if __name__ == "__main__":
asyncio.run(main())
4.3 带身份验证的配置
from mcp.server import Server
from mcp.server.auth import BearerAuth
async def main():
auth = BearerAuth(
validator=lambda token: token == "your-secret-token"
)
server = HttpServer(
app=app,
host="0.0.0.0",
port=8080,
auth=auth,
)
await server.run()
客户端调用时需要携带 Bearer Token:
curl -X POST http://localhost:8080/mcp \
-H "Authorization: Bearer your-secret-token" \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{...}}'
五、生产级最佳实践
5.1 错误处理策略
MCP Server 中的错误处理有三个原则:
1. 所有工具函数必须 try/catch
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> CallToolResult:
try:
if name == "sensitive_operation":
return await _sensitive_operation(**arguments)
raise ValueError(f"Unknown tool: {name}")
except PermissionError:
return CallToolResult(
content=[TextContent(type="text", text="权限不足")],
isError=True
)
except TimeoutError:
return CallToolResult(
content=[TextContent(type="text", text="操作超时")],
isError=True
)
except Exception as e:
# 生产环境不要暴露内部错误细节
return CallToolResult(
content=[TextContent(type="text", text="操作失败,请重试")],
isError=True
)
2. Schema 验证优先于运行时检查
在 inputSchema 中严格定义参数类型和范围,让 AI 在调用前就能发现参数问题,而不是等到执行时才失败。
3. 超时控制
import asyncio
async def call_with_timeout(coro, timeout_seconds: float = 30):
try:
return await asyncio.wait_for(coro, timeout=timeout_seconds)
except asyncio.TimeoutError:
raise TimeoutError(f"操作超过 {timeout_seconds} 秒限制")
5.2 安全防护
路径遍历防护(必须):
from pathlib import Path
def safe_path(base_dir: str, user_path: str) -> Path:
"""确保返回的路径在 base_dir 内,防止路径遍历攻击"""
base = Path(base_dir).resolve()
target = (base / user_path).resolve()
# 验证 target 在 base 的子树下
try:
target.relative_to(base)
except ValueError:
raise PermissionError(f"访问被拒绝: {user_path} 超出允许范围")
return target
输入大小限制:
MAX_STRING_LENGTH = 100_000 # 100KB
MAX_ARRAY_LENGTH = 1000
def validate_input(data: Any, path: str = "root"):
if isinstance(data, str) and len(data) > MAX_STRING_LENGTH:
raise ValueError(f"{path}: 字符串长度超过限制")
elif isinstance(data, list) and len(data) > MAX_ARRAY_LENGTH:
raise ValueError(f"{path}: 数组长度超过限制")
elif isinstance(data, dict):
for k, v in data.items():
validate_input(v, f"{path}.{k}")
5.3 日志与可观测性
import logging
import structlog
# 结构化日志配置
logging.basicConfig(level=logging.INFO)
logger = structlog.get_logger()
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> CallToolResult:
logger.info("tool_called", tool=name, args_keys=list(arguments.keys()))
start = asyncio.get_event_loop().time()
try:
result = await _execute_tool(name, arguments)
elapsed = asyncio.get_event_loop().time() - start
logger.info("tool_completed", tool=name, elapsed_ms=round(elapsed * 1000, 2))
return result
except Exception as e:
elapsed = asyncio.get_event_loop().time() - start
logger.error("tool_failed", tool=name, error=str(e), elapsed_ms=round(elapsed * 1000, 2))
raise
5.4 性能优化
异步文件 I/O:使用 aiofiles 而非同步 open(),避免阻塞事件循环。
批量操作:如果工具需要处理大量数据,支持分页返回:
@mcp.tool()
def search_files(directory: str, pattern: str, page: int = 1, page_size: int = 50):
all_results = perform_search(directory, pattern)
total = len(all_results)
start = (page - 1) * page_size
end = start + page_size
page_results = all_results[start:end]
return {
"results": page_results,
"total": total,
"page": page,
"page_size": page_size,
"has_more": end < total
}
六、调试技巧
6.1 使用 MCP Inspector
MCP 官方提供了 mcp inspector,可以交互式地测试 Server:
# 安装并启动 inspector
npx @modelcontextprotocol/inspector python -m file_tools.server
# 浏览器会自动打开,提供图形界面测试所有工具
6.2 手动协议调试
如果 inspector 不可用,可以直接手动发送 JSON-RPC 消息测试:
# 列出所有工具
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}' \
python -m file_tools.server 2>/dev/null jq .
# 调用工具
echo '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_file","arguments":{"path":"README.md"}}}' \
python -m file_tools.server 2>/dev/null jq .
6.3 常见错误排查
| 错误现象 | 原因 | 解决方案 |
Method not found |
装饰器未正确注册 | 检查 @app.list_tools() 装饰器是否在 app.run() 之前执行 |
| 参数类型错误 | inputSchema 与实际参数不匹配 |
验证 JSON Schema 中的 type 是否与 Python 类型对应 |
| Stdio 无响应 | Server 启动时向 stderr 写入了非 JSON 内容 | 确保所有调试输出到 stderr 或日志文件 |
| HTTP 429 | 请求频率超限 | 实现请求节流(rate limiting) |
| 文件读取失败 | 权限不足或路径不存在 | 添加更详细的错误日志,验证文件路径 |
6.4 集成测试框架
# tests/test_server.py
import pytest
from mcp.server.testing import create_test_client
from file_tools.server import app
@pytest.mark.asyncio
async def test_list_tools():
async with create_test_client(app) as client:
result = await client.list_tools()
assert any(t.name == "read_file" for t in result.tools)
assert any(t.name == "search_files" for t in result.tools)
@pytest.mark.asyncio
async def test_read_file_success():
async with create_test_client(app) as client:
result = await client.call_tool("read_file", {"path": "README.md"})
assert not result.isError
assert "README" in result.content[0].text
@pytest.mark.asyncio
async def test_path_traversal_blocked():
async with create_test_client(app) as client:
result = await client.call_tool("read_file", {"path": "/etc/passwd"})
assert result.isError
七、快速启动模板
如果你想快速搭建一个新 MCP Server,可以基于以下模板:
# mcp_template.py — 最小可用 MCP Server
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, CallToolResult, TextContent
import asyncio
app = Server("my-server")
@app.list_tools()
async def list_tools():
return [
Tool(
name="my_tool",
description="我的第一个 MCP 工具",
inputSchema={
"type": "object",
"properties": {
"input": {"type": "string", "description": "输入内容"}
},
"required": ["input"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "my_tool":
return CallToolResult(
content=[TextContent(type="text", text=f"收到: {arguments['input']}")]
)
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read, write):
await app.run(read, write, app.create_initialization_options())
if __name__ == "__main__":
asyncio.run(main())
运行测试:
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}' | python mcp_template.py
总结
MCP 协议的设计简洁而强大,Python SDK 的抽象程度恰到好处——既隐藏了协议序列化的繁琐细节,又没有过度封装导致调试困难。开发一个基础 MCP Server 的核心工作量在 2-3 小时,而一旦完成,所有支持 MCP 的 AI 平台都能立即使用。
生产级 MCP Server 需要关注三个核心维度:正确性(Schema 验证、错误处理、路径安全)、可观测性(结构化日志、调用追踪)、性能(异步 I/O、超时控制)。在这些基础上,根据具体业务场景添加缓存、认证、限流等进阶能力。
MCP 的生态正在快速发展。如果你遇到无法满足的特殊需求,完全可以自己实现一个 Server——Python SDK 的学习曲线平缓,协议文档清晰,一个周末就能从零到生产可用。
评论区