n8n+MCP强强联合:让AI助手真正掌控你的自动化工作流

n8n+MCP强强联合:让AI助手真正掌控你的自动化工作流

n8n+MCP强强联合:让AI助手真正掌控你的自动化工作流

你是否曾为这样的场景感到困扰:想要让AI助手帮你完成复杂的工作流程,但AI只能回答问题,无法真正执行操作?想要把AI能力嵌入到自己的自动化流程中,却发现接口对接困难重重?

今天要介绍的这个开源项目——czlonkowski/n8n-mcp,正是为解决这些痛点而生。它将n8n强大的工作流自动化能力与MCP(Model Context Protocol)完美结合,让AI模型不再只是”纸上谈兵”的助手,而是能够真正操控自动化流程的执行者。

在本文中,我将带你从零开始,全面掌握这个强大工具的使用方法。


为什么值得关注

传统自动化工具的局限性

市面上的自动化工具种类繁多,从IFTTT、Zapier到n8n,每一款都有其独特优势。然而,它们普遍存在一个共同问题:AI能力的缺失或不足

传统的自动化流程是这样的:触发条件 → 执行预设动作 → 结束。整个过程是静态的、预设的,无法根据上下文智能决策。

举例来说,当你设置一个”收到邮件 → 自动回复”的流程时,回复内容必须是预先编写好的模板。但现实情况是,每封邮件的内容各不相同,用同一个模板回复往往显得生硬甚至适得其反。

n8n-MCP带来的变革

n8n-mcp项目正是为了打破这一局限而诞生的。它的核心价值在于:

1. 赋予AI执行能力

通过MCP协议,AI模型可以直接与n8n工作流交互。AI不再只是给出建议,而是能够:

  • 查询当前工作流的执行状态
  • 触发特定工作流的执行
  • 监控工作流运行结果
  • 动态调整工作流参数

2. 双向通信机制

传统的API调用是单向的——你发送请求,服务器返回结果。而n8n-mcp实现了真正的双向通信:

  • AI可以向n8n发送指令
  • n8n可以向AI反馈执行结果
  • AI可以根据反馈做出下一步决策

3. 降低开发门槛

项目提供了简洁的接口和丰富的示例,即使你不是专业开发者,也能快速上手。你不需要深入了解底层实现,只需要关注如何利用这一能力解决实际问题。

实际应用场景

想象一下这样的场景:

你的AI助手收到用户咨询:”帮我查询昨天的销售数据,然后用邮件发给市场部同事。”

在传统的实现中,这需要:

  1. 手动调用销售数据API
  2. 手动格式化数据
  3. 手动调用邮件发送API

而在n8n-mcp的加持下,AI可以:

  1. 调用n8n中预设的”查询销售数据”工作流
  2. 获取数据后,调用”发送邮件”工作流
  3. 整个过程自动完成,无需人工干预

这就是n8n-mcp的魅力所在——让AI从”建议者”真正变成”执行者”。


环境搭建

前置要求

在开始之前,你需要确保具备以下环境:

基础环境:

  • Node.js 18.0 或更高版本
  • npm 或 yarn 包管理器
  • 基本的命令行操作能力

推荐环境:

  • Docker(用于运行n8n)
  • 基础的JavaScript/TypeScript知识
  • 对API概念有基本理解

安装步骤

第一步:安装n8n

n8n是整个系统的核心工作流引擎。你可以通过多种方式安装n8n:

方式一:使用Docker(推荐)

# 创建n8n数据存储目录
mkdir -p ~/.n8n

# 启动n8n容器
docker run -d \
  --name n8n \
  -p 5678:5678 \
  -v ~/.n8n:/home/node/.n8n \
  n8nio/n8n

容器启动后,打开浏览器访问 http://localhost:5678,你将看到n8n的工作界面。

方式二:全局安装n8n

# 通过npm全局安装
npm install n8n -g

# 启动n8n
n8n start

第二步:安装n8n-mcp

项目提供了两种安装方式,你可以根据需求选择。

方式一:从源码安装

# 克隆仓库
git clone https://github.com/czlonkowski/n8n-mcp.git

# 进入项目目录
cd n8n-mcp

# 安装依赖
npm install

# 构建项目
npm run build

方式二:通过npm包安装

# 在你的项目中安装
npm install n8n-mcp

第三步:配置环境变量

项目需要配置一些环境变量才能正常工作。创建一个 .env 文件:

# n8n服务器地址
N8N_BASE_URL=http://localhost:5678

# n8n API访问令牌
N8N_API_KEY=your_n8n_api_key_here

# MCP服务器端口
MCP_SERVER_PORT=3000

# 日志级别(可选)
LOG_LEVEL=info

如何获取n8n API密钥:

  1. 登录n8n界面
  2. 点击右上角的用户头像
  3. 选择”Settings”
  4. 找到”API”选项卡
  5. 点击”Create API Key”生成新的密钥
  6. 复制生成的密钥并保存到 .env 文件中

第四步:启动MCP服务器

配置完成后,启动MCP服务器:

# 开发模式(带热重载)
npm run dev

# 生产模式
npm start

服务器启动后,你将看到类似以下的输出:

[INFO] MCP Server running on port 3000
[INFO] Connected to n8n at http://localhost:5678
[INFO] Available workflows: 15

这表示MCP服务器已成功连接到n8n,并同步了你的工作流列表。

验证安装

为了确认一切正常工作,我们来做个简单的测试。创建一个测试文件 test.js

// 导入n8n-mcp客户端
const { N8nMcpClient } = require('n8n-mcp');

async function testConnection() {
  // 初始化客户端
  const client = new N8nMcpClient({
    baseUrl: process.env.N8N_BASE_URL,
    apiKey: process.env.N8N_API_KEY
  });

  try {
    // 测试连接
    const status = await client.healthCheck();
    console.log('连接状态:', status);

    // 获取工作流列表
    const workflows = await client.listWorkflows();
    console.log('可用工作流数量:', workflows.length);

    // 列出工作流名称
    workflows.forEach(wf => {
      console.log(`- ${wf.name} (ID: ${wf.id})`);
    });

  } catch (error) {
    console.error('连接失败:', error.message);
  }
}

testConnection();

运行测试:

node test.js

如果一切正常,你应该能看到n8n中所有工作流的列表。


核心功能详解

MCP协议概述

在深入功能之前,我们需要先理解MCP(Model Context Protocol)协议。MCP是一种专门为AI模型设计的通信协议,它定义了:

工具定义格式

MCP使用JSON Schema来定义工具,让AI能够理解每个工具的用途、参数和返回值:

