Skip to main content

节点规范与数据映射系统实施计划

📋 概述

本文档整合了原有的三个独立设计文档,提供统一的实施计划:

  • 节点规范系统 (node_spec.md) - 核心架构,定义节点类型、参数、端口规范
  • 数据映射系统 (data_mapping_system.md) - 基于节点规范的数据转换层
  • 端口定义计划 (NODE_PORT_DEFINITION_PLAN.md) - 已整合到节点规范系统

🚨 4 天紧急实施模式

总时间: 4 天 (96 小时) 实施模式: 紧急开发,优先核心功能,最小可行产品(MVP) 团队: 需要 2-3 名全职开发者并行工作

🏗️ 统一架构设计

系统层次结构

核心依赖关系

  1. 节点规范系统 是基础层,定义:
  • 节点类型和子类型的规范结构
  • 输入输出端口的类型和验证规则
  • 参数定义和验证逻辑
  • 数据格式要求
  1. 数据映射系统 依赖节点规范:
  • 使用端口规范验证连接兼容性
  • 基于数据格式进行转换验证
  • 引用节点规范进行数据校验
  1. 工作流引擎 整合两个系统:
    • 使用节点规范验证节点配置
    • 使用数据映射处理节点间数据流转
    • 执行统一的工作流验证

🚀 AI 代理架构革命 (已完成)

重大架构变更说明

状态: ✅ 已完成 - 这个架构变更已经在 2025 年 1 月实施完成

在实施本计划之前,系统经历了一次革命性的 AI 代理架构转型:

旧架构 (已废弃) ❌

# 硬编码的AI代理角色 - 已移除
- AI_ROUTER_AGENT # 路由决策
- AI_TASK_ANALYZER # 任务分析
- AI_DATA_INTEGRATOR # 数据整合
- AI_REPORT_GENERATOR # 报告生成
- AI_REMINDER_DECISION # 提醒决策
- AI_WEEKLY_REPORT # 周报生成

新架构 (当前) ✅

# 提供商驱动的AI代理 - 功能由系统提示词定义
- AI_GEMINI_NODE # Google Gemini提供商
- AI_OPENAI_NODE # OpenAI GPT提供商
- AI_CLAUDE_NODE # Anthropic Claude提供商

# 核心参数
system_prompt: "您是[角色定义]。[任务描述]。[输出要求]。"
model_version: "provider-specific-model"
temperature: 0.0-2.0
# ... 提供商特定参数

革命性影响

  1. 开发效率提升: 新 AI 功能从数小时缩短到数分钟
  2. 代码简化: 3 个提供商替代了数十个硬编码角色
  3. 无限可能: 任何 AI 任务都可通过提示词实现
  4. 提供商优化: 可为特定任务选择最佳 AI 提供商

对实施计划的影响

  • 简化节点规范: AI_AGENT_NODE 只需定义 3 个提供商规范
  • 统一端口结构: 所有 AI 代理共享相同的输入/输出端口
  • 减少开发工作量: 无需为每个 AI 角色单独开发
  • 增强灵活性: 系统提示词可在运行时动态配置

📅 4 天详细实施计划

Day 1 (第 1 天) - 基础架构 [24 小时]

上午 (0-4 小时): Protocol Buffer 更新

负责人: Backend Developer 1

  • 0-1h: 更新 workflow.proto 添加端口定义消息
  • 1-2h: 添加数据映射相关消息定义
  • 2-3h: 重新生成 Python protobuf 文件
  • 3-4h: 验证生成文件,修复编译错误

Protocol Buffer Schema:

// 端口定义
message InputPort {
string name = 1;
string type = 2; // ConnectionType
bool required = 3;
string description = 4;
int32 max_connections = 5;
string validation_schema = 6; // JSON Schema
}

message OutputPort {
string name = 1;
string type = 2;
string description = 3;
int32 max_connections = 4;
string validation_schema = 5;
}

