n8n+MCP强强联合:让AI助手真正掌控你的自动化工作流
你是否曾为这样的场景感到困扰:想要让AI助手帮你完成复杂的工作流程,但AI只能回答问题,无法真正执行操作?想要把AI能力嵌入到自己的自动化流程中,却发现接口对接困难重重?
今天要介绍的这个开源项目——czlonkowski/n8n-mcp,正是为解决这些痛点而生。它将n8n强大的工作流自动化能力与MCP(Model Context Protocol)完美结合,让AI模型不再只是”纸上谈兵”的助手,而是能够真正操控自动化流程的执行者。
在本文中,我将带你从零开始,全面掌握这个强大工具的使用方法。
为什么值得关注
传统自动化工具的局限性
市面上的自动化工具种类繁多,从IFTTT、Zapier到n8n,每一款都有其独特优势。然而,它们普遍存在一个共同问题:AI能力的缺失或不足。
传统的自动化流程是这样的:触发条件 → 执行预设动作 → 结束。整个过程是静态的、预设的,无法根据上下文智能决策。
举例来说,当你设置一个”收到邮件 → 自动回复”的流程时,回复内容必须是预先编写好的模板。但现实情况是,每封邮件的内容各不相同,用同一个模板回复往往显得生硬甚至适得其反。
n8n-MCP带来的变革
n8n-mcp项目正是为了打破这一局限而诞生的。它的核心价值在于:
1. 赋予AI执行能力
通过MCP协议,AI模型可以直接与n8n工作流交互。AI不再只是给出建议,而是能够:
- 查询当前工作流的执行状态
- 触发特定工作流的执行
- 监控工作流运行结果
- 动态调整工作流参数
2. 双向通信机制
传统的API调用是单向的——你发送请求,服务器返回结果。而n8n-mcp实现了真正的双向通信:
- AI可以向n8n发送指令
- n8n可以向AI反馈执行结果
- AI可以根据反馈做出下一步决策
3. 降低开发门槛
项目提供了简洁的接口和丰富的示例,即使你不是专业开发者,也能快速上手。你不需要深入了解底层实现,只需要关注如何利用这一能力解决实际问题。
实际应用场景
想象一下这样的场景:
你的AI助手收到用户咨询:”帮我查询昨天的销售数据,然后用邮件发给市场部同事。”
在传统的实现中,这需要:
- 手动调用销售数据API
- 手动格式化数据
- 手动调用邮件发送API
而在n8n-mcp的加持下,AI可以:
- 调用n8n中预设的”查询销售数据”工作流
- 获取数据后,调用”发送邮件”工作流
- 整个过程自动完成,无需人工干预
这就是n8n-mcp的魅力所在——让AI从”建议者”真正变成”执行者”。
环境搭建
前置要求
在开始之前,你需要确保具备以下环境:
基础环境:
- Node.js 18.0 或更高版本
- npm 或 yarn 包管理器
- 基本的命令行操作能力
推荐环境:
- Docker(用于运行n8n)
- 基础的JavaScript/TypeScript知识
- 对API概念有基本理解
安装步骤
第一步:安装n8n
n8n是整个系统的核心工作流引擎。你可以通过多种方式安装n8n:
方式一:使用Docker(推荐)
# 创建n8n数据存储目录
mkdir -p ~/.n8n
# 启动n8n容器
docker run -d \
--name n8n \
-p 5678:5678 \
-v ~/.n8n:/home/node/.n8n \
n8nio/n8n
容器启动后,打开浏览器访问 http://localhost:5678,你将看到n8n的工作界面。
方式二:全局安装n8n
# 通过npm全局安装
npm install n8n -g
# 启动n8n
n8n start
第二步:安装n8n-mcp
项目提供了两种安装方式,你可以根据需求选择。
方式一:从源码安装
# 克隆仓库
git clone https://github.com/czlonkowski/n8n-mcp.git
# 进入项目目录
cd n8n-mcp
# 安装依赖
npm install
# 构建项目
npm run build
方式二:通过npm包安装
# 在你的项目中安装
npm install n8n-mcp
第三步:配置环境变量
项目需要配置一些环境变量才能正常工作。创建一个 .env 文件:
# n8n服务器地址
N8N_BASE_URL=http://localhost:5678
# n8n API访问令牌
N8N_API_KEY=your_n8n_api_key_here
# MCP服务器端口
MCP_SERVER_PORT=3000
# 日志级别(可选)
LOG_LEVEL=info
如何获取n8n API密钥:
- 登录n8n界面
- 点击右上角的用户头像
- 选择”Settings”
- 找到”API”选项卡
- 点击”Create API Key”生成新的密钥
- 复制生成的密钥并保存到
.env文件中
第四步:启动MCP服务器
配置完成后,启动MCP服务器:
# 开发模式(带热重载)
npm run dev
# 生产模式
npm start
服务器启动后,你将看到类似以下的输出:
[INFO] MCP Server running on port 3000
[INFO] Connected to n8n at http://localhost:5678
[INFO] Available workflows: 15
这表示MCP服务器已成功连接到n8n,并同步了你的工作流列表。
验证安装
为了确认一切正常工作,我们来做个简单的测试。创建一个测试文件 test.js:
// 导入n8n-mcp客户端
const { N8nMcpClient } = require('n8n-mcp');
async function testConnection() {
// 初始化客户端
const client = new N8nMcpClient({
baseUrl: process.env.N8N_BASE_URL,
apiKey: process.env.N8N_API_KEY
});
try {
// 测试连接
const status = await client.healthCheck();
console.log('连接状态:', status);
// 获取工作流列表
const workflows = await client.listWorkflows();
console.log('可用工作流数量:', workflows.length);
// 列出工作流名称
workflows.forEach(wf => {
console.log(`- ${wf.name} (ID: ${wf.id})`);
});
} catch (error) {
console.error('连接失败:', error.message);
}
}
testConnection();
运行测试:
node test.js
如果一切正常,你应该能看到n8n中所有工作流的列表。
核心功能详解
MCP协议概述
在深入功能之前,我们需要先理解MCP(Model Context Protocol)协议。MCP是一种专门为AI模型设计的通信协议,它定义了:
工具定义格式
MCP使用JSON Schema来定义工具,让AI能够理解每个工具的用途、参数和返回值:
{
"name": "trigger_workflow",
"description": "触发指定n8n工作流的执行",
"parameters": {
"type": "object",
"properties": {
"workflow_id": {
"type": "string",
"description": "工作流的唯一标识符"
},
"input_data": {
"type": "object",
"description": "传递给工作流的输入数据"
}
},
"required": ["workflow_id"]
}
}
通信机制
MCP采用JSON-RPC 2.0作为底层通信协议,支持:
- 请求-响应模式:客户端发送请求,服务器返回结果
- 通知模式:单向发送消息,无需响应
- 流式响应:支持实时返回执行进度
n8n-mcp的核心能力
1. 工作流管理
列出所有工作流
// 获取所有工作流的详细信息
const workflows = await client.listWorkflows({
includeInactive: false // 是否包含已禁用的工作流
});
// 工作流对象包含以下字段:
// - id: 唯一标识符
// - name: 工作流名称
// - active: 是否处于激活状态
// - createdAt: 创建时间
// - updatedAt: 最后更新时间
获取单个工作流详情
// 根据ID获取工作流详细信息
const workflow = await client.getWorkflow('workflow_id_here');
// 返回的工作流对象包含完整的节点配置
console.log('工作流节点数:', workflow.nodes.length);
console.log('工作流连接:', workflow.connections);
创建新工作流
// 通过代码创建新工作流
const newWorkflow = await client.createWorkflow({
name: '我的自动化工作流',
nodes: [
{
name: 'Webhook',
type: 'n8n-nodes-base.webhook',
parameters: {},
position: [250, 300]
},
{
name: 'HTTP Request',
type: 'n8n-nodes-base.httpRequest',
parameters: {
method: 'GET',
url: 'https://api.example.com/data'
},
position: [450, 300]
}
],
connections: {
'Webhook': {
'main': [[{
'node': 'HTTP Request',
'type': 'main',
'index': 0
}]]
}
}
});
2. 工作流执行
触发工作流执行
这是最核心的功能之一。你可以触发任意工作流并传递参数:
// 触发工作流执行
const execution = await client.executeWorkflow('workflow_id_here', {
// 输入数据,会作为webhook或初始节点的输入
userId: '12345',
action: 'process_order',
orderData: {
items: [
{ name: '产品A', quantity: 2 },
{ name: '产品B', quantity: 1 }
],
totalAmount: 299.99
}
});
console.log('执行ID:', execution.id);
console.log('执行状态:', execution.status);
获取执行结果
// 查询执行结果
const result = await client.getExecutionResult('execution_id_here');
console.log('执行状态:', result.status);
console.log('输出数据:', result.data);
console.log('执行耗时:', result.executionTime, 'ms');
// result.data包含工作流最后节点的输出
// 可以遍历查看每个节点的执行结果
if (result.data.resultData) {
Object.keys(result.data.resultData).forEach(nodeName => {
console.log(`节点 ${nodeName} 输出:`, result.data.resultData[nodeName]);
});
}
等待执行完成
对于耗时较长的工作流,你可以选择等待执行完成:
// 同步执行,等待结果返回
const result = await client.executeWorkflowAndWait('workflow_id_here', {
data: 'your input data'
}, {
timeout: 60000, // 超时时间(毫秒)
pollInterval: 1000 // 轮询间隔(毫秒)
});
console.log('最终结果:', result);
3. 实时监控
事件订阅
你可以通过事件订阅来监控工作流的状态变化:
// 订阅工作流执行事件
client.subscribe('workflow.execution.started', (event) => {
console.log('工作流开始执行:', event.workflowId);
});
client.subscribe('workflow.execution.completed', (event) => {
console.log('工作流执行完成:', {
workflowId: event.workflowId,
executionId: event.executionId,
status: event.status,
duration: event.duration
});
});
client.subscribe('workflow.execution.failed', (event) => {
console.error('工作流执行失败:', {
workflowId: event.workflowId,
error: event.error
});
});
执行历史查询
// 获取工作流的执行历史
const history = await client.getExecutionHistory('workflow_id_here', {
limit: 20, // 返回数量限制
includeData: true // 是否包含执行数据
});
history.forEach(exec => {
const status = exec.status === 'success' ? '✓' : '✗';
console.log(`${status} ${exec.id} - ${exec.startedAt} (${exec.executionTime}ms)`);
});
4. 动态工作流修改
这可能是最强大的功能——你可以在运行时动态修改工作流:
// 更新工作流的某个节点
await client.updateWorkflowNode('workflow_id_here', 'node_name', {
parameters: {
url: 'https://new-api-endpoint.com/data',
method: 'POST'
}
});
// 添加新节点到现有工作流
await client.addWorkflowNode('workflow_id_here', {
name: 'New Node',
type: 'n8n-nodes-base.httpRequest',
parameters: {
method: 'GET',
url: 'https://example.com/api'
},
position: [650, 300]
});
工具集完整列表
n8n-mcp提供了丰富的工具集,以下是完整的工具列表:
| 工具名称 | 功能描述 | 主要参数 |
|---|---|---|
list_workflows |
列出所有工作流 | include_inactive |
get_workflow |
获取工作流详情 | workflow_id |
create_workflow |
创建新工作流 | workflow_data |
update_workflow |
更新工作流 | workflow_id, data |
delete_workflow |
删除工作流 | workflow_id |
execute_workflow |
触发执行 | workflow_id, input_data |
get_execution |
获取执行详情 | execution_id |
get_execution_history |
获取执行历史 | workflow_id |
activate_workflow |
激活工作流 | workflow_id |
deactivate_workflow |
停用工作流 | workflow_id |
add_node |
添加节点 | workflow_id, node_data |
update_node |
更新节点 | workflow_id, node_name, data |
实战教程
教程一:构建AI助手控制的邮件自动化系统
在这个教程中,我们将构建一个完整的自动化系统:让AI助手能够根据用户指令,自动发送定制化的邮件。
第一步:在n8n中创建邮件工作流
- 登录n8n界面
- 点击”Create New Workflow”创建新工作流
- 添加以下节点:
节点1:Webhook(触发器)
// 配置webhook节点
{
"path": "send-custom-email",
"responseMode": "responseNode",
"options": {}
}
节点2:Set(数据格式化)
// 设置邮件内容
{
"mode": "manual",
"assignments": {
"assignments": [
{
"name": "to",
"value": "={{ $json.body.to }}"
},
{
"name": "subject",
"value": "={{ $json.body.subject }}"
},
{
"name": "content",
"value": "={{ $json.body.content }}"
}
]
},
"options": {}
}
节点3:Email Send(发送邮件)
// 配置邮件发送
{
"fromEmail": "noreply@yourcompany.com",
"to": "={{ $json.to }}",
"subject": "={{ $json.subject }}",
"text": "={{ $json.content }}",
"emailFormat": "html"
}
- 保存工作流并激活
第二步:创建MCP集成代码
创建一个完整的集成文件 email-automation.js:
/**
* AI邮件自动化助手
*
* 这个模块展示了如何将n8n-mcp与AI助手结合,
* 实现智能化的邮件发送功能
*/
const { N8nMcpClient } = require('n8n-mcp');
class EmailAutomationAssistant {
constructor(config) {
this.client = new N8nMcpClient(config);
this.webhookBaseUrl = config.webhookBaseUrl;
}
/**
* 初始化连接
*/
async initialize() {
console.log('正在连接到n8n...');
const status = await this.client.healthCheck();
console.log('连接成功! n8n版本:', status.version);
// 获取邮件工作流
const workflows = await this.client.listWorkflows();
this.emailWorkflow = workflows.find(w =>
w.name.includes('email') || w.name.includes('邮件')
);
if (!this.emailWorkflow) {
throw new Error('未找到邮件工作流,请先在n8n中创建');
}
console.log('邮件工作流已就绪:', this.emailWorkflow.name);
}
/**
* 发送简单邮件
*
* @param {string} to - 收件人邮箱
* @param {string} subject - 邮件主题
* @param {string} content - 邮件内容
*/
async sendSimpleEmail(to, subject, content) {
try {
const result = await this.client.executeWorkflow(
this.emailWorkflow.id,
{
to,
subject,
content,
timestamp: new Date().toISOString()
}
);
console.log('邮件发送任务已提交');
console.log('执行ID:', result.id);
return {
success: true,
executionId: result.id
};
} catch (error) {
console.error('发送失败:', error.message);
return {
success: false,
error: error.message
};
}
}
/**
* 发送批量邮件
*
* @param {Array} recipients - 收件人列表
* @param {string} subject - 邮件主题
* @param {Function} contentGenerator - 内容生成函数
*/
async sendBatchEmails(recipients, subject, contentGenerator) {
const results = [];
for (const recipient of recipients) {
const content = await contentGenerator(recipient);
const result = await this.sendSimpleEmail(
recipient.email,
subject,
content
);
results.push({
recipient: recipient.email,
...result
});
// 避免发送过快
await this.delay(1000);
}
return {
total: recipients.length,
success: results.filter(r => r.success).length,
failed: results.filter(r => !r.success).length,
details: results
};
}
/**
* AI驱动的智能邮件发送
* 根据用户指令自动处理邮件发送
*
* @param {string} instruction - AI指令
*/
async handleAIInstruction(instruction) {
// 模拟AI解析指令
const parsed = this.parseInstruction(instruction);
if (parsed.action === 'send_email') {
return await this.sendSimpleEmail(
parsed.to,
parsed.subject,
parsed.content
);
}
if (parsed.action === 'send_batch') {
return await this.sendBatchEmails(
parsed.recipients,
parsed.subject,
parsed.contentGenerator
);
}
return { error: '无法识别的指令' };
}
/**
* 解析AI指令(简化版本)
* 实际应用中,这里会接入真正的AI模型
*/
parseInstruction(instruction) {
// 简化实现,实际应使用AI模型解析
const lower = instruction.toLowerCase();
if (lower.includes('发送') && lower.includes('邮件')) {
// 提取收件人、主题、内容(实际应用中由AI提取)
return {
action: 'send_email',
to: 'recipient@example.com',
subject: '自动发送的邮件',
content: '这是由AI助手自动生成的邮件内容'
};
}
throw new Error('无法解析指令');
}
// 工具方法:延时
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 主程序
async function main() {
const assistant = new EmailAutomationAssistant({
baseUrl: process.env.N8N_BASE_URL,
apiKey: process.env.N8N_API_KEY,
webhookBaseUrl: process.env.WEBHOOK_BASE_URL
});
try {
// 初始化
await assistant.initialize();
// 示例1:发送简单邮件
console.log('\n--- 示例1:发送简单邮件 ---');
const simpleResult = await assistant.sendSimpleEmail(
'user@example.com',
'测试邮件',
'<h1>这是一封测试邮件</h1><p>来自n8n-mcp自动化系统</p>'
);
console.log('结果:', simpleResult);
// 示例2:批量发送
console.log('\n--- 示例2:批量发送邮件 ---');
const batchRecipients = [
{ email: 'user1@example.com', name: '用户1' },
{ email: 'user2@example.com', name: '用户2' },
{ email: 'user3@example.com', name: '用户3' }
];
const batchResult = await assistant.sendBatchEmails(
batchRecipients,
'批量测试邮件',
(recipient) => `亲爱的 ${recipient.name},这是一封个性化邮件!`
);
console.log('批量发送结果:', batchResult);
} catch (error) {
console.error('程序执行错误:', error);
}
}
// 运行主程序
main();
第三步:运行测试
node email-automation.js
预期输出:
正在连接到n8n...
连接成功! n8n版本: 1.0.0
邮件工作流已就绪: Email Automation Workflow
--- 示例1:发送简单邮件 ---
邮件发送任务已提交
执行ID: abc123-def456
结果: { success: true, executionId: 'abc123-def456' }
--- 示例2:批量发送邮件 ---
批量发送结果: { total: 3, success: 3, failed: 0, details: [...] }
教程二:构建AI数据分析助手
在这个教程中,我们将创建一个AI数据分析助手,让用户可以用自然语言查询数据。
第一步:创建数据查询工作流
在n8n中创建以下工作流:
节点1:Webhook
{
"path": "data-query",
"httpMethod": "POST"
}
节点2:Code(数据处理)
// 这个节点模拟数据查询,实际可能连接数据库或API
function processQuery($input, $vars) {
const query = $input.body.query;
const params = $input.body.params || {};
// 模拟数据分析逻辑
const results = {
query: query,
results: generateSampleData(query, params),
timestamp: new Date().toISOString(),
recordCount: 10
};
return results;
}
// 生成示例数据(实际应用中替换为真实数据查询)
function generateSampleData(query, params) {
const data = [];
const count = params.limit || 10;
for (let i = 0; i < count; i++) {
data.push({
id: i + 1,
value: Math.random() * 1000,
category: ['A', 'B', 'C'][i % 3],
date: new Date(Date.now() - i * 86400000).toISOString()
});
}
return data;
}
return processQuery($input, $vars);
节点3:AI Processing(可选)
如果安装了AI相关节点,可以添加自然语言处理能力:
{
"model": "gpt-4",
"prompt": "分析以下数据,用简洁的语言总结关键洞察:\n{{ $json.results }}",
"temperature": 0.3
}
第二步:创建数据查询客户端
/**
* AI数据分析助手
*
* 允许用户使用自然语言查询数据,
* 系统自动调用后端工作流进行分析
*/
const { N8nMcpClient } = require('n8n-mcp');
class DataAnalysisAssistant {
constructor(config) {
this.client = new N8nMcpClient(config);
}
async initialize() {
await this.client.healthCheck();
console.log('数据助手初始化完成');
// 查找数据查询工作流
const workflows = await this.client.listWorkflows();
this.queryWorkflow = workflows.find(w =>
w.name.toLowerCase().includes('data') ||
w.name.toLowerCase().includes('query') ||
w.name.toLowerCase().includes('分析')
);
if (!this.queryWorkflow) {
throw new Error('未找到数据查询工作流');
}
}
/**
* 执行数据查询
*
* @param {string} naturalQuery - 自然语言查询
* @param {Object} options - 查询选项
*/
async query(naturalQuery, options = {}) {
console.log('处理查询:', naturalQuery);
// 解析自然语言为结构化查询
const structuredQuery = this.nlpToQuery(naturalQuery);
// 执行工作流
const execution = await this.client.executeWorkflow(
this.queryWorkflow.id,
{
query: structuredQuery.query,
params: {
limit: options.limit || 10,
startDate: structuredQuery.dateRange?.start,
endDate: structuredQuery.dateRange?.end,
filters: structuredQuery.filters
},
originalQuery: naturalQuery
}
);
// 等待结果
const result = await this.client.waitForExecution(
execution.id,
{ timeout: 30000 }
);
return this.formatResults(result, naturalQuery);
}
/**
* 简单的NLP解析(简化版本)
* 实际应用中应接入专门的NLP服务
*/
nlpToQuery(naturalQuery) {
const query = {};
const lower = naturalQuery.toLowerCase();
// 提取日期范围
if (lower.includes('昨天')) {
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
query.dateRange = {
start: this.startOfDay(yesterday),
end: this.endOfDay(yesterday)
};
}
if (lower.includes('本周')) {
const now = new Date();
query.dateRange = {
start: this.startOfWeek(now),
end: this.endOfWeek(now)
};
}
// 提取数量限制
const limitMatch = naturalQuery.match(/前(\d+)条|显示(\d+)条|取(\d+)条/);
if (limitMatch) {
query.limit = parseInt(limitMatch[1] || limitMatch[2] || limitMatch[3]);
}
// 提取筛选条件
query.filters = {};
if (lower.includes('类别A')) {
query.filters.category = 'A';
}
if (lower.includes('类别B')) {
query.filters.category = 'B';
}
// 生成最终查询字符串
query.query = this.buildQueryString(query);
return query;
}
/**
* 构建查询字符串
*/
buildQueryString(parsedQuery) {
let query = 'SELECT * FROM data';
const conditions = [];
if (parsedQuery.dateRange) {
conditions.push(
`date >= '${parsedQuery.dateRange.start}' AND date <= '${parsedQuery.dateRange.end}'`
);
}
if (parsedQuery.filters) {
Object.entries(parsedQuery.filters).forEach(([key, value]) => {
conditions.push(`${key} = '${value}'`);
});
}
if (conditions.length > 0) {
query += ' WHERE ' + conditions.join(' AND ');
}
if (parsedQuery.limit) {
query += ` LIMIT ${parsedQuery.limit}`;
}
return query;
}
/**
* 格式化结果
*/
formatResults(rawResults, originalQuery) {
return {
originalQuery,
query: rawResults.query,
recordCount: rawResults.recordCount,
timestamp: rawResults.timestamp,
data: rawResults.results,
summary: this.generateSummary(rawResults)
};
}
/**
* 生成数据摘要
*/
generateSummary(data) {
if (!data.results || data.results.length === 0) {
return '没有找到匹配的数据';
}
const count = data.results.length;
const avg = data.results.reduce((sum, r) => sum + r.value, 0) / count;
const max = Math.max(...data.results.map(r => r.value));
const min = Math.min(...data.results.map(r => r.value));
return `共 ${count} 条记录,数值范围 ${min.toFixed(2)} - ${max.toFixed(2)},平均值 ${avg.toFixed(2)}`;
}
// 日期工具方法
startOfDay(date) {
return new Date(date.setHours(0, 0, 0, 0)).toISOString();
}
endOfDay(date) {
return new Date(date.setHours(23, 59, 59, 999)).toISOString();
}
startOfWeek(date) {
const d = new Date(date);
const day = d.getDay();
const diff = d.getDate() - day + (day === 0 ? -6 : 1);
return this.startOfDay(new Date(d.setDate(diff)));
}
endOfWeek(date) {
const start = new Date(this.startOfWeek(date));
start.setDate(start.getDate() + 6);
return this.endOfDay(start);
}
}
// 演示
async function demo() {
const assistant = new DataAnalysisAssistant({
baseUrl: process.env.N8N_BASE_URL,
apiKey: process.env.N8N_API_KEY
});
await assistant.initialize();
// 示例查询
const queries = [
'查询昨天前10条数据',
'本周类别A的数据',
'显示最新的5条记录'
];
for (const q of queries) {
console.log(`\n处理查询: "${q}"`);
const result = await assistant.query(q);
console.log('结果:', JSON.stringify(result, null, 2));
}
}
demo();
教程三:构建AI任务调度系统
这个教程展示如何创建一个智能任务调度系统,让AI能够根据任务优先级和资源状况自动调度工作流。
系统架构
┌─────────────────────────────────────────────────────────┐
│ AI 调度器 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │
│ │ 任务解析器 │→ │ 优先级评估 │→ │ 调度决策引擎 │ │
│ └─────────────┘ └─────────────┘ └─────────────────┘ │
└──────────────────────────┬──────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ n8n-mcp 层 │
│ ┌─────────────────────────────────────────────────────┐│
│ │ 任务队列管理器 │ 工作流注册表 │ 执行状态监控 ││
│ └─────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ n8n 引擎 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 工作流1 │ │ 工作流2 │ │ 工作流3 │ │ 工作流N │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘
完整实现代码
/**
* AI智能任务调度系统
*
* 这个系统可以根据任务描述自动:
* 1. 解析任务类型和参数
* 2. 评估任务优先级
* 3. 选择合适的工作流执行
* 4. 监控执行状态并反馈结果
*/
const { N8nMcpClient } = require('n8n-mcp');
/**
* 任务优先级枚举
*/
const Priority = {
CRITICAL: 1, // 紧急重要
HIGH: 2, // 重要
NORMAL: 3, // 普通
LOW: 4 // 低
};
/**
* 任务状态枚举
*/
const TaskStatus = {
PENDING: 'pending',
RUNNING: 'running',
COMPLETED: 'completed',
FAILED: 'failed',
CANCELLED: 'cancelled'
};
/**
* 任务定义类
*/
class Task {
constructor(id, type, description, params) {
this.id = id;
this.type = type;
this.description = description;
this.params = params;
this.priority = Priority.NORMAL;
this.status = TaskStatus.PENDING;
this.createdAt = new Date();
this.startedAt = null;
this.completedAt = null;
this.result = null;
this.error = null;
}
}
/**
* AI任务调度器主类
*/
class AITaskScheduler {
constructor(config) {
this.client = new N8nMcpClient(config);
this.tasks = new Map();
this.workflowRegistry = new Map();
this.eventHandlers = new Map();
}
/**
* 初始化调度器
*/
async initialize() {
console.log('正在初始化AI任务调度器...');
// 检查n8n连接
const status = await this.client.healthCheck();
console.log('n8n连接状态:', status.status);
// 加载工作流注册表
await this.loadWorkflowRegistry();
// 启动任务处理循环
this.startProcessingLoop();
console.log('调度器初始化完成');
console.log('已注册工作流:', this.workflowRegistry.size);
}
/**
* 加载工作流注册表
* 将n8n中的工作流映射为可调度的任务类型
*/
async loadWorkflowRegistry() {
const workflows = await this.client.listWorkflows({ includeInactive: false });
workflows.forEach(wf => {
// 根据工作流名称自动分类
const category = this.categorizeWorkflow(wf.name);
this.workflowRegistry.set(category, {
id: wf.id,
name: wf.name,
category: category,
active: wf.active,
avgExecutionTime: 0 // 可后续添加统计
});
});
// 注册默认的工作流映射
this.registerDefaultMappings();
}
/**
* 注册默认的工作流映射
*/
registerDefaultMappings() {
// 如果没有找到对应的工作流,可以使用这些默认处理
const defaults = {
'email': { handler: this.handleEmailTask.bind(this) },
'data': { handler: this.handleDataTask.bind(this) },
'report': { handler: this.handleReportTask.bind(this) },
'notification': { handler: this.handleNotificationTask.bind(this) },
'sync': { handler: this.handleSyncTask.bind(this) }
};
Object.entries(defaults).forEach(([key, value]) => {
if (!this.workflowRegistry.has(key)) {
this.workflowRegistry.set(key, {
name: `Default ${key} handler`,
category: key,
handler: value.handler
});
}
});
}
/**
* 根据名称自动分类工作流
*/
categorizeWorkflow(name) {
const lower = name.toLowerCase();
if (lower.includes('email') || lower.includes('邮件')) return 'email';
if (lower.includes('data') || lower.includes('数据')) return 'data';
if (lower.includes('report') || lower.includes('报告')) return 'report';
if (lower.includes('notify') || lower.includes('通知')) return 'notification';
if (lower.includes('sync') || lower.includes('同步')) return 'sync';
return 'general';
}
/**
* 接收AI指令并创建任务
*
* @param {string} instruction - AI给出的指令
* @returns {Task} 创建的任务对象
*/
async createTaskFromInstruction(instruction) {
// 解析指令
const parsed = this.parseInstruction(instruction);
// 创建任务
const task = new Task(
this.generateTaskId(),
parsed.type,
instruction,
parsed.params
);
// 评估优先级
task.priority = this.evaluatePriority(parsed);
// 存储任务
this.tasks.set(task.id, task);
// 触发事件
this.emit('task.created', task);
console.log(`任务已创建: [${task.id}] ${task.type} (优先级: ${task.priority})`);
return task;
}
/**
* 解析AI指令
* 实际应用中应接入AI模型
*/
parseInstruction(instruction) {
const result = {
type: 'general',
params: {}
};
const lower = instruction.toLowerCase();
// 识别任务类型
if (lower.includes('发送邮件') || lower.includes('发邮件')) {
result.type = 'email';
// 提取邮件相关参数
const toMatch = instruction.match(/给(.+?)发|至(.+?)[,,]/);
if (toMatch) result.params.to = toMatch[1] || toMatch[2];
}
if (lower.includes('查询') || lower.includes('获取数据')) {
result.type = 'data';
}
if (lower.includes('报告') || lower.includes('汇总')) {
result.type = 'report';
}
if (lower.includes('通知') || lower.includes('提醒')) {
result.type = 'notification';
}
// 识别急切程度
if (lower.includes('紧急') || lower.includes('立即') || lower.includes('马上')) {
result.urgency = 'high';
}
return result;
}
/**
* 评估任务优先级
*/
evaluatePriority(parsed) {
// 紧急程度高
if (parsed.urgency === 'high') {
return Priority.CRITICAL;
}
// 根据任务类型调整
switch (parsed.type) {
case 'notification':
return Priority.HIGH;
case 'email':
return Priority.NORMAL;
case 'report':
return Priority.LOW;
default:
return Priority.NORMAL;
}
}
/**
* 执行任务
*/
async executeTask(taskId) {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`任务不存在: ${taskId}`);
}
console.log(`开始执行任务: [${task.id}]`);
task.status = TaskStatus.RUNNING;
task.startedAt = new Date();
this.emit('task.started', task);
try {
// 获取对应的工作流
const workflow = this.workflowRegistry.get(task.type);
if (!workflow) {
throw new Error(`未找到处理类型 '${task.type}' 的工作流`);
}
// 如果有处理器函数,直接调用
if (workflow.handler) {
task.result = await workflow.handler(task);
} else {
// 否则通过n8n-mcp执行
const execution = await this.client.executeWorkflow(
workflow.id,
task.params
);
task.result = await this.client.waitForExecution(execution.id);
}
task.status = TaskStatus.COMPLETED;
task.completedAt = new Date();
this.emit('task.completed', task);
console.log(`任务完成: [${task.id}]`);
} catch (error) {
task.status = TaskStatus.FAILED;
task.error = error.message;
task.completedAt = new Date();
this.emit('task.failed', { task, error });
console.error(`任务失败: [${task.id}] - ${error.message}`);
}
return task;
}
/**
* 任务处理循环
*/
startProcessingLoop() {
this.processingInterval = setInterval(async () => {
await this.processPendingTasks();
}, 1000); // 每秒检查一次
}
/**
* 处理待执行的任务
*/
async processPendingTasks() {
// 获取所有待执行的任务
const pendingTasks = Array.from(this.tasks.values())
.filter(t => t.status === TaskStatus.PENDING)
.sort((a, b) => a.priority - b.priority); // 按优先级排序
// 只处理最高优先级的任务
if (pendingTasks.length > 0) {
await this.executeTask(pendingTasks[0].id);
}
}
/**
* 获取任务状态
*/
getTaskStatus(taskId) {
const task = this.tasks.get(taskId);
if (!task) return null;
return {
id: task.id,
type: task.type,
status: task.status,
priority: task.priority,
createdAt: task.createdAt,
duration: task.startedAt
? (task.completedAt - task.startedAt) / 1000 + 's'
: null,
result: task.result,
error: task.error
};
}
/**
* 获取调度器统计信息
*/
getStats() {
const tasks = Array.from(this.tasks.values());
return {
total: tasks.length,
pending: tasks.filter(t => t.status === TaskStatus.PENDING).length,
running: tasks.filter(t => t.status === TaskStatus.RUNNING).length,
completed: tasks.filter(t => t.status === TaskStatus.COMPLETED).length,
failed: tasks.filter(t => t.status === TaskStatus.FAILED).length,
registeredWorkflows: this.workflowRegistry.size
};
}
// 事件系统
on(event, handler) {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, []);
}
this.eventHandlers.get(event).push(handler);
}
emit(event, data) {
const handlers = this.eventHandlers.get(event) || [];
handlers.forEach(handler => handler(data));
}
// 工具方法
generateTaskId() {
return `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 关闭调度器
*/
shutdown() {
if (this.processingInterval) {
clearInterval(this.processingInterval);
}
console.log('调度器已关闭');
}
// 任务类型处理器
async handleEmailTask(task) {
console.log('处理邮件任务...');
// 邮件处理逻辑
return { sent: true };
}
async handleDataTask(task) {
console.log('处理数据任务...');
// 数据处理逻辑
return { queried: true };
}
async handleReportTask(task) {
console.log('处理报告任务...');
// 报告生成逻辑
return { generated: true };
}
async handleNotificationTask(task) {
console.log('处理通知任务...');
// 通知发送逻辑
return { notified: true };
}
async handleSyncTask(task) {
console.log('处理同步任务...');
// 数据同步逻辑
return { synced: true };
}
}
// 演示程序
async function demo() {
const scheduler = new AITaskScheduler({
baseUrl: process.env.N8N_BASE_URL,
apiKey: process.env.N8N_API_KEY
});
// 注册事件监听
scheduler.on('task.created', task => {
console.log(`[事件] 任务创建: ${task.id}`);
});
scheduler.on('task.started', task => {
console.log(`[事件] 任务开始: ${task.id}`);
});
scheduler.on('task.completed', task => {
console.log(`[事件] 任务完成: ${task.id}`);
});
scheduler.on('task.failed', ({ task, error }) => {
console.log(`[事件] 任务失败: ${task.id} - ${error}`);
});
try {
// 初始化
await scheduler.initialize();
// 模拟AI指令
const instructions = [
'立即给张三发送一封邮件,内容是明天的会议提醒',
'查询本月的销售数据并生成汇总报告',
'同步所有客户信息到CRM系统'
];
// 创建任务
console.log('\n--- 创建任务 ---');
const tasks = [];
for (const instruction of instructions) {
const task = await scheduler.createTaskFromInstruction(instruction);
tasks.push(task);
}
// 等待任务执行
console.log('\n--- 等待任务执行 ---');
await new Promise(resolve => setTimeout(resolve, 3000));
// 查看结果
console.log('\n--- 任务结果 ---');
tasks.forEach(task => {
const status = scheduler.getTaskStatus(task.id);
console.log(`${task.id}: ${status.status}`);
});
// 查看统计
console.log('\n--- 调度器统计 ---');
console.log(scheduler.getStats());
} finally {
scheduler.shutdown();
}
}
// 运行演示
demo();
常见使用场景
场景一:智能客服系统
需求描述
构建一个智能客服系统,让AI能够自动处理客户咨询,对于简单问题直接回复,复杂问题转人工处理。
解决方案
// 客服工作流配置
const customerServiceWorkflow = {
name: '智能客服系统',
nodes: [
{
name: 'Webhook入口',
type: 'n8n-nodes-base.webhook',
parameters: {
path: 'customer-service',
responseMode: 'lastNode'
}
},
{
name: 'AI意图识别',
type: '@n8n/n8n-nodes-langchain.agent',
parameters: {
model: 'gpt-4',
systemPrompt: `
你是一个客服助手,需要判断客户的问题类型:
1. 如果是简单咨询,直接回答
2. 如果需要操作数据,使用tools
3. 如果无法回答,转人工
`,
tools: ['query_order', 'query_product', 'create_ticket']
}
},
{
name: '工单创建',
type: 'n8n-nodes-base.httpRequest',
parameters: {
method: 'POST',
url: 'https://helpdesk-api.example.com/tickets',
body: '={{ {customer: $json.customer, issue: $json.issue} }}'
}
}
]
};
场景二:数据同步与ETL
需求描述
定期从多个数据源同步数据,进行清洗转换后存入数据仓库。
解决方案
/**
* 数据同步任务配置
*/
class DataSyncScheduler {
constructor(mcpClient) {
this.client = mcpClient;
this.syncTasks = new Map();
}
/**
* 配置同步任务
*/
async setupSyncTask(taskName, config) {
const workflow = await this.client.createWorkflow({
name: `数据同步 - ${taskName}`,
nodes: [
// 源数据节点
{
name: '数据源',
type: 'n8n-nodes-base.httpRequest',
parameters: {
method: 'GET',
url: config.source.url,
authentication: 'genericCredentialType',
genericAuthType: config.source.authType
}
},
// 数据转换节点
{
name: '数据清洗',
type: 'n8n-nodes-base.code',
parameters: {
jsCode: `
const data = $input.all();
// 数据清洗逻辑
const cleaned = data.map(item => ({
id: item.json.id,
value: item.json.value,
timestamp: new Date().toISOString(),
synced: true
}));
return cleaned.map(d => ({ json: d }));
`
}
},
// 目标存储节点
{
name: '数据仓库',
type: 'n8n-nodes-base.postgres',
parameters: {
operation: 'insert',
table: config.target.table,
columns: 'id, value, timestamp, synced'
}
}
]
});
this.syncTasks.set(taskName, {
workflowId: workflow.id,
schedule: config.schedule,
status: 'active'
});
return workflow;
}
/**
* 执行同步任务
*/
async executeSync(taskName, options = {}) {
const task = this.syncTasks.get(taskName);
if (!task) {
throw new Error(`同步任务不存在: ${taskName}`);
}
console.log(`开始同步: ${taskName}`);
const execution = await this.client.executeWorkflow(
task.workflowId,
{
syncMode: options.incremental ? 'incremental' : 'full',
lastSyncTime: options.lastSyncTime
}
);
return {
taskName,
executionId: execution.id,
startedAt: new Date()
};
}
}
场景三:定时报表生成与分发
需求描述
每天早上9点自动生成昨日数据报表,发送给相关人员。
解决方案
/**
* 报表自动化系统
*/
class ReportAutomation {
constructor(mcpClient) {
this.client = mcpClient;
}
/**
* 创建报表生成工作流
*/
async createReportWorkflow(reportConfig) {
const workflow = await this.client.createWorkflow({
name: `报表 - ${reportConfig.name}`,
nodes: [
{
name: '定时触发',
type: 'n8n-nodes-base.schedule',
parameters: {
rule: {
interval: [reportConfig.schedule]
}
}
},
{
name: '数据查询',
type: 'n8n-nodes-base.postgres',
parameters: {
operation: 'executeQuery',
query: reportConfig.query
}
},
{
name: '报表生成',
type: 'n8n-nodes-base.code',
parameters: {
jsCode: `
const data = $input.all();
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
return [{
json: {
reportDate: yesterday.toISOString().split('T')[0],
totalRecords: data.length,
summary: calculateSummary(data),
details: data
}
}];
function calculateSummary(data) {
return {
count: data.length,
sum: data.reduce((s, d) => s + d.json.value, 0),
avg: data.reduce((s, d) => s + d.json.value, 0) / data.length
};
}
`
}
},
{
name: '邮件发送',
type: 'n8n-nodes-base.email',
parameters: {
to: reportConfig.recipients.join(','),
subject: `={{ $json.reportDate }} ${reportConfig.name}`,
body: `={{ $json.summary }}`
}
}
]
});
// 激活工作流
await this.client.activateWorkflow(workflow.id);
return workflow;
}
/**
* 手动触发报表生成
*/
async generateReport(workflowId, dateRange) {
const execution = await this.client.executeWorkflow(workflowId, {
dateRange: dateRange,
manualTrigger: true
});
// 等待执行完成
const result = await this.client.waitForExecution(execution.id, {
timeout: 60000
});
return result;
}
}
技巧与最佳实践
性能优化技巧
1. 批量处理减少API调用
不要频繁调用API,尽量批量处理:
// 低效的做法
for (const item of items) {
await client.executeWorkflow(workflowId, item);
}
// 高效的做法 - 使用批量工作流
await client.executeWorkflow(batchWorkflowId, {
items: items,
batchSize: 100
});
2. 合理设置超时时间
对于不同类型的工作流,设置合适的超时:
// 快速任务
const quickResult = await client.executeWorkflowAndWait(
quickWorkflowId,
data,
{ timeout: 5000 } // 5秒超时
);
// 耗时任务
const longResult = await client.executeWorkflowAndWait(
longWorkflowId,
data,
{ timeout: 300000 } // 5分钟超时
);
3. 使用缓存减少重复查询
class CachedMcpClient {
constructor(client) {
this.client = client;
this.cache = new Map();
this.cacheTimeout = 60000; // 1分钟缓存
}
async listWorkflows(forceRefresh = false) {
const cacheKey = 'workflows';
if (!forceRefresh) {
const cached = this.cache.get(cacheKey);
if (cached && Date.now() - cached.timestamp < this.cacheTimeout) {
return cached.data;
}
}
const data = await this.client.listWorkflows();
this.cache.set(cacheKey, {
data,
timestamp: Date.now()
});
return data;
}
}
错误处理最佳实践
1. 实现重试机制
async function executeWithRetry(fn, maxRetries = 3, delay = 1000) {
let lastError;
for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (error) {
lastError = error;
console.log(`尝试 ${i + 1} 失败: ${error.message}`);
if (i < maxRetries - 1) {
await new Promise(r => setTimeout(r, delay * (i + 1)));
}
}
}
throw new Error(`执行失败,已重试 ${maxRetries} 次: ${lastError.message}`);
}
// 使用示例
const result = await executeWithRetry(
() => client.executeWorkflow(workflowId, data),
3,
2000
);
2. 优雅的错误处理
class RobustMcpClient {
constructor(config) {
this.client = new N8nMcpClient(config);
this.errorHandlers = new Map();
}
/**
* 注册错误处理器
*/
onError(errorType, handler) {
this.errorHandlers.set(errorType, handler);
}
/**
* 安全执行
*/
async safeExecute(workflowId, data, options = {}) {
try {
const result = await this.client.executeWorkflow(workflowId, data);
return { success: true, data: result };
} catch (error) {
// 尝试查找特定的错误处理器
const handler = this.errorHandlers.get(error.code);
if (handler) {
return await handler(error, { workflowId, data });
}
// 通用错误处理
console.error('工作流执行失败:', error);
return {
success: false,
error: {
message: error.message,
code: error.code,
workflowId
}
};
}
}
}
安全性建议
1. 敏感信息管理
// 使用环境变量存储敏感信息
const config = {
baseUrl: process.env.N8N_BASE_URL,
apiKey: process.env.N8N_API_KEY, // 不要硬编码
webhookSecret: process.env.WEBHOOK_SECRET
};
// 验证webhook签名
function verifyWebhookSignature(payload, signature, secret) {
const crypto = require('crypto');
const expectedSignature = crypto
.createHmac('sha256', secret)
.update(JSON.stringify(payload))
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(expectedSignature)
);
}
2. 权限控制
class PermissionControlledClient {
constructor(client, permissions) {
this.client = client;
this.permissions = permissions;
}
async executeWorkflow(workflowId, data) {
// 检查权限
if (!this.hasPermission('execute', workflowId)) {
throw new Error('权限不足: 无法执行此工作流');
}
return this.client.executeWorkflow(workflowId, data);
}
hasPermission(action, resourceId) {
const key = `${action}:${resourceId}`;
return this.permissions.includes(key) ||
this.permissions.includes(`${action}:*`);
}
}
监控与日志
1. 完整的日志记录
class LoggedMcpClient {
constructor(client, logger) {
this.client = client;
this.logger = logger;
}
async executeWorkflow(workflowId, data) {
const executionId = this.generateId();
this.logger.info('开始执行工作流', {
executionId,
workflowId,
timestamp: new Date().toISOString()
});
try {
const startTime = Date.now();
const result = await this.client.executeWorkflow(workflowId, data);
const duration = Date.now() - startTime;
this.logger.info('工作流执行成功', {
executionId,
workflowId,
duration,
timestamp: new Date().toISOString()
});
return result;
} catch (error) {
this.logger.error('工作流执行失败', {
executionId,
workflowId,
error: error.message,
timestamp: new Date().toISOString()
});
throw error;
}
}
}
2. 性能指标收集
class MetricsCollector {
constructor() {
this.metrics = {
executions: [],
errors: [],
latencies: []
};
}
recordExecution(execution) {
this.metrics.executions.push({
workflowId: execution.workflowId,
status: execution.status,
timestamp: new Date()
});
}
recordLatency(workflowId, duration) {
this.metrics.latencies.push({
workflowId,
duration,
timestamp: new Date()
});
}
getStats() {
const recentLatencies = this.metrics.latencies
.filter(l => Date.now() - l.timestamp < 3600000); // 最近1小时
return {
totalExecutions: this.metrics.executions.length,
successRate: this.calculateSuccessRate(),
avgLatency: this.calculateAvgLatency(recentLatencies),
p95Latency: this.calculatePercentile(recentLatencies, 95)
};
}
}
总结与相关资源
核心要点回顾
通过本文的学习,你应该已经掌握了以下关键知识点:
1. n8n-mcp是什么
n8n-mcp是一个将n8n工作流自动化引擎与MCP协议结合的开源项目,它让AI模型能够真正执行自动化任务,而不仅仅是给出建议。
2. 核心功能
- 工作流的创建、查询、修改和删除
- 异步执行和同步等待
- 实时事件订阅
- 批量操作支持
3. 实战应用
- AI邮件自动化
- 智能数据分析
- AI任务调度系统
- 客服系统、数据同步、报表生成等场景
4. 最佳实践
- 性能优化:批量处理、缓存、合理超时
- 错误处理:重试机制、优雅降级
- 安全性:敏感信息管理、权限控制
- 可观测性:日志记录、性能监控
进阶学习路径
如果你想要更深入地学习这个领域,以下是一些建议的学习方向:
深入n8n本身
- 学习n8n的高级节点和表达式
- 掌握n8n的出错处理和重试机制
- 了解n8n的集群部署方案
MCP协议深入
- 阅读MCP协议规范文档
- 参与MCP开源项目贡献
- 探索MCP在不同场景的应用
AI集成
- 学习LangChain等AI框架
- 掌握提示工程技巧
- 了解AI Agent的设计模式
相关资源链接
官方资源
- n8n官方文档: https://docs.n8n.io/
- n8n-mcp GitHub: https://github.com/czlonkowski/n8n-mcp
- MCP协议规范: https://modelcontextprotocol.io/
社区资源
- n8n社区论坛: https://community.n8n.io/
- n8n工作流模板库: https://n8n.io/workflows
相关开源项目
- n8n-nodes-langchain: n8n的LangChain集成节点
- n8n-nodes-openai: OpenAI集成节点
- flowise: 低代码LLM应用构建工具
未来展望
n8n-mcp代表了AI与自动化工具融合的一个重要方向。随着AI能力的不断提升,这种”AI+自动化”的模式将会变得越来越重要。
你可以期待的未来发展方向包括:
- 更智能的工作流生成:AI根据需求自动设计工作流
- 更强的上下文理解:AI能够理解整个工作流的状态和历史
- 更自然的交互方式:语音、对话式的自动化配置
- 跨平台的自动化:统一的自动化标准,覆盖更多应用场景
无论你是想提升个人效率,还是为企业构建自动化解决方案,n8n-mcp都是一个值得深入研究的工具。它不仅提供了强大的功能,更重要的是,它开启了一种全新的人机协作模式——让AI从”旁观者”变成”参与者”,真正成为我们工作中的得力助手。
现在,就动手开始你的n8n-mcp之旅吧!
评论区