{
  "name": "trigger_workflow",
  "description": "触发指定n8n工作流的执行",
  "parameters": {
    "type": "object",
    "properties": {
      "workflow_id": {
        "type": "string",
        "description": "工作流的唯一标识符"
      },
      "input_data": {
        "type": "object",
        "description": "传递给工作流的输入数据"
      }
    },
    "required": ["workflow_id"]
  }
}

通信机制

MCP采用JSON-RPC 2.0作为底层通信协议,支持:

  • 请求-响应模式:客户端发送请求,服务器返回结果
  • 通知模式:单向发送消息,无需响应
  • 流式响应:支持实时返回执行进度

n8n-mcp的核心能力

1. 工作流管理

列出所有工作流

// 获取所有工作流的详细信息
const workflows = await client.listWorkflows({
  includeInactive: false  // 是否包含已禁用的工作流
});

// 工作流对象包含以下字段:
// - id: 唯一标识符
// - name: 工作流名称
// - active: 是否处于激活状态
// - createdAt: 创建时间
// - updatedAt: 最后更新时间

获取单个工作流详情

// 根据ID获取工作流详细信息
const workflow = await client.getWorkflow('workflow_id_here');

// 返回的工作流对象包含完整的节点配置
console.log('工作流节点数:', workflow.nodes.length);
console.log('工作流连接:', workflow.connections);

创建新工作流

// 通过代码创建新工作流
const newWorkflow = await client.createWorkflow({
  name: '我的自动化工作流',
  nodes: [
    {
      name: 'Webhook',
      type: 'n8n-nodes-base.webhook',
      parameters: {},
      position: [250, 300]
    },
    {
      name: 'HTTP Request',
      type: 'n8n-nodes-base.httpRequest',
      parameters: {
        method: 'GET',
        url: 'https://api.example.com/data'
      },
      position: [450, 300]
    }
  ],
  connections: {
    'Webhook': {
      'main': [[{
        'node': 'HTTP Request',
        'type': 'main',
        'index': 0
      }]]
    }
  }
});

2. 工作流执行

触发工作流执行

这是最核心的功能之一。你可以触发任意工作流并传递参数:

// 触发工作流执行
const execution = await client.executeWorkflow('workflow_id_here', {
  // 输入数据,会作为webhook或初始节点的输入
  userId: '12345',
  action: 'process_order',
  orderData: {
    items: [
      { name: '产品A', quantity: 2 },
      { name: '产品B', quantity: 1 }
    ],
    totalAmount: 299.99
  }
});

console.log('执行ID:', execution.id);
console.log('执行状态:', execution.status);

获取执行结果

// 查询执行结果
const result = await client.getExecutionResult('execution_id_here');

console.log('执行状态:', result.status);
console.log('输出数据:', result.data);
console.log('执行耗时:', result.executionTime, 'ms');

// result.data包含工作流最后节点的输出
// 可以遍历查看每个节点的执行结果
if (result.data.resultData) {
  Object.keys(result.data.resultData).forEach(nodeName => {
    console.log(`节点 ${nodeName} 输出:`, result.data.resultData[nodeName]);
  });
}

等待执行完成

对于耗时较长的工作流,你可以选择等待执行完成:

// 同步执行,等待结果返回
const result = await client.executeWorkflowAndWait('workflow_id_here', {
  data: 'your input data'
}, {
  timeout: 60000,  // 超时时间(毫秒)
  pollInterval: 1000  // 轮询间隔(毫秒)
});

console.log('最终结果:', result);

3. 实时监控

事件订阅

你可以通过事件订阅来监控工作流的状态变化:

// 订阅工作流执行事件
client.subscribe('workflow.execution.started', (event) => {
  console.log('工作流开始执行:', event.workflowId);
});

client.subscribe('workflow.execution.completed', (event) => {
  console.log('工作流执行完成:', {
    workflowId: event.workflowId,
    executionId: event.executionId,
    status: event.status,
    duration: event.duration
  });
});

client.subscribe('workflow.execution.failed', (event) => {
  console.error('工作流执行失败:', {
    workflowId: event.workflowId,
    error: event.error
  });
});

执行历史查询

// 获取工作流的执行历史
const history = await client.getExecutionHistory('workflow_id_here', {
  limit: 20,  // 返回数量限制
  includeData: true  // 是否包含执行数据
});

history.forEach(exec => {
  const status = exec.status === 'success' ? '✓' : '✗';
  console.log(`${status} ${exec.id} - ${exec.startedAt} (${exec.executionTime}ms)`);
});

4. 动态工作流修改

这可能是最强大的功能——你可以在运行时动态修改工作流:

// 更新工作流的某个节点
await client.updateWorkflowNode('workflow_id_here', 'node_name', {
  parameters: {
    url: 'https://new-api-endpoint.com/data',
    method: 'POST'
  }
});

// 添加新节点到现有工作流
await client.addWorkflowNode('workflow_id_here', {
  name: 'New Node',
  type: 'n8n-nodes-base.httpRequest',
  parameters: {
    method: 'GET',
    url: 'https://example.com/api'
  },
  position: [650, 300]
});

工具集完整列表

n8n-mcp提供了丰富的工具集,以下是完整的工具列表:

工具名称 功能描述 主要参数
list_workflows 列出所有工作流 include_inactive
get_workflow 获取工作流详情 workflow_id
create_workflow 创建新工作流 workflow_data
update_workflow 更新工作流 workflow_id, data
delete_workflow 删除工作流 workflow_id
execute_workflow 触发执行 workflow_id, input_data
get_execution 获取执行详情 execution_id
get_execution_history 获取执行历史 workflow_id
activate_workflow 激活工作流 workflow_id
deactivate_workflow 停用工作流 workflow_id
add_node 添加节点 workflow_id, node_data
update_node 更新节点 workflow_id, node_name, data

实战教程

教程一:构建AI助手控制的邮件自动化系统

在这个教程中,我们将构建一个完整的自动化系统:让AI助手能够根据用户指令,自动发送定制化的邮件。

第一步:在n8n中创建邮件工作流

  1. 登录n8n界面
  2. 点击”Create New Workflow”创建新工作流
  3. 添加以下节点:

节点1:Webhook(触发器)

// 配置webhook节点
{
  "path": "send-custom-email",
  "responseMode": "responseNode",
  "options": {}
}

节点2:Set(数据格式化)

// 设置邮件内容
{
  "mode": "manual",
  "assignments": {
    "assignments": [
      {
        "name": "to",
        "value": "={{ $json.body.to }}"
      },
      {
        "name": "subject",
        "value": "={{ $json.body.subject }}"
      },
      {
        "name": "content",
        "value": "={{ $json.body.content }}"
      }
    ]
  },
  "options": {}
}