// 增强的节点定义
message Node {
string id = 1;
string name = 2;
NodeType type = 3;
NodeSubtype subtype = 4;
// ... 其他现有字段 ...

// 新增:端口定义(基于NodeSpec自动生成)
repeated InputPort input_ports = 14;
repeated OutputPort output_ports = 15;
}

// 增强的连接定义
message Connection {
string node = 1;
ConnectionType type = 2;
int32 index = 3; // 向后兼容
string source_port = 4; // 基于NodeSpec验证
string target_port = 5; // 基于NodeSpec验证
DataMapping data_mapping = 6; // 数据映射规则
}

// 数据映射系统
message DataMapping {
MappingType type = 1;
repeated FieldMapping field_mappings = 2;
string transform_script = 3;
map<string, string> static_values = 4;
string description = 5;
}

enum MappingType {
DIRECT = 0;
FIELD_MAPPING = 1;
TEMPLATE = 2;
TRANSFORM = 3;
}

message FieldMapping {
string source_field = 1;
string target_field = 2;
FieldTransform transform = 3;
bool required = 4;
string default_value = 5;
}

message FieldTransform {
TransformType type = 1;
string transform_value = 2;
map<string, string> options = 3;
}

enum TransformType {
NONE = 0;
STRING_FORMAT = 1;
JSON_PATH = 2;
REGEX = 3;
FUNCTION = 4;
CONDITION = 5;
}

上午-下午 (4-12 小时): 节点规范核心系统

负责人: Backend Developer 2

  • 4-6h: 创建 shared/node_specs/base.py 基础数据结构
  • 6-8h: 实现 shared/node_specs/registry.py 注册器核心功能
  • 8-10h: 实现 shared/node_specs/validator.py 基础验证
  • 10-12h: 创建基础单元测试

目录结构:

shared/node_specs/
├── __init__.py
├── base.py # 核心数据结构
├── registry.py # 节点规范注册器
├── validator.py # 统一验证器
└── definitions/
├── __init__.py
├── trigger_nodes.py # 触发器节点规范
├── ai_agent_nodes.py # AI代理节点规范
├── action_nodes.py # 动作节点规范
├── flow_nodes.py # 流程控制节点规范
├── tool_nodes.py # 工具节点规范
├── memory_nodes.py # 记忆节点规范
└── human_loop_nodes.py # 人机交互节点规范

下午-晚上 (12-20 小时): 数据映射核心

负责人: Backend Developer 1 (并行)

  • 12-14h: 创建 DataMappingProcessor 基础框架
  • 14-16h: 实现 DIRECT 和 FIELD_MAPPING 类型
  • 16-18h: 实现 JSONPath 字段提取
  • 18-20h: 集成节点规范验证

数据映射目录结构:

workflow_engine/data_mapping/
├── __init__.py
├── processor.py # 数据映射处理器
├── executors.py # 连接执行器
├── validators.py # 映射验证器
└── engines/
├── template_engine.py # 模板引擎
├── script_engine.py # 脚本引擎
└── function_registry.py # 函数注册器

晚上 (20-24 小时): 测试和集成

负责人: 两人协作

  • 20-22h: 集成测试节点规范和数据映射
  • 22-24h: 修复集成问题,准备第二天工作

Day 2 (第 2 天) - 核心节点规范定义 [24 小时]

上午 (24-32 小时): 高优先级节点规范

负责人: Backend Developer 2

  • 24-26h: 定义 AI_AGENT_NODE 提供商规范 (Gemini/OpenAI/Claude)
  • 26-28h: 定义 TRIGGER_NODE 所有子类型规范
  • 28-30h: 定义 FLOW_NODE 基础子类型规范
  • 30-32h: 注册所有定义的规范到注册器

节点规范示例:

# 示例:OpenAI AI代理提供商规范
OPENAI_NODE_SPEC = NodeSpec(
node_type="AI_AGENT_NODE",
subtype="OPENAI_NODE",
description="OpenAI GPT AI代理,功能完全由系统提示词定义",
parameters=[
ParameterDef(
name="system_prompt",
type=ParameterType.STRING,
required=True,
description="定义AI代理角色、行为和指令的核心系统提示词"
),
ParameterDef(
name="model_version",
type=ParameterType.ENUM,
required=False,
enum_values=["gpt-4", "gpt-4-turbo", "gpt-3.5-turbo", "gpt-4-vision-preview"],
default_value="gpt-4",
description="OpenAI模型版本"
),
ParameterDef(
name="temperature",
type=ParameterType.FLOAT,
required=False,
default_value=0.7,
min_value=0.0,
max_value=2.0,
description="创造性参数,控制输出的随机性"
),
ParameterDef(
name="presence_penalty",
type=ParameterType.FLOAT,
required=False,
default_value=0.0,
min_value=-2.0,
max_value=2.0,
description="存在惩罚,减少重复内容"
)
],
input_ports=[
InputPortSpec(
name="main",
type="MAIN",
required=True,
description="AI代理的主要输入数据",
validation_schema='{"type": "object", "properties": {"user_message": {"type": "string"}}, "required": ["user_message"]}'
),
InputPortSpec(
name="ai_tool",
type="AI_TOOL",
required=False,
max_connections=10,
description="AI代理可调用的工具连接"
),
InputPortSpec(
name="ai_memory",
type="AI_MEMORY",
required=False,
max_connections=5,
description="AI代理可访问的记忆系统连接"
)
],
output_ports=[
OutputPortSpec(
name="main",
type="MAIN",
description="AI代理的处理结果",
validation_schema='{"type": "object", "properties": {"response": {"type": "string"}, "metadata": {"type": "object"}}, "required": ["response"]}'
),
OutputPortSpec(
name="error",
type="MAIN",
description="执行失败时的错误信息"
)
],
examples=[
{
"title": "客户服务路由代理",
"description": "智能分析客户询问并路由到适当部门",
"parameters": {
"system_prompt": "您是智能客户服务路由系统。分析客户询问并路由到适当部门(billing/technical/sales/general),提供置信分数和推理。",
"model_version": "gpt-4",
"temperature": 0.1
}
},
{
"title": "数据分析代理",
"description": "分析数据集并提供业务洞察",
"parameters": {
"system_prompt": "您是高级数据分析师。分析提供的数据集,提供统计概览、趋势分析和可操作的业务建议,以结构化JSON格式输出。",
"model_version": "gpt-4-turbo",
"temperature": 0.3
}
}
]
)

# 示例:Gemini AI代理提供商规范
GEMINI_NODE_SPEC = NodeSpec(
node_type="AI_AGENT_NODE",
subtype="GEMINI_NODE",
description="Google Gemini AI代理,功能完全由系统提示词定义",
parameters=[
ParameterDef(
name="system_prompt",
type=ParameterType.STRING,
required=True,
description="定义AI代理角色、行为和指令的核心系统提示词"
),
ParameterDef(
name="model_version",
type=ParameterType.ENUM,
required=False,
enum_values=["gemini-pro", "gemini-pro-vision", "gemini-ultra"],
default_value="gemini-pro",
description="Gemini模型版本"
),
ParameterDef(
name="safety_settings",
type=ParameterType.JSON,
required=False,
description="Gemini特有的安全设置配置"
)
],
# ... 相同的input_ports和output_ports
)

