Node Specification System Technical Design
Executive Summary
The Node Specification System is a centralized, code-based framework that defines the complete behavioral and structural specifications for all workflow node types. This system provides type-safe parameter validation, comprehensive configuration schemas, and automated instance creation for the 8 core node types across the workflow engine.
Key Architectural Decisions:
- Code-Based Storage: Specifications stored in Python files under shared/node_specs/ for version control and type safety
- BaseModel Architecture: All specifications inherit from BaseNodeSpec (Pydantic-based) for validation
- Registry Pattern: Global NODE_SPECS_REGISTRY provides O(1) access to specifications by TYPE.SUBTYPE key
- Output-Key Based Routing: Simplified connection system using output_key instead of complex port specifications
- Conversion Functions: Support for runtime data transformation between connected nodes
Technology Stack:
- Base Classes: Pydantic BaseModel for schema validation
- Storage: Python modules with explicit imports
- Runtime Access: Dictionary-based registry with wrapper class for backward compatibility
System Architecture
High-Level Architecture
┌────────────────────────────────────────────────────┐
│                  Workflow Engine                   │
│ ┌────────────────────────────────────────────────┐ │
│ │            Node Executor (Runtime)             │ │
│ │ - Validates configurations against spec        │ │
│ │ - Creates node instances                       │ │
│ │ - Executes node logic                          │ │
│ └────────────────────────────────────────────────┘ │
│                         ▲                          │
│                         │ get_node_spec()          │
│                         │                          │
│ ┌────────────────────────────────────────────────┐ │
│ │     NODE_SPECS_REGISTRY (Global Registry)      │ │
│ │ - Dictionary: "TYPE.SUBTYPE" → NodeSpec        │ │
│ │ - O(1) lookup performance                      │ │
│ │ - 50+ node specifications loaded at startup    │ │
│ └────────────────────────────────────────────────┘ │
│                         ▲                          │
│                         │ import                   │
│                         │                          │
│ ┌────────────────────────────────────────────────┐ │
│ │ Node Specification Files (shared/node_specs/)  │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ AI_AGENT/                                  │ │ │
│ │ │  - OPENAI_CHATGPT.py                       │ │ │
│ │ │  - ANTHROPIC_CLAUDE.py                     │ │ │
│ │ │  - GOOGLE_GEMINI.py                        │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ TRIGGER/                                   │ │ │
│ │ │  - MANUAL.py, WEBHOOK.py, CRON.py          │ │ │
│ │ │  - GITHUB.py, SLACK.py, EMAIL.py           │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ FLOW/, ACTION/, EXTERNAL_ACTION/           │ │ │
│ │ │ TOOL/, MEMORY/, HUMAN_IN_THE_LOOP/         │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────┘
Directory Structure
apps/backend/shared/node_specs/
├── __init__.py                  # Registry and exports
├── base.py                      # Base classes and types
├── registry.py                  # Backward compatibility wrapper
├── AI_AGENT/
│   ├── __init__.py
│   ├── OPENAI_CHATGPT.py
│   ├── ANTHROPIC_CLAUDE.py
│   └── GOOGLE_GEMINI.py
├── TRIGGER/
│   ├── __init__.py
│   ├── MANUAL.py
│   ├── WEBHOOK.py
│   ├── CRON.py
│   ├── GITHUB.py
│   ├── SLACK.py
│   └── EMAIL.py
├── EXTERNAL_ACTION/
│   ├── __init__.py
│   ├── SLACK.py
│   ├── GITHUB.py
│   ├── NOTION.py
│   ├── GOOGLE_CALENDAR.py
│   ├── FIRECRAWL.py
│   ├── DISCORD_ACTION.py
│   └── TELEGRAM_ACTION.py
├── ACTION/
│   ├── __init__.py
│   ├── HTTP_REQUEST.py
│   └── DATA_TRANSFORMATION.py
├── FLOW/
│   ├── __init__.py
│   ├── IF.py
│   ├── LOOP.py
│   ├── MERGE.py
│   ├── FILTER.py
│   ├── SORT.py
│   ├── WAIT.py
│   └── DELAY.py
├── TOOL/
│   ├── __init__.py
│   ├── SLACK_MCP_TOOL.py
│   ├── NOTION_MCP_TOOL.py
│   ├── GOOGLE_CALENDAR_MCP_TOOL.py
│   ├── FIRECRAWL_MCP_TOOL.py
│   └── DISCORD_MCP_TOOL.py
├── MEMORY/
│   ├── __init__.py
│   ├── CONVERSATION_BUFFER.py
│   ├── KEY_VALUE_STORE.py
│   ├── VECTOR_DATABASE.py
│   ├── DOCUMENT_STORE.py
│   ├── ENTITY_MEMORY.py
│   ├── EPISODIC_MEMORY.py
│   ├── KNOWLEDGE_BASE.py
│   └── GRAPH_MEMORY.py
└── HUMAN_IN_THE_LOOP/
    ├── __init__.py
    ├── SLACK_INTERACTION.py
    ├── GMAIL_INTERACTION.py
    ├── OUTLOOK_INTERACTION.py
    ├── DISCORD_INTERACTION.py
    ├── TELEGRAM_INTERACTION.py
    └── MANUAL_REVIEW.py
Core Data Structures
Base Classes
ParameterType Enum
class ParameterType(Enum):
"""Supported parameter types for node configuration."""
STRING = "string"
INTEGER = "integer"
FLOAT = "float"
BOOLEAN = "boolean"
ENUM = "enum"
JSON = "json"
FILE = "file"
URL = "url"
EMAIL = "email"
CRON_EXPRESSION = "cron"
ParameterDef Dataclass
@dataclass
class ParameterDef:
"""Definition of a node parameter."""
name: str
type: ParameterType
required: bool = False
default_value: Optional[str] = None
enum_values: Optional[List[str]] = None
description: str = ""
validation_pattern: Optional[str] = None
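For illustration, a legacy-style parameter definition for a cron schedule field might look like the sketch below; the field values are made up, and only the ParameterDef and ParameterType names come from the classes above.
# Illustrative only: a legacy ParameterDef for a cron schedule field.
schedule_param = ParameterDef(
    name="schedule",
    type=ParameterType.CRON_EXPRESSION,
    required=True,
    description="Cron schedule controlling when the trigger fires",
    validation_pattern=r"^\S+\s+\S+\s+\S+\s+\S+\s+\S+$",  # rough five-field cron check
)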
DataFormat Dataclass
@dataclass
class DataFormat:
"""Data format specification for ports."""
mime_type: str = "application/json"
schema: Optional[str] = None # JSON Schema
examples: Optional[List[str]] = None
NodeSpec Dataclass
@dataclass
class NodeSpec:
"""Complete specification for a node type (legacy format)."""
node_type: str
subtype: str
version: str = "1.0.0"
description: str = ""
parameters: List[ParameterDef] = field(default_factory=list)
examples: Optional[List[Dict[str, Any]]] = None
display_name: Optional[str] = None
category: Optional[str] = None
template_id: Optional[str] = None
is_system_template: bool = True
manual_invocation: Optional[ManualInvocationSpec] = None
BaseNodeSpec (Pydantic Model)
class BaseNodeSpec(BaseModel):
"""Base class for all node specifications following the new workflow spec.
This is the primary specification format used throughout the system.
"""
# Core node identification
type: NodeType = Field(..., description="Node category")
subtype: str = Field(..., description="Node subtype")
# Node metadata
name: str = Field(..., description="Node name; must not contain spaces")
description: str = Field(..., description="One-sentence description of the node")
# Configuration and parameters
configurations: Dict[str, Any] = Field(
default_factory=dict,
description="Node configuration parameters"
)
# Schema-style parameter definitions (preferred)
input_params: Dict[str, Any] = Field(
default_factory=dict,
description="Input parameter definitions (type/default/description/required, etc.)"
)
output_params: Dict[str, Any] = Field(
default_factory=dict,
description="Output parameter definitions (type/default/description/required, etc.)"
)
# Legacy runtime default params (backward compatibility)
default_input_params: Dict[str, Any] = Field(
default_factory=dict,
description="Default runtime input parameters (backward compatibility)"
)
default_output_params: Dict[str, Any] = Field(
default_factory=dict,
description="Default runtime output parameters (backward compatibility)"
)
# Attached nodes (AI_AGENT nodes only)
attached_nodes: Optional[List[str]] = Field(
default=None,
description="List of attached node IDs; only used when AI_AGENT nodes invoke TOOL and MEMORY nodes"
)
# Optional metadata
version: str = Field(default="1.0", description="Node spec version")
tags: List[str] = Field(default_factory=list, description="Node tags")
examples: Optional[List[Dict[str, Any]]] = Field(
default=None,
description="Usage examples"
)
# AI guidance for upstream nodes
system_prompt_appendix: Optional[str] = Field(
default=None,
description="AI-readable guidance for using this node"
)
Connection Types
class ConnectionType:
"""Standard connection types used in the workflow system."""
MAIN = "MAIN"
AI_TOOL = "AI_TOOL"
AI_MEMORY = "AI_MEMORY"
MEMORY = "MEMORY"
AI_LANGUAGE_MODEL = "AI_LANGUAGE_MODEL"
ERROR = "ERROR"
WEBHOOK = "WEBHOOK"
HUMAN_INPUT = "HUMAN_INPUT"
TRIGGER = "TRIGGER"
SCHEDULE = "SCHEDULE"
EMAIL = "EMAIL"
SLACK = "SLACK"
DATABASE = "DATABASE"
FILE = "FILE"
HTTP = "HTTP"
MCP_TOOLS = "MCP_TOOLS"
Output-Key Based Routing
The system uses simplified output-key based routing instead of complex port specifications:
# Connection structure (from workflow_new.py)
{
"id": "conn_id",
"from_node": "source_node_id",
"to_node": "target_node_id",
"output_key": "result", # Default output key
"conversion_function": "optional_transform_code"
}
# Special output keys for conditional nodes:
# - IF node: "true", "false"
# - SWITCH node: case values as keys
# - Default: "result"
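As a sketch of how an engine might resolve these connections at runtime; the helper name resolve_next_nodes is hypothetical and not part of the engine's API:
from typing import Any, Dict, List

# Hypothetical helper: pick downstream nodes whose connection matches the emitted output key.
def resolve_next_nodes(
    connections: List[Dict[str, Any]],
    source_node_id: str,
    emitted_output_key: str = "result",
) -> List[str]:
    return [
        conn["to_node"]
        for conn in connections
        if conn["from_node"] == source_node_id
        and conn.get("output_key", "result") == emitted_output_key
    ]

connections = [
    {"id": "c1", "from_node": "ai_1", "to_node": "slack_1", "output_key": "result"},
]
assert resolve_next_nodes(connections, "ai_1") == ["slack_1"]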
Conversion Functions
Nodes can define conversion functions for data transformation:
def validate_conversion_function(func_string: str) -> bool:
"""Validate conversion function format."""
# Required format:
# 'def convert(input_data: Dict[str, Any]) -> Dict[str, Any]: return transformed_data'
pass
def execute_conversion_function(func_string: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Execute conversion function safely in restricted namespace."""
pass
# Example conversion functions
CONVERSION_FUNCTION_EXAMPLES = {
"passthrough": """def convert(input_data: Dict[str, Any]) -> Dict[str, Any]: return input_data""",
"add_slack_formatting": """def convert(input_data: Dict[str, Any]) -> Dict[str, Any]: return {"text": f"๐ญ {input_data.get('output', '')} ๐ญ", "channel": "#general"}""",
"extract_ai_response": """def convert(input_data: Dict[str, Any]) -> Dict[str, Any]: return {"message": input_data.get("output", ""), "timestamp": str(input_data.get("timestamp", ""))}"""
}
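A minimal sketch of the restricted-namespace execution described above, assuming the single-function convert() convention; the production implementation may add stricter sandboxing and timeouts:
from typing import Any, Dict

def execute_conversion_function(func_string: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Compile the function string with an empty builtins table and call convert()."""
    namespace: Dict[str, Any] = {"__builtins__": {}, "Dict": Dict, "Any": Any, "str": str}
    exec(func_string, namespace)  # defines convert() inside the restricted namespace
    convert = namespace.get("convert")
    if not callable(convert):
        raise ValueError("Conversion function must define convert(input_data)")
    result = convert(input_data)
    if not isinstance(result, dict):
        raise ValueError("convert() must return a dict")
    return result

# Usage with the passthrough example above
passthrough = CONVERSION_FUNCTION_EXAMPLES["passthrough"]
assert execute_conversion_function(passthrough, {"x": 1}) == {"x": 1}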
Node Type Coverage
Complete Node Type Registry
| Node Type | Subtypes Implemented | Status | Description |
|---|---|---|---|
| TRIGGER | MANUAL, WEBHOOK, CRON, EMAIL, GITHUB, SLACK | ✅ Complete (6/6) | Event-based workflow triggers |
| AI_AGENT | OPENAI_CHATGPT, ANTHROPIC_CLAUDE, GOOGLE_GEMINI | ✅ Complete (3/3) | Provider-based AI nodes with prompt-driven behavior |
| EXTERNAL_ACTION | SLACK, GITHUB, NOTION, GOOGLE_CALENDAR, FIRECRAWL, DISCORD_ACTION, TELEGRAM_ACTION | ✅ Complete (7/7) | Third-party service integrations |
| ACTION | HTTP_REQUEST, DATA_TRANSFORMATION | 🟡 Partial (2/10) | Core system actions |
| FLOW | IF, LOOP, MERGE, FILTER, SORT, WAIT, DELAY | ✅ Complete (7/7) | Flow control and logic nodes |
| TOOL | SLACK_MCP_TOOL, NOTION_MCP_TOOL, GOOGLE_CALENDAR_MCP_TOOL, FIRECRAWL_MCP_TOOL, DISCORD_MCP_TOOL | ✅ Complete (5/5) | MCP-based tools attached to AI_AGENT |
| MEMORY | CONVERSATION_BUFFER, KEY_VALUE_STORE, VECTOR_DATABASE, DOCUMENT_STORE, ENTITY_MEMORY, EPISODIC_MEMORY, KNOWLEDGE_BASE, GRAPH_MEMORY | ✅ Complete (8/8) | Memory stores attached to AI_AGENT |
| HUMAN_IN_THE_LOOP | SLACK_INTERACTION, GMAIL_INTERACTION, OUTLOOK_INTERACTION, DISCORD_INTERACTION, TELEGRAM_INTERACTION, MANUAL_REVIEW | ✅ Complete (6/6) | Human interaction points with built-in AI analysis |
Total Specifications: 50+ node specifications implemented
Node Specification Examples
1. AI Agent Node (OPENAI_CHATGPT)
class OpenAIChatGPTSpec(BaseNodeSpec):
"""OpenAI ChatGPT AI agent specification aligned with OpenAI API."""
def __init__(self):
super().__init__(
type=NodeType.AI_AGENT,
subtype=AIAgentSubtype.OPENAI_CHATGPT,
name="OpenAI_ChatGPT",
description="OpenAI ChatGPT AI agent with customizable behavior via system prompt.",
# Configuration parameters
configurations={
"model": {
"type": "string",
"default": OpenAIModel.GPT_5_NANO.value,
"description": "OpenAI model version",
"required": True,
"options": [model.value for model in OpenAIModel],
},
"system_prompt": {
"type": "string",
"default": "You are a helpful AI assistant.",
"description": "System prompt defining AI behavior and role",
"required": True,
"multiline": True,
},
"temperature": {
"type": "float",
"default": 0.7,
"min": 0.0,
"max": 2.0,
"description": "Controls randomness of outputs",
"required": False,
},
"max_tokens": {
"type": "integer",
"default": 8192,
"description": "Maximum number of tokens in response",
"required": False,
},
**COMMON_CONFIGS,
},
# Parameter schemas
input_params={
"user_prompt": {
"type": "string",
"default": "",
"description": "Primary user message or prompt input",
"required": True,
}
},
output_params={
"content": {
"type": "object",
"default": "",
"description": "The model response content",
"required": True,
},
"metadata": {
"type": "object",
"default": {},
"description": "Additional metadata returned with the response",
"required": False,
},
"token_usage": {
"type": "object",
"default": {},
"description": "Token usage statistics",
"required": False,
},
},
tags=["ai", "openai", "chatgpt", "language-model"],
examples=[...],
)
Key Features:
- Provider-specific configuration (OpenAI models and parameters)
- System prompt-driven behavior (unlimited functionality through prompts)
- Support for attached TOOL and MEMORY nodes
- Token usage tracking and metadata
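For illustration, a workflow author (or planning agent) might instantiate this spec and then override configuration defaults on the created node; the node ID and prompt text below are made up, and the registry helpers are the ones shown later in this document.
# Illustrative usage; assumes the registry helpers documented in the Registry System section.
spec = get_node_spec("AI_AGENT", "OPENAI_CHATGPT")
node = spec.create_node_instance("ai_summarizer_1")
node.configurations["system_prompt"] = "Summarize the incoming text in three bullet points."
node.configurations["temperature"] = 0.2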
2. Trigger Node (MANUAL)
class ManualTriggerSpec(BaseNodeSpec):
"""Manual trigger specification following the new workflow architecture."""
def __init__(self):
super().__init__(
type=NodeType.TRIGGER,
subtype=TriggerSubtype.MANUAL,
name="Manual_Trigger",
description="Manual trigger activated by user action",
configurations={
"trigger_name": {
"type": "string",
"default": "Manual Trigger",
"description": "ๆพ็คบๅ็งฐ",
"required": False,
},
**COMMON_CONFIGS,
},
input_params={}, # Triggers have no runtime inputs
output_params={
"trigger_time": {
"type": "string",
"default": "",
"description": "ISO-8601 time when user triggered execution",
"required": False,
},
"execution_id": {
"type": "string",
"default": "",
"description": "Execution identifier for correlation",
"required": False,
},
"user_id": {
"type": "string",
"default": "",
"description": "ID of the user who triggered",
"required": False,
},
},
tags=["trigger", "manual", "user-initiated"],
examples=[...],
)
Key Features:
- No input parameters (triggers are workflow entry points)
- Output parameters provide execution context
- Simple configuration for display purposes
3. Flow Control Node (IF)
class IfFlowSpec(BaseNodeSpec):
"""IF flow control specification for conditional workflow branching."""
def __init__(self):
super().__init__(
type=NodeType.FLOW,
subtype=FlowSubtype.IF,
name="If_Condition",
description="Conditional flow control with multiple branching paths",
configurations={
"condition_expression": {
"type": "string",
"default": "",
"description": "ๆกไปถ่กจ่พพๅผ (ไป
ๆฏๆ่กจ่พพๅผๅฝขๅผ็JavaScript่ฏญๆณ)",
"required": True,
"multiline": True,
},
**COMMON_CONFIGS,
},
input_params={
"data": {
"type": "object",
"default": {},
"description": "Input data for condition evaluation",
"required": True,
},
},
output_params={
"data": {
"type": "object",
"default": {},
"description": "Input data for condition evaluation",
"required": True,
},
"condition_result": {
"type": "boolean",
"default": False,
"description": "Final boolean evaluation of the condition",
"required": False,
},
},
tags=["flow", "conditional", "branching", "logic"],
examples=[...],
)
Key Features:
- Expression-based condition evaluation (JavaScript syntax)
- Multiple output keys: "true", "false" for branching
- Pass-through of input data to both branches
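The branching described above is wired at the connection level; an illustrative fragment for the two branches might look like this (node IDs are made up):
# Illustrative connections for an IF node's two branches.
if_branch_connections = [
    {
        "id": "conn_if_true",
        "from_node": "if_1",
        "to_node": "slack_notify_1",
        "output_key": "true",   # taken when the condition evaluates to true
    },
    {
        "id": "conn_if_false",
        "from_node": "if_1",
        "to_node": "manual_review_1",
        "output_key": "false",  # taken when the condition evaluates to false
    },
]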
4. Memory Node (CONVERSATION_BUFFER)
class ConversationMemorySpec(BaseNodeSpec):
"""Conversation buffer with simple, built-in summarization policy."""
def __init__(self, *, subtype: MemorySubtype, name: Optional[str] = None):
super().__init__(
type=NodeType.MEMORY,
subtype=subtype,
name=name or "Conversation_Buffer_Memory",
description="Conversation buffer with auto-summary when nearly full",
configurations={
"max_messages": {
"type": "integer",
"default": 50,
"min": 1,
"max": 1000,
"description": "ๆๅคงๆถๆฏๅญๅจๆฐ้",
"required": False,
},
"auto_summarize": {
"type": "boolean",
"default": True,
"description": "ๆฏๅฆๅจๆฅ่ฟๅฎน้ๆถ่ชๅจๆป็ปๆงๆถๆฏ",
"required": False,
},
**COMMON_CONFIGS,
},
input_params={
"message": {
"type": "string",
"default": "",
"description": "Single message to add to the buffer",
"required": False,
},
"role": {
"type": "string",
"default": "user",
"description": "Role of the message author",
"required": False,
"options": ["user", "assistant", "system"],
},
},
output_params={
"messages": {
"type": "array",
"default": [],
"description": "Messages currently in buffer",
"required": False,
},
"summary": {
"type": "string",
"default": "",
"description": "Generated conversation summary",
"required": False,
},
},
attached_nodes=None, # Memory nodes don't have attached_nodes
examples=[...],
)
Key Features:
- Attached to AI_AGENT nodes (not connected via ports)
- Auto-summarization when buffer approaches capacity
- Role-based message organization (user, assistant, system)
5. Tool Node (SLACK_MCP_TOOL)
class SlackMCPToolSpec(BaseNodeSpec):
"""Slack MCP Tool specification for AI_AGENT attached functionality."""
def __init__(self):
super().__init__(
type=NodeType.TOOL,
subtype=ToolSubtype.SLACK_MCP_TOOL,
name="Slack_MCP_Tool",
description="Slack MCP tool for messaging through MCP protocol",
configurations={
"mcp_server_url": {
"type": "string",
"default": "http://localhost:8000/api/v1/mcp",
"description": "MCPๆๅกๅจURL",
"required": True,
},
"access_token": {
"type": "string",
"default": "{{$placeholder}}",
"description": "Slack OAuth access token",
"required": True,
"sensitive": True,
},
"available_tools": {
"type": "array",
"default": ["slack_send_message", "slack_list_channels"],
"description": "ๅฏ็จ็Slackๅทฅๅ
ทๅ่กจ",
"required": False,
"options": [
"slack_send_message",
"slack_list_channels",
"slack_get_user_info",
"slack_create_channel",
],
},
**COMMON_CONFIGS,
},
input_params={
"tool_name": {
"type": "string",
"default": "",
"description": "MCP tool function name to invoke",
"required": True,
},
"function_args": {
"type": "object",
"default": {},
"description": "Arguments for the selected tool function",
"required": False,
},
},
output_params={
"result": {
"type": "object",
"default": {},
"description": "Result payload returned by the MCP tool",
"required": False,
},
"success": {
"type": "boolean",
"default": False,
"description": "Whether the MCP tool invocation succeeded",
"required": False,
},
},
attached_nodes=None, # Tools don't have attached_nodes
tags=["tool", "mcp", "slack", "attached"],
examples=[...],
)
Key Features:
- MCP (Model Context Protocol) integration
- Attached to AI_AGENT nodes for function calling
- Dynamic tool selection from available_tools list
- OAuth-based authentication
6. External Action Node (SLACK)
class SlackExternalActionSpec(BaseNodeSpec):
"""Slack external action specification."""
def __init__(self):
super().__init__(
type=NodeType.EXTERNAL_ACTION,
subtype=ExternalActionSubtype.SLACK,
name="Slack_Action",
description="Send messages and interact with Slack workspace",
configurations={
"action_type": {
"type": "string",
"default": "send_message",
"description": "Slackๆไฝ็ฑปๅ",
"required": True,
"options": [
"send_message", "send_file", "create_channel",
"invite_users", "get_user_info", "update_message",
],
},
"channel": {
"type": "string",
"default": "{{$placeholder}}",
"description": "็ฎๆ ้ข้๏ผ#channel ๆ @user ๆ channel_id๏ผ",
"required": True,
"api_endpoint": "/api/proxy/v1/app/integrations/slack/channels",
},
"bot_token": {
"type": "string",
"default": "{{$placeholder}}",
"description": "Slack Bot Token (xoxb-...)",
"required": True,
"sensitive": True,
},
**COMMON_CONFIGS,
},
input_params={
"message": {
"type": "string",
"default": "",
"description": "Message text to send",
"required": False,
"multiline": True,
},
"blocks": {
"type": "array",
"default": [],
"description": "Slack block kit elements for rich messages",
"required": False,
},
},
output_params={
"success": {
"type": "boolean",
"default": False,
"description": "Whether Slack API operation succeeded",
"required": False,
},
"message_ts": {
"type": "string",
"default": "",
"description": "Slack message timestamp",
"required": False,
},
"channel_id": {
"type": "string",
"default": "",
"description": "Channel ID where the message was sent",
"required": False,
},
},
tags=["slack", "messaging", "external", "oauth"],
examples=[...],
# System prompt guidance for AI nodes
system_prompt_appendix="""Output `action_type` to dynamically control Slack operations...""",
)
Key Features:
- Multiple action types (send_message, create_channel, etc.)
- OAuth integration support
- Block Kit support for rich formatting
- Dynamic API endpoint for channel discovery
- System prompt appendix for AI guidance
7. Human-in-the-Loop Node (SLACK_INTERACTION)
class SlackInteractionSpec(BaseNodeSpec):
"""Slack interaction HIL specification with built-in AI response analysis."""
def __init__(self):
super().__init__(
type=NodeType.HUMAN_IN_THE_LOOP,
subtype=HumanLoopSubtype.SLACK_INTERACTION,
name="Slack_Interaction",
description="Human-in-the-loop Slack interaction with built-in AI response analysis",
configurations={
"channel": {
"type": "string",
"default": "{{$placeholder}}",
"description": "็ฎๆ Slack้ข้ๆ็จๆท",
"required": True,
"api_endpoint": "/api/proxy/v1/app/integrations/slack/channels",
},
"clarification_question_template": {
"type": "string",
"default": "Please review: {{content}}\\n\\nRespond with 'yes' to approve or 'no' to reject.",
"description": "ๅ้็ป็จๆท็ๆถๆฏๆจกๆฟ",
"required": True,
"multiline": True,
},
"timeout_minutes": {
"type": "integer",
"default": 60,
"min": 1,
"max": 1440,
"description": "็ญๅพ
ๅๅบ็่ถ
ๆถๆถ้ด๏ผๅ้๏ผ",
"required": False,
},
"ai_analysis_model": {
"type": "string",
"default": OpenAIModel.GPT_5_MINI.value,
"description": "็จไบๅๅบๅๆ็AIๆจกๅ",
"required": False,
},
**COMMON_CONFIGS,
},
input_params={
"content": {
"type": "object",
"default": "",
"description": "The content that need to be reviewed",
"required": False,
"multiline": True,
},
},
output_params={
"content": {
"type": "object",
"default": {},
"description": "Pass-through content from input_params",
"required": False,
},
"ai_classification": {
"type": "string",
"default": "",
"description": "AI classification of the response",
"required": False,
"options": ["confirmed", "rejected", "unrelated", "timeout"],
},
"user_response": {
"type": "string",
"default": "",
"description": "The actual text response from the human",
"required": False,
},
},
examples=[...],
system_prompt_appendix="""This HUMAN_IN_THE_LOOP:SLACK_INTERACTION node handles BOTH sending messages to Slack AND waiting for user responses.""",
)
Key Features:
- Built-in AI response analysis: Automatically classifies user responses as confirmed/rejected/unrelated
- Multiple output keys: Routes workflow based on AI classification
- Template-based messaging: Supports variable substitution
- Timeout handling: Configurable timeout with fallback behavior
- No additional nodes needed: Eliminates need for separate IF or AI_AGENT nodes for response analysis
8. Action Node (HTTP_REQUEST)
class HTTPRequestActionSpec(BaseNodeSpec):
"""HTTP Request action specification for making external API calls."""
def __init__(self):
super().__init__(
type=NodeType.ACTION,
subtype=ActionSubtype.HTTP_REQUEST,
name="HTTP_Request",
description="Make HTTP requests to external APIs",
configurations={
"method": {
"type": "string",
"default": "GET",
"description": "HTTPๆนๆณ",
"required": True,
"options": ["GET", "POST", "PUT", "DELETE", "PATCH"],
},
"url": {
"type": "string",
"default": "",
"description": "่ฏทๆฑURL",
"required": True,
},
"headers": {
"type": "object",
"default": {},
"description": "่ฏทๆฑๅคด",
"required": False,
},
"timeout": {
"type": "integer",
"default": 30,
"min": 1,
"max": 300,
"description": "่ฏทๆฑ่ถ
ๆถๆถ้ด๏ผ็ง๏ผ",
"required": False,
},
**COMMON_CONFIGS,
},
input_params={
"body": {
"type": "object",
"default": {},
"description": "Request body (for POST/PUT/PATCH)",
"required": False,
},
"query_params": {
"type": "object",
"default": {},
"description": "URL query parameters",
"required": False,
},
},
output_params={
"status_code": {
"type": "integer",
"default": 0,
"description": "HTTP response status code",
"required": False,
},
"body": {
"type": "object",
"default": {},
"description": "Response body (parsed JSON or text)",
"required": False,
},
"headers": {
"type": "object",
"default": {},
"description": "Response headers",
"required": False,
},
},
tags=["http", "api", "external", "action"],
examples=[...],
)
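For illustration, a node built from this spec might be configured for a POST call as sketched below; the endpoint, payload, and node ID are hypothetical, and the registry helpers are the ones shown in the next section.
# Illustrative usage of the HTTP_REQUEST spec.
spec = get_node_spec("ACTION", "HTTP_REQUEST")
node = spec.create_node_instance("http_post_1")
node.configurations["method"] = "POST"
node.configurations["url"] = "https://api.example.com/v1/items"
node.configurations["headers"] = {"Content-Type": "application/json"}
node.input_params["body"] = {"name": "example"}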
Registry System
Global Registry
# In shared/node_specs/__init__.py
NODE_SPECS_REGISTRY = {
# TRIGGER specifications
"TRIGGER.MANUAL": MANUAL_TRIGGER_SPEC,
"TRIGGER.WEBHOOK": WEBHOOK_TRIGGER_SPEC,
"TRIGGER.CRON": CRON_TRIGGER_SPEC,
"TRIGGER.GITHUB": GITHUB_TRIGGER_SPEC,
"TRIGGER.SLACK": SLACK_TRIGGER_SPEC,
"TRIGGER.EMAIL": EMAIL_TRIGGER_SPEC,
# AI_AGENT specifications
"AI_AGENT.OPENAI_CHATGPT": OPENAI_CHATGPT_SPEC,
"AI_AGENT.ANTHROPIC_CLAUDE": ANTHROPIC_CLAUDE_SPEC,
"AI_AGENT.GOOGLE_GEMINI": GOOGLE_GEMINI_SPEC,
# EXTERNAL_ACTION specifications
"EXTERNAL_ACTION.SLACK": SLACK_EXTERNAL_ACTION_SPEC,
"EXTERNAL_ACTION.GITHUB": GITHUB_EXTERNAL_ACTION_SPEC,
"EXTERNAL_ACTION.NOTION": NOTION_EXTERNAL_ACTION_SPEC,
# ... additional external actions
# ACTION specifications
"ACTION.HTTP_REQUEST": HTTP_REQUEST_ACTION_SPEC,
"ACTION.DATA_TRANSFORMATION": DATA_TRANSFORMATION_ACTION_SPEC,
# FLOW specifications
"FLOW.IF": IF_FLOW_SPEC,
"FLOW.LOOP": LOOP_FLOW_SPEC,
"FLOW.MERGE": MERGE_FLOW_SPEC,
# ... additional flow controls
# TOOL specifications
"TOOL.SLACK_MCP_TOOL": SLACK_MCP_TOOL_SPEC,
"TOOL.NOTION_MCP_TOOL": NOTION_MCP_TOOL_SPEC,
# ... additional tools
# MEMORY specifications
"MEMORY.CONVERSATION_BUFFER": CONVERSATION_BUFFER_MEMORY_SPEC,
"MEMORY.KEY_VALUE_STORE": KEY_VALUE_STORE_MEMORY_SPEC,
"MEMORY.VECTOR_DATABASE": VECTOR_DATABASE_MEMORY_SPEC,
# ... additional memory types
# HUMAN_IN_THE_LOOP specifications
"HUMAN_IN_THE_LOOP.SLACK_INTERACTION": SLACK_INTERACTION_SPEC,
"HUMAN_IN_THE_LOOP.GMAIL_INTERACTION": GMAIL_INTERACTION_HIL_SPEC,
# ... additional HIL types
}
Registry Access Functions
def get_node_spec(node_type: str, node_subtype: str):
"""Get a node specification by type and subtype."""
key = f"{node_type}.{node_subtype}"
return NODE_SPECS_REGISTRY.get(key)
def list_available_specs():
"""List all available node specifications."""
return list(NODE_SPECS_REGISTRY.keys())
class NodeSpecRegistryWrapper:
"""Wrapper class for backward compatibility."""
def __init__(self, registry_dict):
self._registry = registry_dict
def get_node_types(self):
"""Get all node types and their subtypes."""
types_dict = {}
for key, spec in self._registry.items():
node_type, subtype = key.split(".", 1)
if node_type not in types_dict:
types_dict[node_type] = []
types_dict[node_type].append(subtype)
return types_dict
def get_spec(self, node_type: str, subtype: str):
"""Get a node specification by type and subtype."""
key = f"{node_type}.{subtype}"
return self._registry.get(key)
def list_all_specs(self):
"""List all available node specifications."""
return list(self._registry.values())
# Singleton instance for backward compatibility
_wrapped_registry = NodeSpecRegistryWrapper(NODE_SPECS_REGISTRY)
node_spec_registry = _wrapped_registry
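Typical lookups against the registry might look like the following (illustrative values):
# Direct dictionary-backed lookup
spec = get_node_spec("AI_AGENT", "OPENAI_CHATGPT")
if spec is not None:
    print(spec.subtype, sorted(spec.configurations.keys()))

# Backward-compatible wrapper access
types_by_category = node_spec_registry.get_node_types()
# e.g. {"TRIGGER": ["MANUAL", "WEBHOOK", ...], "AI_AGENT": ["OPENAI_CHATGPT", ...], ...}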
Validation and Type Conversion
Configuration Validation
class BaseNodeSpec(BaseModel):
"""Base specification with built-in validation."""
def validate_configuration(self, config: Dict[str, Any]) -> bool:
"""Validate a configuration against this specification."""
required_keys = set()
for key, value in self.configurations.items():
if isinstance(value, dict) and value.get("required", False):
required_keys.add(key)
return all(key in config for key in required_keys)
Node Instance Creation
class BaseNodeSpec(BaseModel):
"""Base specification with instance creation."""
def create_node_instance(
self,
node_id: str,
position: Optional[Dict[str, float]] = None,
attached_nodes: Optional[List[str]] = None,
) -> Node:
"""Create a Node instance based on this specification."""
# For AI_AGENT nodes, use attached_nodes if provided
final_attached_nodes = attached_nodes if attached_nodes is not None else self.attached_nodes
# Derive runtime params from schema definitions
def _derive_defaults_from_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
return {
key: (spec.get("default") if isinstance(spec, dict) else None)
for key, spec in (schema or {}).items()
}
runtime_input_defaults = (
self.default_input_params.copy()
if self.default_input_params
else _derive_defaults_from_schema(self.input_params)
)
runtime_output_defaults = (
self.default_output_params.copy()
if self.default_output_params
else _derive_defaults_from_schema(self.output_params)
)
runtime_configurations = _derive_defaults_from_schema(self.configurations)
node_data = {
"id": node_id,
"name": self.name,
"description": self.description,
"type": self.type,
"subtype": self.subtype,
"configurations": runtime_configurations,
"input_params": runtime_input_defaults,
"output_params": runtime_output_defaults,
"position": position,
}
# Only add attached_nodes if it's not None (AI_AGENT specific)
if final_attached_nodes is not None:
node_data["attached_nodes"] = final_attached_nodes
return Node(**node_data)
Attached Nodes Pattern (AI_AGENT)
AI_AGENT nodes support attached TOOL and MEMORY nodes for enhanced capabilities:
Execution Model
┌──────────────────────────────────────────────────────────┐
│                 AI_AGENT Node Execution                  │
│                                                          │
│  1. Pre-execution:                                       │
│     - Load memory context from MEMORY nodes              │
│     - Discover tools from TOOL nodes (MCP)               │
│     - Enhance AI prompt with context and tools           │
│                                                          │
│  2. AI Execution:                                        │
│     - Generate response with augmented capabilities      │
│     - AI can invoke registered tools internally          │
│                                                          │
│  3. Post-execution:                                      │
│     - Store conversation to MEMORY nodes                 │
│     - Persist tool invocation results                    │
└──────────────────────────────────────────────────────────┘
Key Characteristics
- Not in workflow sequence: Attached nodes don't appear in the main workflow execution path
- No separate NodeExecution: Attached node execution is tracked within AI_AGENT's NodeExecution
- Managed in context: All operations happen within the AI_AGENT node's execution context
- Results in metadata: Stored in the attached_executions field of NodeExecution
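For illustration, attached nodes are referenced by ID when the AI_AGENT instance is created; the node IDs below are made up, and the helpers are the registry functions documented above.
# Hypothetical fragment: attach a Slack MCP tool and a conversation buffer to an AI agent.
tool_node = get_node_spec("TOOL", "SLACK_MCP_TOOL").create_node_instance("tool_slack_1")
memory_node = get_node_spec("MEMORY", "CONVERSATION_BUFFER").create_node_instance("memory_buffer_1")
ai_node = get_node_spec("AI_AGENT", "OPENAI_CHATGPT").create_node_instance(
    "ai_1",
    attached_nodes=[tool_node.id, memory_node.id],  # referenced by ID, not via connections
)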
Integration Points
Workflow Engine Integration
# In BaseNodeExecutor
class BaseNodeExecutor(ABC):
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.spec = self._get_node_spec()
def _get_node_spec(self) -> Optional[BaseNodeSpec]:
"""Get the node specification for this executor."""
return get_node_spec(self.node_type, self.node_subtype)
def validate(self, node: Node) -> List[str]:
"""Validate node configuration against specification."""
if self.spec:
if not self.spec.validate_configuration(node.configurations):
return ["Invalid configuration"]
return []
API Gateway Integration
@router.get("/node-types")
async def get_node_types():
"""Get all node types and their subtypes."""
result = {}
for spec in node_spec_registry.list_all_specs():
if spec.type not in result:
result[spec.type] = []
result[spec.type].append({
"subtype": spec.subtype,
"description": spec.description
})
return result
@router.get("/node-types/{node_type}/{subtype}/spec")
async def get_node_spec_detail(node_type: str, subtype: str):
"""Get detailed specification for a specific node type."""
spec = get_node_spec(node_type, subtype)
if not spec:
raise HTTPException(404, "Node specification not found")
return {
"type": spec.type,
"subtype": spec.subtype,
"description": spec.description,
"configurations": spec.configurations,
"input_params": spec.input_params,
"output_params": spec.output_params,
"examples": spec.examples,
}
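An illustrative (truncated) response from the spec detail endpoint for TRIGGER/MANUAL, based on the specification shown earlier and written here as a Python dict; the exact enum serialization and any entries contributed by COMMON_CONFIGS may differ.
# Truncated, illustrative payload for GET /node-types/TRIGGER/MANUAL/spec
{
    "type": "TRIGGER",
    "subtype": "MANUAL",
    "description": "Manual trigger activated by user action",
    "configurations": {
        "trigger_name": {"type": "string", "default": "Manual Trigger", "required": False},
    },
    "input_params": {},
    "output_params": {
        "trigger_time": {"type": "string", "default": "", "required": False},
        "execution_id": {"type": "string", "default": "", "required": False},
        "user_id": {"type": "string", "default": "", "required": False},
    },
    "examples": [],
}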
Frontend Integration
// Frontend can fetch structured node specifications
interface NodeSpec {
type: string;
subtype: string;
description: string;
configurations: Record<string, ConfigSchema>;
input_params: Record<string, ParamSchema>;
output_params: Record<string, ParamSchema>;
}
// Auto-generate configuration forms based on spec
function generateNodeConfigForm(spec: NodeSpec) {
return Object.entries(spec.configurations).map(([key, schema]) => {
switch (schema.type) {
case "enum":
return <Select options={schema.options} required={schema.required} />;
case "boolean":
return <Checkbox defaultValue={schema.default} />;
case "integer":
return <NumberInput min={schema.min} max={schema.max} />;
// ... other types
}
});
}
Non-Functional Requirements
Performance
- Specification Loading: All specifications loaded at startup (<100ms)
- Registry Lookup: O(1) dictionary access (<1ms)
- Validation: Schema validation completes in <10ms per node
- Memory Footprint: ~5MB for all 50+ specifications
Scalability
- Extensibility: New node types added by creating new specification files
- Backward Compatibility: The legacy NodeSpec dataclass is still supported
- Version Management: Each specification has independent versioning
Security
- Sensitive Fields: Configurations marked with "sensitive": True for proper handling
- Conversion Function Safety: Restricted namespace for conversion function execution
- Validation: Comprehensive schema validation prevents malformed configurations
Reliability
- Type Safety: Pydantic models provide runtime type validation
- Error Handling: Clear error messages for invalid configurations
- Default Values: All parameters have sensible defaults
Testing & Observabilityโ
Testing Strategyโ
Unit Tests:
def test_node_spec_validation():
"""Test configuration validation."""
spec = get_node_spec("AI_AGENT", "OPENAI_CHATGPT")
# Valid configuration
valid_config = {
"model": "gpt-5-nano",
"system_prompt": "You are a helpful assistant",
"temperature": 0.7,
}
assert spec.validate_configuration(valid_config) is True
# Missing required field
invalid_config = {
"temperature": 0.7,
}
assert spec.validate_configuration(invalid_config) is False
def test_node_instance_creation():
"""Test node instance creation from spec."""
spec = get_node_spec("TRIGGER", "MANUAL")
node = spec.create_node_instance(
node_id="trigger_1",
position={"x": 100, "y": 200}
)
assert node.id == "trigger_1"
assert node.type == NodeType.TRIGGER
assert node.subtype == TriggerSubtype.MANUAL
assert "trigger_time" in node.output_params
Integration Tests:
def test_end_to_end_workflow_with_specs():
"""Test complete workflow execution using specs."""
# Create workflow using specifications
trigger_spec = get_node_spec("TRIGGER", "MANUAL")
ai_spec = get_node_spec("AI_AGENT", "OPENAI_CHATGPT")
trigger_node = trigger_spec.create_node_instance("trigger_1")
ai_node = ai_spec.create_node_instance("ai_1")
# Execute workflow
result = execute_workflow(
nodes=[trigger_node, ai_node],
connections=[{"from_node": "trigger_1", "to_node": "ai_1"}]
)
assert result.success is True
Monitoring & Observabilityโ
Key Metrics:
- Specification access frequency by node type
- Validation failure rates
- Node instance creation latency
- Configuration schema compliance
Logging:
logger.info(f"Loading node specification: {node_type}.{subtype}")
logger.warning(f"Validation failed for node {node_id}: {errors}")
logger.error(f"Failed to create node instance: {exception}")
Technical Debt and Future Considerations
Known Limitations
- Port System Removed: Simplified to output-key based routing (trade-off for simplicity)
- Legacy NodeSpec Support: Both the NodeSpec dataclass and the BaseNodeSpec Pydantic model exist
- Incomplete ACTION Coverage: Only 2 of 10 planned ACTION subtypes implemented
- Conversion Function Security: Limited namespace may not cover all use cases
Areas for Improvement
- Unified Specification Format: Migrate all legacy NodeSpec usages to BaseNodeSpec
- Complete ACTION Node Coverage: Implement remaining ACTION subtypes
- Enhanced Validation: Add JSON Schema validation for input/output params
- Performance Optimization: Cache commonly accessed specifications
- Documentation Generation: Auto-generate API docs from specifications
Planned Enhancements
- Dynamic Specification Loading: Support runtime specification updates without restart
- Specification Versioning: Support multiple versions of same node type
- Advanced Validation: Cross-field validation and dependency checking
- Specification Marketplace: Allow community-contributed node specifications
Migration Paths
From Legacy NodeSpec to BaseNodeSpec:
# Legacy format (to be deprecated)
OLD_SPEC = NodeSpec(
node_type="AI_AGENT",
subtype="OPENAI_CHATGPT",
parameters=[ParameterDef(name="model", type=ParameterType.STRING)]
)
# New format (recommended)
NEW_SPEC = BaseNodeSpec(
type=NodeType.AI_AGENT,
subtype=AIAgentSubtype.OPENAI_CHATGPT,
name="OpenAI_ChatGPT",
description="OpenAI ChatGPT AI agent",
configurations={"model": {"type": "string", "required": True}}
)
Appendices
A. Glossary
| Term | Definition |
|---|---|
| Node Specification | Complete definition of a node type including configurations, parameters, and behavior |
| BaseNodeSpec | Pydantic-based base class for all node specifications |
| Output Key | String key used for routing data between nodes (e.g., "result", "true", "false") |
| Conversion Function | Python code snippet for transforming data between connected nodes |
| Attached Nodes | TOOL and MEMORY nodes associated with AI_AGENT nodes |
| Registry | Global dictionary mapping "TYPE.SUBTYPE" to node specifications |
| Configuration | Static parameters defining node behavior (set at design time) |
| Input/Output Params | Runtime parameters for data flow (set at execution time) |
| MCP | Model Context Protocol - standard for AI tool integration |
B. References
Internal Documentation:
- /apps/backend/shared/node_specs/ - Node specification implementations
- /apps/backend/shared/models/node_enums.py - Node type and subtype enums
- /docs/tech-design/new_workflow_spec.md - Workflow data model specification
- /apps/backend/CLAUDE.md - Backend development guide
External Resources:
- Pydantic Documentation - BaseModel validation
- JSON Schema - Schema validation standard
- Model Context Protocol - MCP specification
Document Version: 2.0
Created: 2025-01-28
Last Updated: 2025-10-11
Author: Claude Code
Status: Active - Reflects Current Implementation
Next Review: 2025-11-11
Version History
v2.0 (2025-10-11)
- ✅ Complete Rewrite: Updated entire document to reflect actual implementation
- ✅ BaseNodeSpec Documentation: Added comprehensive BaseNodeSpec (Pydantic) specification
- ✅ Output-Key Routing: Documented simplified connection system (replaced port-based)
- ✅ All 8 Node Types: Added detailed examples for all node types with actual code
- ✅ Registry System: Documented actual registry implementation and access patterns
- ✅ Attached Nodes: Comprehensive documentation of AI_AGENT attached nodes pattern
- ✅ 50+ Specifications: Updated coverage table with all implemented specifications
- ✅ Conversion Functions: Documented conversion function system for data transformation
- ✅ Integration Points: Added actual integration code for Workflow Engine, API Gateway, Frontend
v1.1 (2025-01-28)
- ✅ Initial Design: Original design specification (outdated)