节点3:Email Send(发送邮件)

// 配置邮件发送
{
  "fromEmail": "noreply@yourcompany.com",
  "to": "={{ $json.to }}",
  "subject": "={{ $json.subject }}",
  "text": "={{ $json.content }}",
  "emailFormat": "html"
}
  1. 保存工作流并激活

第二步:创建MCP集成代码

创建一个完整的集成文件 email-automation.js

/**
 * AI邮件自动化助手
 * 
 * 这个模块展示了如何将n8n-mcp与AI助手结合,
 * 实现智能化的邮件发送功能
 */

const { N8nMcpClient } = require('n8n-mcp');

class EmailAutomationAssistant {
  constructor(config) {
    this.client = new N8nMcpClient(config);
    this.webhookBaseUrl = config.webhookBaseUrl;
  }

  /**
   * 初始化连接
   */
  async initialize() {
    console.log('正在连接到n8n...');
    const status = await this.client.healthCheck();
    console.log('连接成功! n8n版本:', status.version);

    // 获取邮件工作流
    const workflows = await this.client.listWorkflows();
    this.emailWorkflow = workflows.find(w => 
      w.name.includes('email') || w.name.includes('邮件')
    );

    if (!this.emailWorkflow) {
      throw new Error('未找到邮件工作流,请先在n8n中创建');
    }

    console.log('邮件工作流已就绪:', this.emailWorkflow.name);
  }

  /**
   * 发送简单邮件
   * 
   * @param {string} to - 收件人邮箱
   * @param {string} subject - 邮件主题
   * @param {string} content - 邮件内容
   */
  async sendSimpleEmail(to, subject, content) {
    try {
      const result = await this.client.executeWorkflow(
        this.emailWorkflow.id,
        {
          to,
          subject,
          content,
          timestamp: new Date().toISOString()
        }
      );

      console.log('邮件发送任务已提交');
      console.log('执行ID:', result.id);

      return {
        success: true,
        executionId: result.id
      };
    } catch (error) {
      console.error('发送失败:', error.message);
      return {
        success: false,
        error: error.message
      };
    }
  }

  /**
   * 发送批量邮件
   * 
   * @param {Array} recipients - 收件人列表
   * @param {string} subject - 邮件主题
   * @param {Function} contentGenerator - 内容生成函数
   */
  async sendBatchEmails(recipients, subject, contentGenerator) {
    const results = [];

    for (const recipient of recipients) {
      const content = await contentGenerator(recipient);

      const result = await this.sendSimpleEmail(
        recipient.email,
        subject,
        content
      );

      results.push({
        recipient: recipient.email,
        ...result
      });

      // 避免发送过快
      await this.delay(1000);
    }

    return {
      total: recipients.length,
      success: results.filter(r => r.success).length,
      failed: results.filter(r => !r.success).length,
      details: results
    };
  }

  /**
   * AI驱动的智能邮件发送
   * 根据用户指令自动处理邮件发送
   * 
   * @param {string} instruction - AI指令
   */
  async handleAIInstruction(instruction) {
    // 模拟AI解析指令
    const parsed = this.parseInstruction(instruction);

    if (parsed.action === 'send_email') {
      return await this.sendSimpleEmail(
        parsed.to,
        parsed.subject,
        parsed.content
      );
    }

    if (parsed.action === 'send_batch') {
      return await this.sendBatchEmails(
        parsed.recipients,
        parsed.subject,
        parsed.contentGenerator
      );
    }

    return { error: '无法识别的指令' };
  }

  /**
   * 解析AI指令(简化版本)
   * 实际应用中,这里会接入真正的AI模型
   */
  parseInstruction(instruction) {
    // 简化实现,实际应使用AI模型解析
    const lower = instruction.toLowerCase();

    if (lower.includes('发送') && lower.includes('邮件')) {
      // 提取收件人、主题、内容(实际应用中由AI提取)
      return {
        action: 'send_email',
        to: 'recipient@example.com',
        subject: '自动发送的邮件',
        content: '这是由AI助手自动生成的邮件内容'
      };
    }

    throw new Error('无法解析指令');
  }

  // 工具方法:延时
  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 主程序
async function main() {
  const assistant = new EmailAutomationAssistant({
    baseUrl: process.env.N8N_BASE_URL,
    apiKey: process.env.N8N_API_KEY,
    webhookBaseUrl: process.env.WEBHOOK_BASE_URL
  });

  try {
    // 初始化
    await assistant.initialize();

    // 示例1:发送简单邮件
    console.log('\n--- 示例1:发送简单邮件 ---');
    const simpleResult = await assistant.sendSimpleEmail(
      'user@example.com',
      '测试邮件',
      '<h1>这是一封测试邮件</h1><p>来自n8n-mcp自动化系统</p>'
    );
    console.log('结果:', simpleResult);

    // 示例2:批量发送
    console.log('\n--- 示例2:批量发送邮件 ---');
    const batchRecipients = [
      { email: 'user1@example.com', name: '用户1' },
      { email: 'user2@example.com', name: '用户2' },
      { email: 'user3@example.com', name: '用户3' }
    ];

    const batchResult = await assistant.sendBatchEmails(
      batchRecipients,
      '批量测试邮件',
      (recipient) => `亲爱的 ${recipient.name},这是一封个性化邮件!`
    );
    console.log('批量发送结果:', batchResult);

  } catch (error) {
    console.error('程序执行错误:', error);
  }
}

// 运行主程序
main();

第三步:运行测试

node email-automation.js

预期输出:

正在连接到n8n...
连接成功! n8n版本: 1.0.0
邮件工作流已就绪: Email Automation Workflow

--- 示例1:发送简单邮件 ---
邮件发送任务已提交
执行ID: abc123-def456
结果: { success: true, executionId: 'abc123-def456' }

--- 示例2:批量发送邮件 ---
批量发送结果: { total: 3, success: 3, failed: 0, details: [...] }

教程二:构建AI数据分析助手

在这个教程中,我们将创建一个AI数据分析助手,让用户可以用自然语言查询数据。

第一步:创建数据查询工作流

在n8n中创建以下工作流:

节点1:Webhook

{
  "path": "data-query",
  "httpMethod": "POST"
}

节点2:Code(数据处理)

// 这个节点模拟数据查询,实际可能连接数据库或API
function processQuery($input, $vars) {
  const query = $input.body.query;
  const params = $input.body.params || {};

  // 模拟数据分析逻辑
  const results = {
    query: query,
    results: generateSampleData(query, params),
    timestamp: new Date().toISOString(),
    recordCount: 10
  };

  return results;
}

// 生成示例数据(实际应用中替换为真实数据查询)
function generateSampleData(query, params) {
  const data = [];
  const count = params.limit || 10;

  for (let i = 0; i < count; i++) {
    data.push({
      id: i + 1,
      value: Math.random() * 1000,
      category: ['A', 'B', 'C'][i % 3],
      date: new Date(Date.now() - i * 86400000).toISOString()
    });
  }

  return data;
}

return processQuery($input, $vars);

节点3:AI Processing(可选)

如果安装了AI相关节点,可以添加自然语言处理能力:

{
  "model": "gpt-4",
  "prompt": "分析以下数据,用简洁的语言总结关键洞察:\n{{ $json.results }}",
  "temperature": 0.3
}

第二步:创建数据查询客户端

/**
 * AI数据分析助手
 * 
 * 允许用户使用自然语言查询数据,
 * 系统自动调用后端工作流进行分析
 */

const { N8nMcpClient } = require('n8n-mcp');

class DataAnalysisAssistant {
  constructor(config) {
    this.client = new N8nMcpClient(config);
  }