# 示例:Claude AI代理提供商规范
CLAUDE_NODE_SPEC = NodeSpec(
node_type="AI_AGENT_NODE",
subtype="CLAUDE_NODE",
description="Anthropic Claude AI代理,功能完全由系统提示词定义",
parameters=[
ParameterDef(
name="system_prompt",
type=ParameterType.STRING,
required=True,
description="定义AI代理角色、行为和指令的核心系统提示词"
),
ParameterDef(
name="model_version",
type=ParameterType.ENUM,
required=False,
enum_values=["claude-3-opus", "claude-3-sonnet", "claude-3-haiku", "claude-2.1"],
default_value="claude-3-sonnet",
description="Claude模型版本"
),
ParameterDef(
name="stop_sequences",
type=ParameterType.JSON,
required=False,
description="Claude特有的停止序列配置"
)
]
# ... 相同的input_ports和output_ports
)

上午-下午 (32-40 小时): 数据映射高级功能

负责人: Backend Developer 1

  • 32-34h: 实现 TEMPLATE 转换类型
  • 34-36h: 实现基础的 TRANSFORM 脚本执行
  • 36-38h: 实现静态值注入功能
  • 38-40h: 添加错误处理和日志记录

下午-晚上 (40-46 小时): 工作流引擎集成

负责人: Backend Developer 2

  • 40-42h: 更新 BaseNodeExecutor 集成节点规范
  • 42-44h: 更新 2-3 个核心节点执行器作为示例
  • 44-46h: 更新 WorkflowValidator 支持新验证

BaseNodeExecutor 增强:

class BaseNodeExecutor(ABC):
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.spec = self._get_node_spec()
self.data_mapper = DataMappingProcessor()

def _get_node_spec(self) -> Optional[NodeSpec]:
"""获取此执行器的规范"""
return None

def validate(self, node: Any) -> List[str]:
"""根据规范验证节点"""
if self.spec:
return node_spec_registry.validate_node(node)
return []

def get_input_port_specs(self) -> List[InputPortSpec]:
"""获取输入端口规范"""
return self.spec.input_ports if self.spec else []

def get_output_port_specs(self) -> List[OutputPortSpec]:
"""获取输出端口规范"""
return self.spec.output_ports if self.spec else []

def validate_input_data(self, port_name: str, data: Dict[str, Any]) -> List[str]:
"""验证输入端口数据"""
if not self.spec:
return []

port_spec = None
for port in self.spec.input_ports:
if port.name == port_name:
port_spec = port
break

if not port_spec:
return [f"未知输入端口: {port_name}"]

return NodeSpecValidator.validate_port_data(port_spec, data)

晚上 (46-48 小时): ConnectionExecutor 实现

负责人: Backend Developer 1

  • 46-48h: 实现新的 ConnectionExecutor 支持数据映射

Day 3 (第 3 天) - API 和验证完善 [24 小时]

上午 (48-56 小时): API 端点开发

负责人: Backend Developer 1

  • 48-50h: 实现节点规范查询 API
  • 50-52h: 实现工作流验证 API
  • 52-54h: 实现连接验证 API
  • 54-56h: 实现数据映射测试 API

API 端点增强:

# 节点规范相关API
@router.get("/node-types")
async def get_node_types():
"""获取所有节点类型和子类型"""
# 从节点规范注册器获取

@router.get("/node-types/{node_type}/{subtype}/spec")
async def get_node_spec(node_type: str, subtype: str):
"""获取特定节点的详细规范"""
# 返回完整的参数、端口、验证规则

@router.post("/workflows/{workflow_id}/validate")
async def validate_workflow(workflow_id: str, workflow_data: dict):
"""验证工作流配置"""
# 使用统一的工作流验证器

@router.post("/connections/validate")
async def validate_connection(connection_config: dict):
"""验证端口连接兼容性"""
# 验证两个节点的端口是否可以连接

@router.post("/data-mapping/test")
async def test_data_mapping(mapping_config: dict, sample_data: dict):
"""测试数据映射转换"""
# 允许前端测试数据映射规则

上午-下午 (56-64 小时): 剩余节点规范

