节点规范与数据映射系统实施计划
📋 概述
本文档整合了原有的三个独立设计文档,提供统一的实施计划:
- 节点规范系统 (
node_spec.md
) - 核心架构,定义节点类型、参数、端口规范 - 数据映射系统 (
data_mapping_system.md
) - 基于节点规范的数据转换层 端口定义计划 (- 已整合到节点规范系统NODE_PORT_DEFINITION_PLAN.md
)
🚨 4 天紧急实施模式
总时间: 4 天 (96 小时) 实施模式: 紧急开发,优先核心功能,最小可行产品(MVP) 团队: 需要 2-3 名全职开发者并行工作
🏗️ 统一架构设计
系统层次结构
核心依赖关系
- 节点规范系统 是基础层,定义:
- 节点类型和子类型的规范结构
- 输入输出端口的类型和验证规则
- 参数定义和验证逻辑
- 数据格式要求
- 数据映射系统 依赖节点规范:
- 使用端口规范验证连接兼容性
- 基于数据格式进行转换验证
- 引用节点规范进行数据校验
- 工作流引擎 整合两个系统:
- 使用节点规范验证节点配置
- 使用数据映射处理节点间数据流转
- 执行统一的工作流验证
🚀 AI 代理架构革命 (已完成)
重大架构变更说明
状态: ✅ 已完成 - 这个架构变更已经在 2025 年 1 月实施完成
在实施本计划之前,系统经历了一次革命性的 AI 代理架构转型:
旧架构 (已废弃) ❌
# 硬编码的AI代理角色 - 已移除
- AI_ROUTER_AGENT # 路由决策
- AI_TASK_ANALYZER # 任务分析
- AI_DATA_INTEGRATOR # 数据整合
- AI_REPORT_GENERATOR # 报告生成
- AI_REMINDER_DECISION # 提醒决策
- AI_WEEKLY_REPORT # 周报生成
新架构 (当前) ✅
# 提供商驱动的AI代理 - 功能由系统提示词定义
- AI_GEMINI_NODE # Google Gemini提供商
- AI_OPENAI_NODE # OpenAI GPT提供商
- AI_CLAUDE_NODE # Anthropic Claude提供商
# 核心参数
system_prompt: "您是[角色定义]。[任务描述]。[输出要求]。"
model_version: "provider-specific-model"
temperature: 0.0-2.0
# ... 提供商特定参数
革命性影响
- 开发效率提升: 新 AI 功能从数小时缩短到数分钟
- 代码简化: 3 个提供商替代了数十个硬编码角色
- 无限可能: 任何 AI 任务都可通过提示词实现
- 提供商优化: 可为特定任务选择最佳 AI 提供商
对实施计划的影响
- 简化节点规范: AI_AGENT_NODE 只需定义 3 个提供商规范
- 统一端口结构: 所有 AI 代理共享相同的输入/输出端口
- 减少开发工作量: 无需为每个 AI 角色单独开发
- 增强灵活性: 系统提示词可在运行时动态配置
📅 4 天详细实施计划
Day 1 (第 1 天) - 基础架构 [24 小时]
上午 (0-4 小时): Protocol Buffer 更新
负责人: Backend Developer 1
- 0-1h: 更新
workflow.proto
添加端口定义消息 - 1-2h: 添加数据映射相关消息定义
- 2-3h: 重新生成 Python protobuf 文件
- 3-4h: 验证生成文件,修复编译错误
Protocol Buffer Schema:
// 端口定义
message InputPort {
string name = 1;
string type = 2; // ConnectionType
bool required = 3;
string description = 4;
int32 max_connections = 5;
string validation_schema = 6; // JSON Schema
}
message OutputPort {
string name = 1;
string type = 2;
string description = 3;
int32 max_connections = 4;
string validation_schema = 5;
}
// 增强的节点定义
message Node {
string id = 1;
string name = 2;
NodeType type = 3;
NodeSubtype subtype = 4;
// ... 其他现有字段 ...
// 新增:端口定义(基于NodeSpec自动生成)
repeated InputPort input_ports = 14;
repeated OutputPort output_ports = 15;
}
// 增强的连接定义
message Connection {
string node = 1;
ConnectionType type = 2;
int32 index = 3; // 向后兼容
string source_port = 4; // 基于NodeSpec验证
string target_port = 5; // 基于NodeSpec验证
DataMapping data_mapping = 6; // 数据映射规则
}
// 数据映射系统
message DataMapping {
MappingType type = 1;
repeated FieldMapping field_mappings = 2;
string transform_script = 3;
map<string, string> static_values = 4;
string description = 5;
}
enum MappingType {
DIRECT = 0;
FIELD_MAPPING = 1;
TEMPLATE = 2;
TRANSFORM = 3;
}
message FieldMapping {
string source_field = 1;
string target_field = 2;
FieldTransform transform = 3;
bool required = 4;
string default_value = 5;
}
message FieldTransform {
TransformType type = 1;
string transform_value = 2;
map<string, string> options = 3;
}
enum TransformType {
NONE = 0;
STRING_FORMAT = 1;
JSON_PATH = 2;
REGEX = 3;
FUNCTION = 4;
CONDITION = 5;
}
上午-下午 (4-12 小时): 节点规范核心系统
负责人: Backend Developer 2
- 4-6h: 创建
shared/node_specs/base.py
基础数据结构 - 6-8h: 实现
shared/node_specs/registry.py
注册器核心功能 - 8-10h: 实现
shared/node_specs/validator.py
基础验证 - 10-12h: 创建基础单元测试
目录结构:
shared/node_specs/
├── __init__.py
├── base.py # 核心数据结构
├── registry.py # 节点规范注册器
├── validator.py # 统一验证器
└── definitions/
├── __init__.py
├── trigger_nodes.py # 触发器节点规范
├── ai_agent_nodes.py # AI代理节点规范
├── action_nodes.py # 动作节点规范
├── flow_nodes.py # 流程控制节点规范
├── tool_nodes.py # 工具节点规范
├── memory_nodes.py # 记忆节点规范
└── human_loop_nodes.py # 人机交互节点规范
下午-晚上 (12-20 小时): 数据映射核心
负责人: Backend Developer 1 (并行)
- 12-14h: 创建
DataMappingProcessor
基础框架 - 14-16h: 实现 DIRECT 和 FIELD_MAPPING 类型
- 16-18h: 实现 JSONPath 字段提取
- 18-20h: 集成节点规范验证
数据映射目录结构:
workflow_engine/data_mapping/
├── __init__.py
├── processor.py # 数据映射处理器
├── executors.py # 连接执行器
├── validators.py # 映射验证器
└── engines/
├── template_engine.py # 模板引擎
├── script_engine.py # 脚本引擎
└── function_registry.py # 函数注册器
晚上 (20-24 小时): 测试和集成
负责人: 两人协作
- 20-22h: 集成测试节点规范和数据映射
- 22-24h: 修复集成问题,准备第二天工作
Day 2 (第 2 天) - 核心节点规范定义 [24 小时]
上午 (24-32 小时): 高优先级节点规范
负责人: Backend Developer 2
- 24-26h: 定义
AI_AGENT_NODE
提供商规范 (Gemini/OpenAI/Claude) - 26-28h: 定义
TRIGGER_NODE
所有子类型规范 - 28-30h: 定义
FLOW_NODE
基础子类型规范 - 30-32h: 注册所有定义的规范到注册器
节点规范示例:
# 示例:OpenAI AI代理提供商规范
OPENAI_NODE_SPEC = NodeSpec(
node_type="AI_AGENT_NODE",
subtype="OPENAI_NODE",
description="OpenAI GPT AI代理,功能完全由系统提示词定义",
parameters=[
ParameterDef(
name="system_prompt",
type=ParameterType.STRING,
required=True,
description="定义AI代理角色、行为和指令的核心系统提示词"
),
ParameterDef(
name="model_version",
type=ParameterType.ENUM,
required=False,
enum_values=["gpt-4", "gpt-4-turbo", "gpt-3.5-turbo", "gpt-4-vision-preview"],
default_value="gpt-4",
description="OpenAI模型版本"
),
ParameterDef(
name="temperature",
type=ParameterType.FLOAT,
required=False,
default_value=0.7,
min_value=0.0,
max_value=2.0,
description="创造性参数,控制输出的随机性"
),
ParameterDef(
name="presence_penalty",
type=ParameterType.FLOAT,
required=False,
default_value=0.0,
min_value=-2.0,
max_value=2.0,
description="存在惩罚,减少重复内容"
)
],
input_ports=[
InputPortSpec(
name="main",
type="MAIN",
required=True,
description="AI代理的主要输入数据",
validation_schema='{"type": "object", "properties": {"user_message": {"type": "string"}}, "required": ["user_message"]}'
),
InputPortSpec(
name="ai_tool",
type="AI_TOOL",
required=False,
max_connections=10,
description="AI代理可调用的工具连接"
),
InputPortSpec(
name="ai_memory",
type="AI_MEMORY",
required=False,
max_connections=5,
description="AI代理可访问的记忆系统连接"
)
],
output_ports=[
OutputPortSpec(
name="main",
type="MAIN",
description="AI代理的处理结果",
validation_schema='{"type": "object", "properties": {"response": {"type": "string"}, "metadata": {"type": "object"}}, "required": ["response"]}'
),
OutputPortSpec(
name="error",
type="MAIN",
description="执行失败时的错误信息"
)
],
examples=[
{
"title": "客户服务路由代理",
"description": "智能分析客户询问并路由到适当部门",
"parameters": {
"system_prompt": "您是智能客户服务路由系统。分析客户询问并路由到适当部门(billing/technical/sales/general),提供置信分数和推理。",
"model_version": "gpt-4",
"temperature": 0.1
}
},
{
"title": "数据分析代理",
"description": "分析数据集并提供业务洞察",
"parameters": {
"system_prompt": "您是高级数据分析师。分析提供的数据集,提供统计概览、趋势分析和可操作的业务建议,以结构化JSON格式输出。",
"model_version": "gpt-4-turbo",
"temperature": 0.3
}
}
]
)
# 示例:Gemini AI代理提供商规范
GEMINI_NODE_SPEC = NodeSpec(
node_type="AI_AGENT_NODE",
subtype="GEMINI_NODE",
description="Google Gemini AI代理,功能完全由系统提示词定义",
parameters=[
ParameterDef(
name="system_prompt",
type=ParameterType.STRING,
required=True,
description="定义AI代理角色、行为和指令的核心系统提示词"
),
ParameterDef(
name="model_version",
type=ParameterType.ENUM,
required=False,
enum_values=["gemini-pro", "gemini-pro-vision", "gemini-ultra"],
default_value="gemini-pro",
description="Gemini模型版本"
),
ParameterDef(
name="safety_settings",
type=ParameterType.JSON,
required=False,
description="Gemini特有的安全设置配置"
)
],
# ... 相同的input_ports和output_ports
)
# 示例:Claude AI代理提供商规范
CLAUDE_NODE_SPEC = NodeSpec(
node_type="AI_AGENT_NODE",
subtype="CLAUDE_NODE",
description="Anthropic Claude AI代理,功能完全由系统提示词定义",
parameters=[
ParameterDef(
name="system_prompt",
type=ParameterType.STRING,
required=True,
description="定义AI代理角色、行为和指令的核心系统提示词"
),
ParameterDef(
name="model_version",
type=ParameterType.ENUM,
required=False,
enum_values=["claude-3-opus", "claude-3-sonnet", "claude-3-haiku", "claude-2.1"],
default_value="claude-3-sonnet",
description="Claude模型版本"
),
ParameterDef(
name="stop_sequences",
type=ParameterType.JSON,
required=False,
description="Claude特有的停止序列配置"
)
]
# ... 相同的input_ports和output_ports
)
上午-下午 (32-40 小时): 数据映射高级功能
负责人: Backend Developer 1
- 32-34h: 实现 TEMPLATE 转换类型
- 34-36h: 实现基础的 TRANSFORM 脚本执行
- 36-38h: 实现静态值注入功能
- 38-40h: 添加错误处理和日志记录
下午-晚上 (40-46 小时): 工作流引擎集成
负责人: Backend Developer 2
- 40-42h: 更新
BaseNodeExecutor
集成节点规范 - 42-44h: 更新 2-3 个核心节点执行器作为示例
- 44-46h: 更新
WorkflowValidator
支持新验证
BaseNodeExecutor 增强:
class BaseNodeExecutor(ABC):
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.spec = self._get_node_spec()
self.data_mapper = DataMappingProcessor()
def _get_node_spec(self) -> Optional[NodeSpec]:
"""获取此执行器的规范"""
return None
def validate(self, node: Any) -> List[str]:
"""根据规范验证节点"""
if self.spec:
return node_spec_registry.validate_node(node)
return []
def get_input_port_specs(self) -> List[InputPortSpec]:
"""获取输入端口规范"""
return self.spec.input_ports if self.spec else []
def get_output_port_specs(self) -> List[OutputPortSpec]:
"""获取输出端口规范"""
return self.spec.output_ports if self.spec else []
def validate_input_data(self, port_name: str, data: Dict[str, Any]) -> List[str]:
"""验证输入端口数据"""
if not self.spec:
return []
port_spec = None
for port in self.spec.input_ports:
if port.name == port_name:
port_spec = port
break
if not port_spec:
return [f"未知输入端口: {port_name}"]
return NodeSpecValidator.validate_port_data(port_spec, data)
晚上 (46-48 小时): ConnectionExecutor 实现
负责人: Backend Developer 1
- 46-48h: 实现新的
ConnectionExecutor
支持数据映射
Day 3 (第 3 天) - API 和验证完善 [24 小时]
上午 (48-56 小时): API 端点开发
负责人: Backend Developer 1
- 48-50h: 实现节点规范查询 API
- 50-52h: 实现工作流验证 API
- 52-54h: 实现连接验证 API
- 54-56h: 实现数据映射测试 API
API 端点增强:
# 节点规范相关API
@router.get("/node-types")
async def get_node_types():
"""获取所有节点类型和子类型"""
# 从节点规范注册器获取
@router.get("/node-types/{node_type}/{subtype}/spec")
async def get_node_spec(node_type: str, subtype: str):
"""获取特定节点的详细规范"""
# 返回完整的参数、端口、验证规则
@router.post("/workflows/{workflow_id}/validate")
async def validate_workflow(workflow_id: str, workflow_data: dict):
"""验证工作流配置"""
# 使用统一的工作流验证器
@router.post("/connections/validate")
async def validate_connection(connection_config: dict):
"""验证端口连接兼容性"""
# 验证两个节点的端口是否可以连接
@router.post("/data-mapping/test")
async def test_data_mapping(mapping_config: dict, sample_data: dict):
"""测试数据映射转换"""
# 允许前端测试数据映射规则
上午-下午 (56-64 小时): 剩余节点规范
负责人: Backend Developer 2
- 56-58h: 定义
ACTION_NODE
基础子类型 - 58-60h: 定义
TOOL_NODE
基础子类型 - 60-62h: 定义
MEMORY_NODE
基础子类型 - 62-64h: 注册并测试所有节点规范
下午-晚上 (64-70 小时): 向后兼容性
负责人: 两人协作
- 64-66h: 实现 index-based 连接的兼容性支持 (跳过 - 项目初期阶段)
- 66-68h: 创建简单的迁移工具 (跳过 - 项目初期阶段)
- 68-70h: 测试现有工作流的兼容性 (跳过 - 项目初期阶段)
晚上 (70-72 小时): 性能优化
负责人: Backend Developer 1
- 70-72h: 添加缓存机制,优化关键路径性能
Day 4 (第 4 天) - 测试、调试、部署 [24 小时]
上午 (72-80 小时): 全面测试
负责人: 两人协作
- 72-74h: 端到端集成测试
- 74-76h: 性能测试和瓶颈识别
- 76-78h: 边界情况和错误处理测试
- 78-80h: API 功能完整性测试
上午-下午 (80-88 小时): 问题修复
负责人: 根据测试结果分工
- 80-84h: 修复发现的关键问题
- 84-88h: 再次回归测试确保稳定性
下午-晚上 (88-96 小时): 部署准备
负责人: 两人协作
- 88-90h: 准备部署脚本和数据库迁移
- 90-92h: 在测试环境完整部署测试
- 92-94h: 准备生产环境部署
- 94-96h: 文档更新和交接准备
🎯 MVP 功能范围
必须实现 (P0)
- 基础节点规范系统 (注册器、验证器)
- 5 个核心节点类型的规范定义 (AI_AGENT, TRIGGER, FLOW, ACTION, TOOL)
- 数据映射的 DIRECT 和 FIELD_MAPPING 类型
- 基本的端口连接验证
- 核心 API 端点 (规范查询、工作流验证)
- 向后兼容性支持 (跳过 - 项目初期阶段)
应该实现 (P1)
- TEMPLATE 和 TRANSFORM 数据映射类型
- 完整的节点规范覆盖
- 性能优化和缓存
- 详细的错误信息和调试功能
可以推迟 (P2)
- 高级数据映射功能 (复杂转换函数)
- 可视化编辑器支持 API (未实现 - P2 优先级)
- 完整的迁移工具 (跳过 - 项目初期阶段)
- 详细的监控和指标
⚠️ 风险管理
高风险项目
- Protocol Buffer 兼容性 - 可能破坏现有系统
- 缓解: 先在 feature branch 测试,确保向后兼容
- 性能影响 - 新的验证逻辑可能影响性能
- 缓解: 实现基础缓存,性能测试验证
- 集成复杂性 - 多个系统同时修改
- 缓解: 小步骤迭代,频繁集成测试
应急预案
- Day 1 结束: 如果基础架构有问题,Day 2 上午专门修复
- Day 2 结束: 如果节点规范不完整,专注核心 5 个类型
- Day 3 结束: 如果 API 有问题,简化功能确保基本可用
- Day 4: 如果有严重问题,准备回滚方案
🔧 开发环境设置
必需工具
- Protocol Buffer 编译器
- Python 3.8+ 环境
- Redis (用于缓存测试)
- Supabase (用于集成测试)
代码分支策略
# 主开发分支
git checkout -b feature/unified-node-specs
# 各子功能分支 (可并行开发)
git checkout -b feature/protobuf-updates
git checkout -b feature/node-specs-core
git checkout -b feature/data-mapping-core
git checkout -b feature/api-endpoints
每日同步
- 每天 9:00: 站会,确认进度和依赖
- 每天 18:00: 代码合并和集成测试
- 每天 22:00: 问题回顾和第二天计划调整
📊 成功标准
Day 1 完成标准
- Protocol Buffer 文件编译成功
- 节点规范注册器可以注册和查询规范
- 数据映射处理器可以处理 DIRECT 类型
- 基础单元测试通过
Day 2 完成标准
- 3 个 AI 代理提供商规范完整定义 (Gemini/OpenAI/Claude)
- 至少 4 个其他核心节点类型有完整规范定义 (TRIGGER, FLOW, ACTION, TOOL)
- 数据映射支持 FIELD_MAPPING 和 TEMPLATE
- BaseNodeExecutor 成功集成节点规范
- 至少 1 个现有节点执行器完成升级 (AI 代理执行器优先)
Day 3 完成标准
- 核心 API 端点可以正常工作
- 工作流验证包含新的端口和数据映射检查
- 现有工作流保持兼容性 (跳过 - 项目初期阶段)
- 性能满足基本要求 (少于 500ms 验证时间)
Day 4 完成标准
- 所有核心功能端到端测试通过
- 在测试环境成功部署
- 准备好生产部署脚本
- 基础文档和交接材料完成
🔄 向后兼容性策略
兼容性保证
- 现有工作流: 继续使用 index-based 连接方式
- API 接口: 保持现有 API 的响应格式
- 节点执行器: 逐步迁移,支持两种模式并存
迁移路径
- Phase 1: 部署新系统,默认关闭新功能
- Phase 2: 为新创建的工作流启用端口和数据映射
- Phase 3: 提供迁移工具将旧工作流转换为新格式
- Phase 4: 逐步弃用旧的连接方式
迁移工具
class WorkflowMigrationTool:
def migrate_workflow_to_port_based(self, old_workflow: dict) -> dict:
"""将基于索引的连接迁移为基于端口的连接"""
# 自动推断端口名称并生成数据映射配置
pass
def generate_node_ports_from_spec(self, node: dict) -> dict:
"""基于节点规范为现有节点生成端口定义"""
# 查询节点规范并生成相应的端口配置
pass
👥 团队协作
代码审查
- 即时审查: 关键代码实时配对编程
- 每日审查: 晚上 18:00 统一代码审查
- 快速决策: 设计问题 30 分钟内必须决定
沟通机制
- 紧急问题: 立即语音/视频沟通
- 进度更新: 每 4 小时在群里更新进度
- 技术决策: 记录在 shared 文档,所有人可见
📈 最终成功指标
技术指标
- 100% 覆盖所有现有节点类型的规范定义 (包括 3 个 AI 代理提供商)
- 少于 100ms 节点规范查询性能
- 少于 500ms 数据映射处理性能
- 大于 95% 向后兼容性测试通过率 (跳过 - 项目初期阶段)
- AI 代理系统提示词验证和参数校验 100%覆盖
用户体验指标
- 节点配置错误减少 80%+
- 工作流创建时间减少 50%+
- 前端自动生成表单覆盖 100%节点类型
- 开发者上手时间减少 60%+
业务指标
- 工作流执行成功率提升 10%+
- 支持更复杂的业务场景配置
- 减少客户因配置错误导致的支持请求
🎯 总结
这个 4 天紧急实施计划将三个原本独立的系统整合为一个协调工作的整体:
- 节点规范系统 作为基础架构,提供类型安全和验证能力
- 数据映射系统 基于节点规范构建,提供强大的数据转换能力
- 端口系统 整合到节点规范中,提供清晰的数据流定义
通过 4 天的高强度并行开发、功能优先级划分和持续集成,确保在极短时间内交付核心功能,同时保持系统稳定性和向后兼容性。
文档版本: 2.0 创建时间: 2025-01-28 作者: Claude Code 状态: 紧急实施计划 预计完成: 2025-02-01 (4 天后)