  async initialize() {
    await this.client.healthCheck();
    console.log('数据助手初始化完成');

    // 查找数据查询工作流
    const workflows = await this.client.listWorkflows();
    this.queryWorkflow = workflows.find(w => 
      w.name.toLowerCase().includes('data') ||
      w.name.toLowerCase().includes('query') ||
      w.name.toLowerCase().includes('分析')
    );

    if (!this.queryWorkflow) {
      throw new Error('未找到数据查询工作流');
    }
  }

  /**
   * 执行数据查询
   * 
   * @param {string} naturalQuery - 自然语言查询
   * @param {Object} options - 查询选项
   */
  async query(naturalQuery, options = {}) {
    console.log('处理查询:', naturalQuery);

    // 解析自然语言为结构化查询
    const structuredQuery = this.nlpToQuery(naturalQuery);

    // 执行工作流
    const execution = await this.client.executeWorkflow(
      this.queryWorkflow.id,
      {
        query: structuredQuery.query,
        params: {
          limit: options.limit || 10,
          startDate: structuredQuery.dateRange?.start,
          endDate: structuredQuery.dateRange?.end,
          filters: structuredQuery.filters
        },
        originalQuery: naturalQuery
      }
    );

    // 等待结果
    const result = await this.client.waitForExecution(
      execution.id,
      { timeout: 30000 }
    );

    return this.formatResults(result, naturalQuery);
  }

  /**
   * 简单的NLP解析(简化版本)
   * 实际应用中应接入专门的NLP服务
   */
  nlpToQuery(naturalQuery) {
    const query = {};
    const lower = naturalQuery.toLowerCase();

    // 提取日期范围
    if (lower.includes('昨天')) {
      const yesterday = new Date();
      yesterday.setDate(yesterday.getDate() - 1);
      query.dateRange = {
        start: this.startOfDay(yesterday),
        end: this.endOfDay(yesterday)
      };
    }

    if (lower.includes('本周')) {
      const now = new Date();
      query.dateRange = {
        start: this.startOfWeek(now),
        end: this.endOfWeek(now)
      };
    }

    // 提取数量限制
    const limitMatch = naturalQuery.match(/前(\d+)条|显示(\d+)条|取(\d+)条/);
    if (limitMatch) {
      query.limit = parseInt(limitMatch[1] || limitMatch[2] || limitMatch[3]);
    }

    // 提取筛选条件
    query.filters = {};
    if (lower.includes('类别A')) {
      query.filters.category = 'A';
    }
    if (lower.includes('类别B')) {
      query.filters.category = 'B';
    }

    // 生成最终查询字符串
    query.query = this.buildQueryString(query);

    return query;
  }

  /**
   * 构建查询字符串
   */
  buildQueryString(parsedQuery) {
    let query = 'SELECT * FROM data';
    const conditions = [];

    if (parsedQuery.dateRange) {
      conditions.push(
        `date >= '${parsedQuery.dateRange.start}' AND date <= '${parsedQuery.dateRange.end}'`
      );
    }

    if (parsedQuery.filters) {
      Object.entries(parsedQuery.filters).forEach(([key, value]) => {
        conditions.push(`${key} = '${value}'`);
      });
    }

    if (conditions.length > 0) {
      query += ' WHERE ' + conditions.join(' AND ');
    }

    if (parsedQuery.limit) {
      query += ` LIMIT ${parsedQuery.limit}`;
    }

    return query;
  }

  /**
   * 格式化结果
   */
  formatResults(rawResults, originalQuery) {
    return {
      originalQuery,
      query: rawResults.query,
      recordCount: rawResults.recordCount,
      timestamp: rawResults.timestamp,
      data: rawResults.results,
      summary: this.generateSummary(rawResults)
    };
  }

  /**
   * 生成数据摘要
   */
  generateSummary(data) {
    if (!data.results || data.results.length === 0) {
      return '没有找到匹配的数据';
    }

    const count = data.results.length;
    const avg = data.results.reduce((sum, r) => sum + r.value, 0) / count;
    const max = Math.max(...data.results.map(r => r.value));
    const min = Math.min(...data.results.map(r => r.value));

    return `共 ${count} 条记录,数值范围 ${min.toFixed(2)} - ${max.toFixed(2)},平均值 ${avg.toFixed(2)}`;
  }

  // 日期工具方法
  startOfDay(date) {
    return new Date(date.setHours(0, 0, 0, 0)).toISOString();
  }

  endOfDay(date) {
    return new Date(date.setHours(23, 59, 59, 999)).toISOString();
  }

  startOfWeek(date) {
    const d = new Date(date);
    const day = d.getDay();
    const diff = d.getDate() - day + (day === 0 ? -6 : 1);
    return this.startOfDay(new Date(d.setDate(diff)));
  }