负责人: Backend Developer 2

  • 56-58h: 定义 ACTION_NODE 基础子类型
  • 58-60h: 定义 TOOL_NODE 基础子类型
  • 60-62h: 定义 MEMORY_NODE 基础子类型
  • 62-64h: 注册并测试所有节点规范

下午-晚上 (64-70 小时): 向后兼容性

负责人: 两人协作

  • 64-66h: 实现 index-based 连接的兼容性支持 (跳过 - 项目初期阶段)
  • 66-68h: 创建简单的迁移工具 (跳过 - 项目初期阶段)
  • 68-70h: 测试现有工作流的兼容性 (跳过 - 项目初期阶段)

晚上 (70-72 小时): 性能优化

负责人: Backend Developer 1

  • 70-72h: 添加缓存机制,优化关键路径性能

Day 4 (第 4 天) - 测试、调试、部署 [24 小时]

上午 (72-80 小时): 全面测试

负责人: 两人协作

  • 72-74h: 端到端集成测试
  • 74-76h: 性能测试和瓶颈识别
  • 76-78h: 边界情况和错误处理测试
  • 78-80h: API 功能完整性测试

上午-下午 (80-88 小时): 问题修复

负责人: 根据测试结果分工

  • 80-84h: 修复发现的关键问题
  • 84-88h: 再次回归测试确保稳定性

下午-晚上 (88-96 小时): 部署准备

负责人: 两人协作

  • 88-90h: 准备部署脚本和数据库迁移
  • 90-92h: 在测试环境完整部署测试
  • 92-94h: 准备生产环境部署
  • 94-96h: 文档更新和交接准备

🎯 MVP 功能范围

必须实现 (P0)

  • 基础节点规范系统 (注册器、验证器)
  • 5 个核心节点类型的规范定义 (AI_AGENT, TRIGGER, FLOW, ACTION, TOOL)
  • 数据映射的 DIRECT 和 FIELD_MAPPING 类型
  • 基本的端口连接验证
  • 核心 API 端点 (规范查询、工作流验证)
  • 向后兼容性支持 (跳过 - 项目初期阶段)

应该实现 (P1)

  • TEMPLATE 和 TRANSFORM 数据映射类型
  • 完整的节点规范覆盖
  • 性能优化和缓存
  • 详细的错误信息和调试功能

可以推迟 (P2)

  • 高级数据映射功能 (复杂转换函数)
  • 可视化编辑器支持 API (未实现 - P2 优先级)
  • 完整的迁移工具 (跳过 - 项目初期阶段)
  • 详细的监控和指标

⚠️ 风险管理

高风险项目

  1. Protocol Buffer 兼容性 - 可能破坏现有系统
    • 缓解: 先在 feature branch 测试,确保向后兼容
  2. 性能影响 - 新的验证逻辑可能影响性能
    • 缓解: 实现基础缓存,性能测试验证
  3. 集成复杂性 - 多个系统同时修改
    • 缓解: 小步骤迭代,频繁集成测试

应急预案

  • Day 1 结束: 如果基础架构有问题,Day 2 上午专门修复
  • Day 2 结束: 如果节点规范不完整,专注核心 5 个类型
  • Day 3 结束: 如果 API 有问题,简化功能确保基本可用
  • Day 4: 如果有严重问题,准备回滚方案

🔧 开发环境设置

必需工具

  • Protocol Buffer 编译器
  • Python 3.8+ 环境
  • Redis (用于缓存测试)
  • Supabase (用于集成测试)

代码分支策略

# 主开发分支
git checkout -b feature/unified-node-specs

# 各子功能分支 (可并行开发)
git checkout -b feature/protobuf-updates
git checkout -b feature/node-specs-core
git checkout -b feature/data-mapping-core
git checkout -b feature/api-endpoints

每日同步

  • 每天 9:00: 站会,确认进度和依赖
  • 每天 18:00: 代码合并和集成测试
  • 每天 22:00: 问题回顾和第二天计划调整

📊 成功标准