  endOfWeek(date) {
    const start = new Date(this.startOfWeek(date));
    start.setDate(start.getDate() + 6);
    return this.endOfDay(start);
  }
}

// 演示
async function demo() {
  const assistant = new DataAnalysisAssistant({
    baseUrl: process.env.N8N_BASE_URL,
    apiKey: process.env.N8N_API_KEY
  });

  await assistant.initialize();

  // 示例查询
  const queries = [
    '查询昨天前10条数据',
    '本周类别A的数据',
    '显示最新的5条记录'
  ];

  for (const q of queries) {
    console.log(`\n处理查询: "${q}"`);
    const result = await assistant.query(q);
    console.log('结果:', JSON.stringify(result, null, 2));
  }
}

demo();

教程三:构建AI任务调度系统

这个教程展示如何创建一个智能任务调度系统,让AI能够根据任务优先级和资源状况自动调度工作流。

系统架构

┌─────────────────────────────────────────────────────────┐
│                    AI 调度器                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐  │
│  │ 任务解析器   │→ │ 优先级评估   │→ │ 调度决策引擎    │  │
│  └─────────────┘  └─────────────┘  └─────────────────┘  │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                   n8n-mcp 层                             │
│  ┌─────────────────────────────────────────────────────┐│
│  │  任务队列管理器  │  工作流注册表  │  执行状态监控   ││
│  └─────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                      n8n 引擎                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐ │
│  │ 工作流1  │  │ 工作流2  │  │ 工作流3  │  │ 工作流N  │ │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘ │
└─────────────────────────────────────────────────────────┘

完整实现代码

/**
 * AI智能任务调度系统
 * 
 * 这个系统可以根据任务描述自动:
 * 1. 解析任务类型和参数
 * 2. 评估任务优先级
 * 3. 选择合适的工作流执行
 * 4. 监控执行状态并反馈结果
 */

const { N8nMcpClient } = require('n8n-mcp');

/**
 * 任务优先级枚举
 */
const Priority = {
  CRITICAL: 1,  // 紧急重要
  HIGH: 2,      // 重要
  NORMAL: 3,    // 普通
  LOW: 4        // 低
};

/**
 * 任务状态枚举
 */
const TaskStatus = {
  PENDING: 'pending',
  RUNNING: 'running',
  COMPLETED: 'completed',
  FAILED: 'failed',
  CANCELLED: 'cancelled'
};

/**
 * 任务定义类
 */
class Task {
  constructor(id, type, description, params) {
    this.id = id;
    this.type = type;
    this.description = description;
    this.params = params;
    this.priority = Priority.NORMAL;
    this.status = TaskStatus.PENDING;
    this.createdAt = new Date();
    this.startedAt = null;
    this.completedAt = null;
    this.result = null;
    this.error = null;
  }
}

/**
 * AI任务调度器主类
 */
class AITaskScheduler {
  constructor(config) {
    this.client = new N8nMcpClient(config);
    this.tasks = new Map();
    this.workflowRegistry = new Map();
    this.eventHandlers = new Map();
  }

  /**
   * 初始化调度器
   */
  async initialize() {
    console.log('正在初始化AI任务调度器...');

    // 检查n8n连接
    const status = await this.client.healthCheck();
    console.log('n8n连接状态:', status.status);

    // 加载工作流注册表
    await this.loadWorkflowRegistry();

    // 启动任务处理循环
    this.startProcessingLoop();

    console.log('调度器初始化完成');
    console.log('已注册工作流:', this.workflowRegistry.size);
  }

  /**
   * 加载工作流注册表
   * 将n8n中的工作流映射为可调度的任务类型
   */
  async loadWorkflowRegistry() {
    const workflows = await this.client.listWorkflows({ includeInactive: false });

    workflows.forEach(wf => {
      // 根据工作流名称自动分类
      const category = this.categorizeWorkflow(wf.name);
      this.workflowRegistry.set(category, {
        id: wf.id,
        name: wf.name,
        category: category,
        active: wf.active,
        avgExecutionTime: 0  // 可后续添加统计
      });
    });

    // 注册默认的工作流映射
    this.registerDefaultMappings();
  }

  /**
   * 注册默认的工作流映射
   */
  registerDefaultMappings() {
    // 如果没有找到对应的工作流,可以使用这些默认处理
    const defaults = {
      'email': { handler: this.handleEmailTask.bind(this) },
      'data': { handler: this.handleDataTask.bind(this) },
      'report': { handler: this.handleReportTask.bind(this) },
      'notification': { handler: this.handleNotificationTask.bind(this) },
      'sync': { handler: this.handleSyncTask.bind(this) }
    };

    Object.entries(defaults).forEach(([key, value]) => {
      if (!this.workflowRegistry.has(key)) {
        this.workflowRegistry.set(key, {
          name: `Default ${key} handler`,
          category: key,
          handler: value.handler
        });
      }
    });
  }

  /**
   * 根据名称自动分类工作流
   */
  categorizeWorkflow(name) {
    const lower = name.toLowerCase();

    if (lower.includes('email') || lower.includes('邮件')) return 'email';
    if (lower.includes('data') || lower.includes('数据')) return 'data';
    if (lower.includes('report') || lower.includes('报告')) return 'report';
    if (lower.includes('notify') || lower.includes('通知')) return 'notification';
    if (lower.includes('sync') || lower.includes('同步')) return 'sync';

    return 'general';
  }

  /**
   * 接收AI指令并创建任务
   * 
   * @param {string} instruction - AI给出的指令
   * @returns {Task} 创建的任务对象
   */
  async createTaskFromInstruction(instruction) {
    // 解析指令
    const parsed = this.parseInstruction(instruction);

    // 创建任务
    const task = new Task(
      this.generateTaskId(),
      parsed.type,
      instruction,
      parsed.params
    );

    // 评估优先级
    task.priority = this.evaluatePriority(parsed);

    // 存储任务
    this.tasks.set(task.id, task);

    // 触发事件
    this.emit('task.created', task);

    console.log(`任务已创建: [${task.id}] ${task.type} (优先级: ${task.priority})`);

    return task;
  }

  /**
   * 解析AI指令
   * 实际应用中应接入AI模型
   */
  parseInstruction(instruction) {
    const result = {
      type: 'general',
      params: {}
    };

    const lower = instruction.toLowerCase();

    // 识别任务类型
    if (lower.includes('发送邮件') || lower.includes('发邮件')) {
      result.type = 'email';
      // 提取邮件相关参数
      const toMatch = instruction.match(/给(.+?)发|至(.+?)[,,]/);
      if (toMatch) result.params.to = toMatch[1] || toMatch[2];
    }

    if (lower.includes('查询') || lower.includes('获取数据')) {
      result.type = 'data';
    }

    if (lower.includes('报告') || lower.includes('汇总')) {
      result.type = 'report';
    }

    if (lower.includes('通知') || lower.includes('提醒')) {
      result.type = 'notification';
    }

    // 识别急切程度
    if (lower.includes('紧急') || lower.includes('立即') || lower.includes('马上')) {
      result.urgency = 'high';
    }

    return result;
  }

  /**
   * 评估任务优先级
   */
  evaluatePriority(parsed) {
    // 紧急程度高
    if (parsed.urgency === 'high') {
      return Priority.CRITICAL;
    }

    // 根据任务类型调整
    switch (parsed.type) {
      case 'notification':
        return Priority.HIGH;
      case 'email':
        return Priority.NORMAL;
      case 'report':
        return Priority.LOW;
      default:
        return Priority.NORMAL;
    }
  }

  /**
   * 执行任务
   */
  async executeTask(taskId) {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`任务不存在: ${taskId}`);
    }

    console.log(`开始执行任务: [${task.id}]`);

    task.status = TaskStatus.RUNNING;
    task.startedAt = new Date();
    this.emit('task.started', task);

    try {
      // 获取对应的工作流
      const workflow = this.workflowRegistry.get(task.type);

      if (!workflow) {
        throw new Error(`未找到处理类型 '${task.type}' 的工作流`);
      }

      // 如果有处理器函数,直接调用
      if (workflow.handler) {
        task.result = await workflow.handler(task);
      } else {
        // 否则通过n8n-mcp执行
        const execution = await this.client.executeWorkflow(
          workflow.id,
          task.params
        );
        task.result = await this.client.waitForExecution(execution.id);
      }

      task.status = TaskStatus.COMPLETED;
      task.completedAt = new Date();

      this.emit('task.completed', task);
      console.log(`任务完成: [${task.id}]`);

    } catch (error) {
      task.status = TaskStatus.FAILED;
      task.error = error.message;
      task.completedAt = new Date();

      this.emit('task.failed', { task, error });
      console.error(`任务失败: [${task.id}] - ${error.message}`);
    }

    return task;
  }

  /**
   * 任务处理循环
   */
  startProcessingLoop() {
    this.processingInterval = setInterval(async () => {
      await this.processPendingTasks();
    }, 1000);  // 每秒检查一次
  }

  /**
   * 处理待执行的任务
   */
  async processPendingTasks() {
    // 获取所有待执行的任务
    const pendingTasks = Array.from(this.tasks.values())
      .filter(t => t.status === TaskStatus.PENDING)
      .sort((a, b) => a.priority - b.priority);  // 按优先级排序

    // 只处理最高优先级的任务
    if (pendingTasks.length > 0) {
      await this.executeTask(pendingTasks[0].id);
    }
  }

  /**
   * 获取任务状态
   */
  getTaskStatus(taskId) {
    const task = this.tasks.get(taskId);
    if (!task) return null;

    return {
      id: task.id,
      type: task.type,
      status: task.status,
      priority: task.priority,
      createdAt: task.createdAt,
      duration: task.startedAt 
        ? (task.completedAt - task.startedAt) / 1000 + 's' 
        : null,
      result: task.result,
      error: task.error
    };
  }

  /**
   * 获取调度器统计信息
   */
  getStats() {
    const tasks = Array.from(this.tasks.values());

    return {
      total: tasks.length,
      pending: tasks.filter(t => t.status === TaskStatus.PENDING).length,
      running: tasks.filter(t => t.status === TaskStatus.RUNNING).length,
      completed: tasks.filter(t => t.status === TaskStatus.COMPLETED).length,
      failed: tasks.filter(t => t.status === TaskStatus.FAILED).length,
      registeredWorkflows: this.workflowRegistry.size
    };
  }

  // 事件系统
  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, []);
    }
    this.eventHandlers.get(event).push(handler);
  }

  emit(event, data) {
    const handlers = this.eventHandlers.get(event) || [];
    handlers.forEach(handler => handler(data));
  }

  // 工具方法
  generateTaskId() {
    return `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  /**
   * 关闭调度器
   */
  shutdown() {
    if (this.processingInterval) {
      clearInterval(this.processingInterval);
    }
    console.log('调度器已关闭');
  }

  // 任务类型处理器
  async handleEmailTask(task) {
    console.log('处理邮件任务...');
    // 邮件处理逻辑
    return { sent: true };
  }

  async handleDataTask(task) {
    console.log('处理数据任务...');
    // 数据处理逻辑
    return { queried: true };
  }

  async handleReportTask(task) {
    console.log('处理报告任务...');
    // 报告生成逻辑
    return { generated: true };
  }

  async handleNotificationTask(task) {
    console.log('处理通知任务...');
    // 通知发送逻辑
    return { notified: true };
  }

  async handleSyncTask(task) {
    console.log('处理同步任务...');
    // 数据同步逻辑
    return { synced: true };
  }
}

// 演示程序
async function demo() {
  const scheduler = new AITaskScheduler({
    baseUrl: process.env.N8N_BASE_URL,
    apiKey: process.env.N8N_API_KEY
  });

  // 注册事件监听
  scheduler.on('task.created', task => {
    console.log(`[事件] 任务创建: ${task.id}`);
  });

  scheduler.on('task.started', task => {
    console.log(`[事件] 任务开始: ${task.id}`);
  });

  scheduler.on('task.completed', task => {
    console.log(`[事件] 任务完成: ${task.id}`);
  });

  scheduler.on('task.failed', ({ task, error }) => {
    console.log(`[事件] 任务失败: ${task.id} - ${error}`);
  });

  try {
    // 初始化
    await scheduler.initialize();

    // 模拟AI指令
    const instructions = [
      '立即给张三发送一封邮件,内容是明天的会议提醒',
      '查询本月的销售数据并生成汇总报告',
      '同步所有客户信息到CRM系统'
    ];

    // 创建任务
    console.log('\n--- 创建任务 ---');
    const tasks = [];
    for (const instruction of instructions) {
      const task = await scheduler.createTaskFromInstruction(instruction);
      tasks.push(task);
    }

    // 等待任务执行
    console.log('\n--- 等待任务执行 ---');
    await new Promise(resolve => setTimeout(resolve, 3000));

    // 查看结果
    console.log('\n--- 任务结果 ---');
    tasks.forEach(task => {
      const status = scheduler.getTaskStatus(task.id);
      console.log(`${task.id}: ${status.status}`);
    });

    // 查看统计
    console.log('\n--- 调度器统计 ---');
    console.log(scheduler.getStats());

  } finally {
    scheduler.shutdown();
  }
}

// 运行演示
demo();

常见使用场景

场景一:智能客服系统

需求描述

构建一个智能客服系统,让AI能够自动处理客户咨询,对于简单问题直接回复,复杂问题转人工处理。

解决方案

// 客服工作流配置
const customerServiceWorkflow = {
  name: '智能客服系统',
  nodes: [
    {
      name: 'Webhook入口',
      type: 'n8n-nodes-base.webhook',
      parameters: {
        path: 'customer-service',
        responseMode: 'lastNode'
      }
    },
    {
      name: 'AI意图识别',
      type: '@n8n/n8n-nodes-langchain.agent',
      parameters: {
        model: 'gpt-4',
        systemPrompt: `
          你是一个客服助手,需要判断客户的问题类型:
          1. 如果是简单咨询,直接回答
          2. 如果需要操作数据,使用tools
          3. 如果无法回答,转人工
        `,
        tools: ['query_order', 'query_product', 'create_ticket']
      }
    },
    {
      name: '工单创建',
      type: 'n8n-nodes-base.httpRequest',
      parameters: {
        method: 'POST',
        url: 'https://helpdesk-api.example.com/tickets',
        body: '={{ {customer: $json.customer, issue: $json.issue} }}'
      }
    }
  ]
};

场景二:数据同步与ETL

需求描述

定期从多个数据源同步数据,进行清洗转换后存入数据仓库。

解决方案

/**
 * 数据同步任务配置
 */

class DataSyncScheduler {
  constructor(mcpClient) {
    this.client = mcpClient;
    this.syncTasks = new Map();
  }

  /**
   * 配置同步任务
   */
  async setupSyncTask(taskName, config) {
    const workflow = await this.client.createWorkflow({
      name: `数据同步 - ${taskName}`,
      nodes: [
        // 源数据节点
        {
          name: '数据源',
          type: 'n8n-nodes-base.httpRequest',
          parameters: {
            method: 'GET',
            url: config.source.url,
            authentication: 'genericCredentialType',
            genericAuthType: config.source.authType
          }
        },
        // 数据转换节点
        {
          name: '数据清洗',
          type: 'n8n-nodes-base.code',
          parameters: {
            jsCode: `
              const data = $input.all();
              // 数据清洗逻辑
              const cleaned = data.map(item => ({
                id: item.json.id,
                value: item.json.value,
                timestamp: new Date().toISOString(),
                synced: true
              }));
              return cleaned.map(d => ({ json: d }));
            `
          }
        },
        // 目标存储节点
        {
          name: '数据仓库',
          type: 'n8n-nodes-base.postgres',
          parameters: {
            operation: 'insert',
            table: config.target.table,
            columns: 'id, value, timestamp, synced'
          }
        }
      ]
    });

    this.syncTasks.set(taskName, {
      workflowId: workflow.id,
      schedule: config.schedule,
      status: 'active'
    });

    return workflow;
  }

  /**
   * 执行同步任务
   */
  async executeSync(taskName, options = {}) {
    const task = this.syncTasks.get(taskName);
    if (!task) {
      throw new Error(`同步任务不存在: ${taskName}`);
    }

    console.log(`开始同步: ${taskName}`);

    const execution = await this.client.executeWorkflow(
      task.workflowId,
      {
        syncMode: options.incremental ? 'incremental' : 'full',
        lastSyncTime: options.lastSyncTime
      }
    );

    return {
      taskName,
      executionId: execution.id,
      startedAt: new Date()
    };
  }
}

场景三:定时报表生成与分发

需求描述

每天早上9点自动生成昨日数据报表,发送给相关人员。

解决方案

/**
 * 报表自动化系统
 */

class ReportAutomation {
  constructor(mcpClient) {
    this.client = mcpClient;
  }

  /**
   * 创建报表生成工作流
   */
  async createReportWorkflow(reportConfig) {
    const workflow = await this.client.createWorkflow({
      name: `报表 - ${reportConfig.name}`,
      nodes: [
        {
          name: '定时触发',
          type: 'n8n-nodes-base.schedule',
          parameters: {
            rule: {
              interval: [reportConfig.schedule]
            }
          }
        },
        {
          name: '数据查询',
          type: 'n8n-nodes-base.postgres',
          parameters: {
            operation: 'executeQuery',
            query: reportConfig.query
          }
        },
        {
          name: '报表生成',
          type: 'n8n-nodes-base.code',
          parameters: {
            jsCode: `
              const data = $input.all();
              const yesterday = new Date();
              yesterday.setDate(yesterday.getDate() - 1);

              return [{
                json: {
                  reportDate: yesterday.toISOString().split('T')[0],
                  totalRecords: data.length,
                  summary: calculateSummary(data),
                  details: data
                }
              }];

              function calculateSummary(data) {
                return {
                  count: data.length,
                  sum: data.reduce((s, d) => s + d.json.value, 0),
                  avg: data.reduce((s, d) => s + d.json.value, 0) / data.length
                };
              }
            `
          }
        },
        {
          name: '邮件发送',
          type: 'n8n-nodes-base.email',
          parameters: {
            to: reportConfig.recipients.join(','),
            subject: `={{ $json.reportDate }} ${reportConfig.name}`,
            body: `={{ $json.summary }}`
          }
        }
      ]
    });

    // 激活工作流
    await this.client.activateWorkflow(workflow.id);

    return workflow;
  }

  /**
   * 手动触发报表生成
   */
  async generateReport(workflowId, dateRange) {
    const execution = await this.client.executeWorkflow(workflowId, {
      dateRange: dateRange,
      manualTrigger: true
    });

    // 等待执行完成
    const result = await this.client.waitForExecution(execution.id, {
      timeout: 60000
    });

    return result;
  }
}

技巧与最佳实践

性能优化技巧

1. 批量处理减少API调用

不要频繁调用API,尽量批量处理:

// 低效的做法
for (const item of items) {
  await client.executeWorkflow(workflowId, item);
}

// 高效的做法 - 使用批量工作流
await client.executeWorkflow(batchWorkflowId, {
  items: items,
  batchSize: 100
});

2. 合理设置超时时间

对于不同类型的工作流,设置合适的超时:

// 快速任务
const quickResult = await client.executeWorkflowAndWait(
  quickWorkflowId,
  data,
  { timeout: 5000 }  // 5秒超时
);

// 耗时任务
const longResult = await client.executeWorkflowAndWait(
  longWorkflowId,
  data,
  { timeout: 300000 }  // 5分钟超时
);

3. 使用缓存减少重复查询

class CachedMcpClient {
  constructor(client) {
    this.client = client;
    this.cache = new Map();
    this.cacheTimeout = 60000;  // 1分钟缓存
  }

  async listWorkflows(forceRefresh = false) {
    const cacheKey = 'workflows';

    if (!forceRefresh) {
      const cached = this.cache.get(cacheKey);
      if (cached && Date.now() - cached.timestamp < this.cacheTimeout) {
        return cached.data;
      }
    }

    const data = await this.client.listWorkflows();
    this.cache.set(cacheKey, {
      data,
      timestamp: Date.now()
    });

    return data;
  }
}

错误处理最佳实践

1. 实现重试机制

async function executeWithRetry(fn, maxRetries = 3, delay = 1000) {
  let lastError;

  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      console.log(`尝试 ${i + 1} 失败: ${error.message}`);

      if (i < maxRetries - 1) {
        await new Promise(r => setTimeout(r, delay * (i + 1)));
      }
    }
  }

  throw new Error(`执行失败,已重试 ${maxRetries} 次: ${lastError.message}`);
}

// 使用示例
const result = await executeWithRetry(
  () => client.executeWorkflow(workflowId, data),
  3,
  2000
);

2. 优雅的错误处理

class RobustMcpClient {
  constructor(config) {
    this.client = new N8nMcpClient(config);
    this.errorHandlers = new Map();
  }

  /**
   * 注册错误处理器
   */
  onError(errorType, handler) {
    this.errorHandlers.set(errorType, handler);
  }

  /**
   * 安全执行
   */
  async safeExecute(workflowId, data, options = {}) {
    try {
      const result = await this.client.executeWorkflow(workflowId, data);
      return { success: true, data: result };
    } catch (error) {
      // 尝试查找特定的错误处理器
      const handler = this.errorHandlers.get(error.code);
      if (handler) {
        return await handler(error, { workflowId, data });
      }

      // 通用错误处理
      console.error('工作流执行失败:', error);
      return {
        success: false,
        error: {
          message: error.message,
          code: error.code,
          workflowId
        }
      };
    }
  }
}

安全性建议

1. 敏感信息管理

// 使用环境变量存储敏感信息
const config = {
  baseUrl: process.env.N8N_BASE_URL,
  apiKey: process.env.N8N_API_KEY,  // 不要硬编码
  webhookSecret: process.env.WEBHOOK_SECRET
};

// 验证webhook签名
function verifyWebhookSignature(payload, signature, secret) {
  const crypto = require('crypto');
  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(JSON.stringify(payload))
    .digest('hex');

  return crypto.timingSafeEqual(
    Buffer.from(signature),
    Buffer.from(expectedSignature)
  );
}

2. 权限控制

class PermissionControlledClient {
  constructor(client, permissions) {
    this.client = client;
    this.permissions = permissions;
  }

  async executeWorkflow(workflowId, data) {
    // 检查权限
    if (!this.hasPermission('execute', workflowId)) {
      throw new Error('权限不足: 无法执行此工作流');
    }

    return this.client.executeWorkflow(workflowId, data);
  }

  hasPermission(action, resourceId) {
    const key = `${action}:${resourceId}`;
    return this.permissions.includes(key) || 
           this.permissions.includes(`${action}:*`);
  }
}

监控与日志

1. 完整的日志记录

class LoggedMcpClient {
  constructor(client, logger) {
    this.client = client;
    this.logger = logger;
  }

  async executeWorkflow(workflowId, data) {
    const executionId = this.generateId();

    this.logger.info('开始执行工作流', {
      executionId,
      workflowId,
      timestamp: new Date().toISOString()
    });

    try {
      const startTime = Date.now();
      const result = await this.client.executeWorkflow(workflowId, data);
      const duration = Date.now() - startTime;

      this.logger.info('工作流执行成功', {
        executionId,
        workflowId,
        duration,
        timestamp: new Date().toISOString()
      });

      return result;
    } catch (error) {
      this.logger.error('工作流执行失败', {
        executionId,
        workflowId,
        error: error.message,
        timestamp: new Date().toISOString()
      });
      throw error;
    }
  }
}

2. 性能指标收集

class MetricsCollector {
  constructor() {
    this.metrics = {
      executions: [],
      errors: [],
      latencies: []
    };
  }

  recordExecution(execution) {
    this.metrics.executions.push({
      workflowId: execution.workflowId,
      status: execution.status,
      timestamp: new Date()
    });
  }

  recordLatency(workflowId, duration) {
    this.metrics.latencies.push({
      workflowId,
      duration,
      timestamp: new Date()
    });
  }

  getStats() {
    const recentLatencies = this.metrics.latencies
      .filter(l => Date.now() - l.timestamp < 3600000);  // 最近1小时

    return {
      totalExecutions: this.metrics.executions.length,
      successRate: this.calculateSuccessRate(),
      avgLatency: this.calculateAvgLatency(recentLatencies),
      p95Latency: this.calculatePercentile(recentLatencies, 95)
    };
  }
}

总结与相关资源

核心要点回顾

通过本文的学习,你应该已经掌握了以下关键知识点:

1. n8n-mcp是什么

n8n-mcp是一个将n8n工作流自动化引擎与MCP协议结合的开源项目,它让AI模型能够真正执行自动化任务,而不仅仅是给出建议。

2. 核心功能

  • 工作流的创建、查询、修改和删除
  • 异步执行和同步等待
  • 实时事件订阅
  • 批量操作支持

3. 实战应用

  • AI邮件自动化
  • 智能数据分析
  • AI任务调度系统
  • 客服系统、数据同步、报表生成等场景

4. 最佳实践

  • 性能优化:批量处理、缓存、合理超时
  • 错误处理:重试机制、优雅降级
  • 安全性:敏感信息管理、权限控制
  • 可观测性:日志记录、性能监控

进阶学习路径

如果你想要更深入地学习这个领域,以下是一些建议的学习方向:

深入n8n本身

  • 学习n8n的高级节点和表达式
  • 掌握n8n的出错处理和重试机制
  • 了解n8n的集群部署方案

MCP协议深入

  • 阅读MCP协议规范文档
  • 参与MCP开源项目贡献
  • 探索MCP在不同场景的应用

AI集成

  • 学习LangChain等AI框架
  • 掌握提示工程技巧
  • 了解AI Agent的设计模式

相关资源链接

官方资源

  • n8n官方文档: https://docs.n8n.io/
  • n8n-mcp GitHub: https://github.com/czlonkowski/n8n-mcp
  • MCP协议规范: https://modelcontextprotocol.io/

社区资源

  • n8n社区论坛: https://community.n8n.io/
  • n8n工作流模板库: https://n8n.io/workflows

相关开源项目

  • n8n-nodes-langchain: n8n的LangChain集成节点
  • n8n-nodes-openai: OpenAI集成节点
  • flowise: 低代码LLM应用构建工具

未来展望

n8n-mcp代表了AI与自动化工具融合的一个重要方向。随着AI能力的不断提升,这种”AI+自动化”的模式将会变得越来越重要。

你可以期待的未来发展方向包括:

  • 更智能的工作流生成:AI根据需求自动设计工作流
  • 更强的上下文理解:AI能够理解整个工作流的状态和历史
  • 更自然的交互方式:语音、对话式的自动化配置
  • 跨平台的自动化:统一的自动化标准,覆盖更多应用场景

无论你是想提升个人效率,还是为企业构建自动化解决方案,n8n-mcp都是一个值得深入研究的工具。它不仅提供了强大的功能,更重要的是,它开启了一种全新的人机协作模式——让AI从”旁观者”变成”参与者”,真正成为我们工作中的得力助手。

现在,就动手开始你的n8n-mcp之旅吧!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注