Day 1 完成标准

  • Protocol Buffer 文件编译成功
  • 节点规范注册器可以注册和查询规范
  • 数据映射处理器可以处理 DIRECT 类型
  • 基础单元测试通过

Day 2 完成标准

  • 3 个 AI 代理提供商规范完整定义 (Gemini/OpenAI/Claude)
  • 至少 4 个其他核心节点类型有完整规范定义 (TRIGGER, FLOW, ACTION, TOOL)
  • 数据映射支持 FIELD_MAPPING 和 TEMPLATE
  • BaseNodeExecutor 成功集成节点规范
  • 至少 1 个现有节点执行器完成升级 (AI 代理执行器优先)

Day 3 完成标准

  • 核心 API 端点可以正常工作
  • 工作流验证包含新的端口和数据映射检查
  • 现有工作流保持兼容性 (跳过 - 项目初期阶段)
  • 性能满足基本要求 (少于 500ms 验证时间)

Day 4 完成标准

  • 所有核心功能端到端测试通过
  • 在测试环境成功部署
  • 准备好生产部署脚本
  • 基础文档和交接材料完成

🔄 向后兼容性策略

兼容性保证

  1. 现有工作流: 继续使用 index-based 连接方式
  2. API 接口: 保持现有 API 的响应格式
  3. 节点执行器: 逐步迁移,支持两种模式并存

迁移路径

  1. Phase 1: 部署新系统,默认关闭新功能
  2. Phase 2: 为新创建的工作流启用端口和数据映射
  3. Phase 3: 提供迁移工具将旧工作流转换为新格式
  4. Phase 4: 逐步弃用旧的连接方式

迁移工具

class WorkflowMigrationTool:
def migrate_workflow_to_port_based(self, old_workflow: dict) -> dict:
"""将基于索引的连接迁移为基于端口的连接"""
# 自动推断端口名称并生成数据映射配置
pass

def generate_node_ports_from_spec(self, node: dict) -> dict:
"""基于节点规范为现有节点生成端口定义"""
# 查询节点规范并生成相应的端口配置
pass

👥 团队协作

代码审查

  • 即时审查: 关键代码实时配对编程
  • 每日审查: 晚上 18:00 统一代码审查
  • 快速决策: 设计问题 30 分钟内必须决定

沟通机制

  • 紧急问题: 立即语音/视频沟通
  • 进度更新: 每 4 小时在群里更新进度
  • 技术决策: 记录在 shared 文档,所有人可见

📈 最终成功指标

技术指标

  • 100% 覆盖所有现有节点类型的规范定义 (包括 3 个 AI 代理提供商)
  • 少于 100ms 节点规范查询性能
  • 少于 500ms 数据映射处理性能
  • 大于 95% 向后兼容性测试通过率 (跳过 - 项目初期阶段)
  • AI 代理系统提示词验证和参数校验 100%覆盖

用户体验指标

  • 节点配置错误减少 80%+
  • 工作流创建时间减少 50%+
  • 前端自动生成表单覆盖 100%节点类型
  • 开发者上手时间减少 60%+

业务指标

  • 工作流执行成功率提升 10%+
  • 支持更复杂的业务场景配置
  • 减少客户因配置错误导致的支持请求

🎯 总结

这个 4 天紧急实施计划将三个原本独立的系统整合为一个协调工作的整体:

  1. 节点规范系统 作为基础架构,提供类型安全和验证能力
  2. 数据映射系统 基于节点规范构建,提供强大的数据转换能力
  3. 端口系统 整合到节点规范中,提供清晰的数据流定义

通过 4 天的高强度并行开发、功能优先级划分和持续集成,确保在极短时间内交付核心功能,同时保持系统稳定性和向后兼容性。


文档版本: 2.0 创建时间: 2025-01-28 作者: Claude Code 状态: 紧急实施计划 预计完成: 2025-02-01 (4 天后)