Workflow Engine V2: Technical Design Document
1. Executive Summary
The Workflow Engine V2 (workflow_engine_v2) is a modern, spec-driven execution engine designed to run complex AI-powered workflows with precision, reliability, and comprehensive observability. Built with FastAPI, it provides a robust foundation for executing node-based workflows with advanced features including Human-in-the-Loop (HIL) interactions, attached node patterns, and real-time execution tracking.
Key Architectural Decisions
- Spec-Driven Validation: All nodes are validated against centralized specifications in shared/node_specs/
- Graph-Based Execution: Workflows are executed using topological sort with cycle detection
- Runner Factory Pattern: Dynamic node-executor dispatch based on node type/subtype
- Attached Nodes Pattern: AI_AGENT nodes support TOOL and MEMORY attachments for enhanced capabilities
- State Persistence: Complete execution state is preserved in Supabase for pause/resume operations
Technology Stack
- Framework: FastAPI 0.104+ (HTTP/REST API)
- Database: PostgreSQL via Supabase (execution state, workflow definitions)
- ORM: Pydantic models with direct Supabase client integration
- Validation: Centralized node specifications with automatic type coercion
- AI Providers: OpenAI, Anthropic, Google Gemini with unified interface
- Deployment: Docker + AWS ECS Fargate (linux/amd64)
2. System Architecture
2.1 High-Level Architecture
```
┌─────────────────────────────────────────────────────────────────────┐
│                         API Layer (FastAPI)                         │
├───────────────────────┬───────────────────────┬─────────────────────┤
│  Health Endpoint      │  V2 Execution API     │  V2 Workflow API    │
│  /health              │  /v2/executions/*     │  /v2/workflows/*    │
└───────────────────────┴───────────────────────┴─────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                        Core Execution Engine                        │
├───────────────────────┬───────────────────────┬─────────────────────┤
│  ExecutionEngine      │  WorkflowGraph        │  ExecutionContext   │
│  - run()              │  - topo_order()       │  - node_outputs     │
│  - run_async()        │  - cycle detection    │  - pending_inputs   │
│  - resume_*()         │  - attached nodes     │  - execution state  │
└───────────────────────┴───────────────────────┴─────────────────────┘
                                   │
               ┌───────────────────┼───────────────────┐
               ▼                   ▼                   ▼
┌────────────────────┐  ┌────────────────────┐  ┌────────────────────┐
│   Node Runners     │  │     Services       │  │    Persistence     │
├────────────────────┤  ├────────────────────┤  ├────────────────────┤
│ AIAgentRunner      │  │ HILService         │  │ Supabase Repo      │
│ ActionRunner       │  │ MemoryService      │  │ - executions       │
│ FlowRunner         │  │ EventPublisher     │  │ - workflows        │
│ ExternalAction     │  │ LoggingService     │  │ - hil_interact.    │
│ HILRunner          │  │ AI Providers       │  │ - pauses           │
│ MemoryRunner       │  │ Timer Service      │  │                    │
│ ToolRunner         │  │ Credential Encr.   │  │                    │
└────────────────────┘  └────────────────────┘  └────────────────────┘
                                   │
                                   ▼
                        ┌────────────────────┐
                        │   External APIs    │
                        ├────────────────────┤
                        │ OpenAI             │
                        │ Anthropic          │
                        │ Google Gemini      │
                        │ Slack              │
                        │ GitHub             │
                        │ Notion             │
                        │ Firecrawl          │
                        └────────────────────┘
```
2.2 Component Architecture
Execution Engine (core/engine.py)
- ExecutionEngine: Main orchestrator for workflow execution
- Validates workflows against node specifications
- Builds execution graph with attached node filtering
- Manages execution lifecycle (NEW → RUNNING → SUCCESS/ERROR/PAUSED)
- Handles retry logic with exponential backoff
- Implements timeout enforcement per node
- Provides pause/resume for HIL interactions
- Tracks token usage and credits consumption
Runner Factory (runners/factory.py)
- default_runner_for(node): Dispatcher function
- Routes nodes to appropriate runner based on type/subtype
- Supports 8 core node types: TRIGGER, AI_AGENT, ACTION, EXTERNAL_ACTION, FLOW, HUMAN_IN_THE_LOOP, MEMORY, TOOL
- Enforces explicit AI provider selection (no fallback to generic AI)
- Returns PassthroughRunner for unknown types (graceful degradation); see the dispatch sketch below
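In outline, the dispatch is a registry lookup. The sketch below illustrates the shape only; the runner classes are those listed in the next subsection, while the AI provider subtype strings are assumed names, not the full factory.py listing:

```python
# Illustrative dispatch sketch; subtype strings here are assumptions.
_AI_RUNNERS = {
    "OPENAI_CHATGPT": OpenAIChatGPTRunner,
    "ANTHROPIC_CLAUDE": AnthropicClaudeRunner,
    "GOOGLE_GEMINI": GoogleGeminiRunner,
}

def default_runner_for(node: Node) -> NodeRunner:
    if node.type == NodeType.AI_AGENT:
        # Explicit provider selection: unknown subtypes fail fast rather
        # than falling back to a generic AI runner
        runner_cls = _AI_RUNNERS.get(node.subtype)
        if runner_cls is None:
            raise ValueError(f"Unsupported AI provider subtype: {node.subtype}")
        return runner_cls()
    if node.type == NodeType.TRIGGER:
        return TriggerRunner()
    if node.type == NodeType.HUMAN_IN_THE_LOOP:
        return HILRunner()
    # ... remaining type/subtype routes ...
    return PassthroughRunner()  # graceful degradation for unknown types
```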
Node Runners (runners/*.py)

- Base Runners (interface sketched after this list):
  - `NodeRunner(ABC)`: Defines the `run(node, inputs, trigger) -> outputs` interface
  - `TriggerRunner`: Passes through trigger data to downstream nodes
  - `PassthroughRunner`: Default fallback, passes inputs through unchanged

- AI Agent Runners:
  - `AIAgentRunner`: Enhanced AI execution with memory/tool integration
  - `AnthropicClaudeRunner`: Anthropic Claude-specific implementation
  - `OpenAIChatGPTRunner`: OpenAI GPT-specific implementation
  - `GoogleGeminiRunner`: Google Gemini-specific implementation

- Flow Control Runners:
  - `IfRunner`: Conditional branching based on expression evaluation
  - `MergeRunner`: Combines multiple inputs into a single output
  - `SplitRunner`: Splits data into multiple outputs
  - `FilterRunner`: Filters data based on conditions
  - `SortRunner`: Sorts data collections
  - `WaitRunner`: Waits for external events or timeouts
  - `DelayRunner`: Introduces delays in execution
  - `TimeoutRunner`: Enforces execution time limits
  - `LoopRunner`: Repeats execution for collections
  - `ForEachRunner`: Fan-out execution for each item

- Action Runners:
  - `HttpRequestRunner`: HTTP API calls
  - `DataTransformationRunner`: Data manipulation and transformation

- Integration Runners:
  - `ExternalActionRunner`: Routes to service-specific external actions
  - `MemoryRunner`: Manages conversation history and context storage
  - `ToolRunner`: MCP tool discovery and invocation
  - `HILRunner`: Human-in-the-loop interaction management
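All of these share the small `run()` contract named above. A minimal sketch of the interface and the two simplest implementations (the `trigger_data` field access on TriggerInfo is an assumption):

```python
from abc import ABC, abstractmethod
from typing import Any, Dict

class NodeRunner(ABC):
    """Executes one node: inputs in, port-keyed outputs out."""

    @abstractmethod
    def run(self, node: "Node", inputs: Dict[str, Any], trigger: "TriggerInfo") -> Dict[str, Any]:
        """Return outputs keyed by output port name (default port: "result")."""

class PassthroughRunner(NodeRunner):
    """Default fallback: forwards inputs unchanged on the "result" port."""

    def run(self, node, inputs, trigger):
        return {"result": inputs}

class TriggerRunner(NodeRunner):
    """Passes trigger data through to downstream nodes."""

    def run(self, node, inputs, trigger):
        # Field name `trigger_data` is an assumption about TriggerInfo
        return {"result": getattr(trigger, "trigger_data", {})}
```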
3. Data Architecture
3.1 Data Models
Core Execution Models (from shared/models/)
Workflow:

```python
class Workflow(BaseModel):
    metadata: WorkflowMetadata
    nodes: List[Node]
    connections: List[Connection]
    triggers: List[str]          # Node IDs that can initiate execution
    variables: Dict[str, Any]
```

Node:

```python
class Node(BaseModel):
    id: str
    name: str
    type: NodeType               # TRIGGER, AI_AGENT, ACTION, EXTERNAL_ACTION, FLOW, HUMAN_IN_THE_LOOP, MEMORY, TOOL
    subtype: str                 # Provider/action-specific subtype
    configurations: Dict[str, Any]
    input_params: Dict[str, Any]
    output_params: Dict[str, Any]
    input_ports: List[Port]
    output_ports: List[Port]
    attached_nodes: List[str]    # For AI_AGENT: attached TOOL/MEMORY node IDs
    position: Optional[Position]
```

Execution:

```python
class Execution(BaseModel):
    id: str
    execution_id: str
    workflow_id: str
    workflow_version: str
    status: ExecutionStatus      # NEW, RUNNING, SUCCESS, ERROR, PAUSED, WAITING_FOR_HUMAN, CANCELED, TIMEOUT
    start_time: int              # epoch milliseconds
    end_time: Optional[int]
    duration_ms: Optional[int]
    trigger_info: TriggerInfo
    node_executions: Dict[str, NodeExecution]   # keyed by node_id
    node_runs: Dict[str, List[NodeExecution]]   # for fan-out tracking
    execution_sequence: List[str]               # ordered node execution history
    current_node_id: Optional[str]              # for paused workflows
    error: Optional[ExecutionError]
    tokens_used: Optional[TokenUsage]
    credits_consumed: int
    run_data: Optional[Dict[str, Any]]          # snapshot for API responses
```

NodeExecution:

```python
class NodeExecution(BaseModel):
    node_id: str
    node_name: str
    node_type: str
    node_subtype: str
    status: NodeExecutionStatus  # PENDING, RUNNING, COMPLETED, FAILED, WAITING_INPUT, RETRYING
    start_time: Optional[int]
    end_time: Optional[int]
    duration_ms: Optional[int]
    input_data: Dict[str, Any]
    output_data: Dict[str, Any]
    error: Optional[NodeError]
    execution_details: NodeExecutionDetails
    activation_id: Optional[str]          # for tracking fan-out executions
    parent_activation_id: Optional[str]
    credits_consumed: int
```
3.2 Data Flow
Standard Node Execution Flow
1. Input Aggregation: Engine merges outputs from predecessor nodes
   - Inputs keyed by port name (default: "result")
   - Multiple inputs to the same port are collected into a list
   - Conversion functions applied during propagation

2. Node Execution: Runner processes inputs and produces outputs
   - Context object (`_ctx`) provides access to execution state
   - Outputs structured as a `{port_name: payload}` dictionary
   - Special control keys (prefixed with `_`) control engine behavior

3. Output Shaping: Outputs validated against the node spec's output_params (sketched after this list)
   - Only declared output parameters included in the final output
   - Undeclared fields filtered out for data consistency
   - Fallback to defaults for missing declared parameters

4. Output Propagation: Shaped outputs flow to successor nodes
   - Connection `output_key` determines which port to use
   - Conversion functions transform data during propagation
   - Fan-out supported via the "iteration" output key
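A sketch of the shaping step under these rules. The real `_shape_payload` in core/engine.py takes a single payload argument, so the explicit `declared` parameter here is illustrative:

```python
from typing import Any, Dict

def shape_payload(payload: Any, declared: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only declared output params, filling defaults for missing ones.

    `declared` is the node spec's output_params mapping, e.g. the Slack spec's
    {"success": False, "message_ts": None, "channel_id": None}.
    """
    if not isinstance(payload, dict):
        payload = {"result": payload}
    return {key: payload.get(key, default) for key, default in declared.items()}

# An undeclared "debug" field is filtered out; "channel_id" falls back to None
shape_payload(
    {"success": True, "message_ts": "1705.001", "debug": "raw api body"},
    {"success": False, "message_ts": None, "channel_id": None},
)
# -> {"success": True, "message_ts": "1705.001", "channel_id": None}
```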
Attached Node Flow (AI_AGENT only)
AI_AGENT nodes can attach TOOL and MEMORY nodes for enhanced capabilities:
```
┌───────────────────────────────────────────────────────────────────┐
│                     AI_AGENT Node Execution                       │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│  1. PRE-EXECUTION: Load Context                                   │
│     ┌────────────────────────────────────────────────────────┐    │
│     │ Attached MEMORY Nodes                                  │    │
│     │ - Query conversation history                           │    │
│     │ - Retrieve relevant context                            │    │
│     │ - Enhance system prompt with memory                    │    │
│     └────────────────────────────────────────────────────────┘    │
│                                                                   │
│  2. PRE-EXECUTION: Discover Tools                                 │
│     ┌────────────────────────────────────────────────────────┐    │
│     │ Attached TOOL Nodes                                    │    │
│     │ - List available MCP functions                         │    │
│     │ - Register tools with AI provider                      │    │
│     │ - Enable tool calling during generation                │    │
│     └────────────────────────────────────────────────────────┘    │
│                                                                   │
│  3. EXECUTION: AI Generation                                      │
│     ┌────────────────────────────────────────────────────────┐    │
│     │ AI Provider (OpenAI/Anthropic/Gemini)                  │    │
│     │ - Generate response with enhanced prompt               │    │
│     │ - Invoke tools if needed                               │    │
│     │ - Return structured response                           │    │
│     └────────────────────────────────────────────────────────┘    │
│                                                                   │
│  4. POST-EXECUTION: Store Conversation                            │
│     ┌────────────────────────────────────────────────────────┐    │
│     │ Attached MEMORY Nodes                                  │    │
│     │ - Store user message                                   │    │
│     │ - Store AI response                                    │    │
│     │ - Update conversation context                          │    │
│     └────────────────────────────────────────────────────────┘    │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
```
Key Points:
- Attached nodes do NOT appear in workflow execution sequence
- No separate NodeExecution records for attached nodes
- All attachment logic handled within AIAgentRunner (outlined below)
- Results tracked in AI node's execution_details
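An outline of that runner logic. The context access pattern and the helper names (load_context, discover, store_turn, _build_prompt) are assumptions, not taken from the actual code:

```python
class AIAgentRunner(NodeRunner):
    def run(self, node, inputs, trigger):
        # Resolve attached nodes by ID (context access pattern assumed)
        attached = [self._ctx.workflow_nodes[nid] for nid in node.attached_nodes]
        memory_nodes = [n for n in attached if n.type == NodeType.MEMORY]
        tool_nodes = [n for n in attached if n.type == NodeType.TOOL]

        # 1. Pre-execution: load conversation history from attached MEMORY nodes
        history = self._memory.load_context(memory_nodes, inputs)
        # 2. Pre-execution: discover MCP tools from attached TOOL nodes
        tools = self._tools.discover(tool_nodes)

        # 3. Execution: provider call with memory-enhanced prompt and tool schema
        response = self._provider.generate(
            self._build_prompt(node, inputs, history), {"tools": tools}
        )

        # 4. Post-execution: persist the exchange back to attached MEMORY nodes
        self._memory.store_turn(memory_nodes, user_input=inputs, ai_response=response)

        # Attachment results are recorded in this node's execution_details,
        # never as separate NodeExecution records
        return {"result": response}
```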
4. Implementation Details
4.1 Core Components
ExecutionEngine.run()
The main execution loop implements a task-queue-based system with retry, pause, and fan-out handling (abridged):

```python
def run(self, workflow: Workflow, trigger: TriggerInfo, workflow_id: str) -> Execution:
    # 1. Validation
    self.validate_against_specs(workflow)  # Validates nodes against the spec registry

    # 2. Graph Construction
    graph = WorkflowGraph(workflow)  # Filters out attached nodes
    _ = graph.topo_order()           # Raises CycleError if a cycle is detected

    # 3. Initialize Execution State
    workflow_execution = Execution(status=ExecutionStatus.RUNNING, ...)
    pending_inputs = {node_id: {} for node_id in graph.nodes.keys()}
    execution_context = ExecutionContext(workflow, graph, workflow_execution, pending_inputs)

    # 4. Task-Queue Execution
    queue = [{"node_id": tid, "override": None} for tid in self._get_initial_ready_nodes(graph)]
    executed_main = set()
    while queue:
        task = queue.pop(0)
        node_id = task["node_id"]

        # Skip if already executed (unless this is a fan-out activation)
        if task["override"] is None and node_id in executed_main:
            continue

        node = graph.nodes[node_id]
        inputs = task["override"] or pending_inputs[node_id]

        # 5. Node Execution with Retry
        max_retries = node.configurations.get("retry_attempts", 0)
        for attempt in range(max_retries + 1):
            try:
                runner = default_runner_for(node)
                outputs = runner.run(node, inputs, trigger)
                break
            except Exception as e:
                if attempt == max_retries:
                    # Exhausted retries: fail the workflow
                    workflow_execution.status = ExecutionStatus.ERROR
                    break
                # Exponential backoff between attempts
                time.sleep(backoff * (backoff_factor ** attempt))
        if workflow_execution.status == ExecutionStatus.ERROR:
            break

        # 6. Handle Special Outputs
        if outputs.get("_hil_wait"):
            # Pause the workflow for Human-in-the-Loop
            workflow_execution.status = ExecutionStatus.WAITING_FOR_HUMAN
            return workflow_execution
        if outputs.get("_wait") or outputs.get("_delay_ms"):
            # Schedule a timer for delayed continuation
            self._timers.schedule(...)
            return workflow_execution

        # 7. Fail Fast on Node Failure
        if any(port.get("success") is False for port in outputs.values()):
            workflow_execution.status = ExecutionStatus.ERROR
            break

        # 8. Output Shaping and Propagation
        shaped_outputs = {port: _shape_payload(payload) for port, payload in outputs.items()}
        for successor, output_key, conversion_fn in graph.successors(node_id):
            value = shaped_outputs.get(output_key)
            # Apply the conversion function if specified
            if conversion_fn:
                value = execute_conversion_function_flexible(conversion_fn, value)
            # Handle fan-out for the "iteration" port
            if output_key == "iteration" and isinstance(value, list):
                for item in value:
                    queue.append({"node_id": successor, "override": {"result": item}})
            else:
                pending_inputs[successor]["result"] = value
                if self._is_node_ready(graph, successor, pending_inputs):
                    queue.append({"node_id": successor, "override": None})

        executed_main.add(node_id)

    # 9. Finalization
    if workflow_execution.status == ExecutionStatus.RUNNING:
        workflow_execution.status = ExecutionStatus.SUCCESS
    workflow_execution.end_time = _now_ms()
    return workflow_execution
```
4.2 Technical Decisions
Node Specification System
Centralized Validation: All node types defined in shared/node_specs/
- Type-safe configuration schemas with default values
- Automatic type coercion for inputs/outputs
- Runtime validation before execution
- Supports optional parameters with fallback defaults
Example Spec:
```python
class SlackExternalActionSpec(NodeSpecificationBase):
    node_type: str = "EXTERNAL_ACTION"
    subtype: str = "SLACK"
    configurations: Dict[str, Any] = {
        "action": {"type": "string", "options": ["send_message", "create_channel"]},
        "channel": {"type": "string", "required": True},
        "message": {"type": "string", "default": ""},
    }
    input_params: Dict[str, Any] = {
        "channel": None,
        "message": None,
    }
    output_params: Dict[str, Any] = {
        "success": False,
        "message_ts": None,
        "channel_id": None,
    }
```
Runner Factory Pattern
Dynamic Dispatch: Removes the need for large if/elif chains
- Cleaner codebase with separation of concerns
- Easy to add new node types without modifying core engine
- Type-safe dispatch with Enum-based routing
Trade-off: Requires explicit registration in factory.py, but keeps the set of supported node types explicit, auditable, and clearly documented.
Graph-Based Execution
Topological Sort: Ensures correct execution order
- Detects cycles at graph construction time (fail-fast; sketched below)
- Supports conditional execution via output port selection
- Handles fan-out with activation tracking
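A sketch of ordering plus cycle detection under these rules, using Kahn's algorithm (the actual WorkflowGraph internals may differ):

```python
from collections import deque
from typing import Dict, List, Tuple

class CycleError(Exception):
    pass

def topo_order(node_ids: List[str], edges: List[Tuple[str, str]]) -> List[str]:
    """Return node IDs in dependency order; raise CycleError on cycles."""
    indegree = {nid: 0 for nid in node_ids}
    successors: Dict[str, List[str]] = {nid: [] for nid in node_ids}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    ready = deque(nid for nid, deg in indegree.items() if deg == 0)
    order: List[str] = []
    while ready:
        nid = ready.popleft()
        order.append(nid)
        for succ in successors[nid]:
            indegree[succ] -= 1
            if indegree[succ] == 0:
                ready.append(succ)

    if len(order) != len(node_ids):
        # Some nodes never reached indegree 0: the graph has a cycle
        raise CycleError("workflow graph contains a cycle")
    return order
```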
Attached Node Filtering: WorkflowGraph excludes attached nodes
- Prevents double execution of TOOL/MEMORY nodes
- Maintains clean separation between workflow graph and attachment logic
- Attached nodes managed by parent AI_AGENT runner
Fail-Fast Error Handling
Philosophy: "Fail Fast with Clear Feedback" (from CLAUDE.md)
- Never return mock responses or silent failures
- Structured errors with error_code, error_message, error_details
- Actionable solutions provided in error responses
Example Error Response:
```python
{
    "success": False,
    "error_code": "missing_oauth_token",
    "error_message": "Slack OAuth token not found",
    "error_details": {
        "reason": "missing_oauth_token",
        "solution": "Connect Slack account in integrations settings",
        "oauth_flow_url": "/integrations/connect/slack"
    }
}
```
5. System Interactions
5.1 Internal Interactions
API Gateway → Workflow Engine
Execute Workflow:
```http
POST /v2/workflows/{workflow_id}/execute
Content-Type: application/json

{
  "trigger_data": {"user_input": "Hello!"},
  "async_execution": true,
  "start_from_node": null
}
```

Response:

```json
{
  "success": true,
  "execution_id": "exec_123",
  "execution": {
    "id": "exec_123",
    "workflow_id": "wf_456",
    "status": "RUNNING",
    "start_time": 1705000000000
  }
}
```
Resume HIL Workflow
Flow:
1. User responds to the HIL interaction (Slack/Email/Web)
2. The response is processed by the API Gateway
3. The API Gateway calls the Workflow Engine resume endpoint
4. The engine restores the execution context from the database
5. The engine updates the paused node with the user response
6. The engine continues the workflow from the successor nodes
Resume Endpoint:
```http
POST /v2/executions/{execution_id}/resume
Content-Type: application/json

{
  "node_id": "hil_node_1",
  "user_response": {
    "approved": true,
    "comment": "Looks good!"
  }
}
```
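A sketch of how that endpoint could be wired in FastAPI; the repository and engine method names are assumptions:

```python
from typing import Any, Dict
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

router = APIRouter(prefix="/v2/executions")

class ResumeRequest(BaseModel):
    node_id: str
    user_response: Dict[str, Any]

@router.post("/{execution_id}/resume")
def resume_execution(execution_id: str, body: ResumeRequest):
    # Restore the persisted execution state (repository name assumed)
    execution = execution_repo.load(execution_id)
    if execution is None or execution.status != "WAITING_FOR_HUMAN":
        raise HTTPException(status_code=409, detail="Execution is not paused for HIL")
    # Inject the user response into the paused HIL node and continue
    result = engine.resume_with_user_response(execution, body.node_id, body.user_response)
    return {"success": True, "execution_id": execution_id, "status": result.status}
```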
5.2 External Integrations
AI Providers
Unified Interface (services/ai_providers.py):
```python
from abc import ABC, abstractmethod
from typing import Any, Dict

class AIProvider(ABC):
    @abstractmethod
    def generate(self, prompt: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Generate an AI response with a provider-specific implementation."""

# Implementations:
# - OpenAIProvider: uses the openai SDK (sketched after the configuration notes below)
# - AnthropicProvider: uses the anthropic SDK
# - GeminiProvider: uses the google.generativeai SDK
```
Provider Configuration:
- Model selection via node configuration
- Temperature, max_tokens, top_p customizable per node
- Tool calling support for MCP integration
- Streaming support (chunked responses)
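Putting the interface and the per-node configuration together, an OpenAIProvider might look roughly like this (a sketch assuming the openai>=1.x SDK; the model default and return shape are illustrative):

```python
from typing import Any, Dict
from openai import OpenAI

class OpenAIProvider(AIProvider):
    def __init__(self, api_key: str):
        self._client = OpenAI(api_key=api_key)

    def generate(self, prompt: str, params: Dict[str, Any]) -> Dict[str, Any]:
        # Model and sampling parameters come from the node configuration
        resp = self._client.chat.completions.create(
            model=params.get("model", "gpt-4o-mini"),
            messages=[{"role": "user", "content": prompt}],
            temperature=params.get("temperature", 0.7),
            max_tokens=params.get("max_tokens", 1024),
        )
        return {
            "content": resp.choices[0].message.content,
            "usage": resp.usage.model_dump() if resp.usage else None,
        }
```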
External Action Services
OAuth-Based Integrations:
- Slack: send_message, create_channel, update_message
- GitHub: create_issue, create_pr, add_comment
- Notion: create_page, update_page, query_database
- Google Calendar: create_event, update_event, list_events
Authentication Flow:
1. User connects an account via the API Gateway OAuth flow
2. OAuth tokens are stored in Supabase with encryption
3. Workflow Engine retrieves tokens via the credential service
4. Tokens are automatically refreshed when expired (see the sketch below)
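In code, the retrieval step could look roughly like this; the service and field names below are hypothetical, not the actual credential_encryption API:

```python
import time

def get_oauth_token(user_id: str, provider: str) -> str:
    """Fetch, decrypt, and refresh-if-expired an OAuth token (names hypothetical)."""
    record = credential_store.fetch(user_id, provider)          # stored in Supabase
    token = credential_encryption.decrypt(record.encrypted_token)
    if record.expires_at <= time.time():
        # Refresh expired tokens transparently before use
        token = oauth_client.refresh(provider, record.refresh_token)
        credential_store.update(user_id, provider, credential_encryption.encrypt(token))
    return token
```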
6. Non-Functional Requirements
6.1 Performance
Targets:
- Workflow execution initiation: < 100ms
- Simple node execution (no external calls): < 50ms
- AI node execution: < 5s (depends on AI provider)
- Database query latency: < 100ms
- Full workflow execution: < 30s for typical workflows
Optimization Strategies:
- Supabase connection pooling for database operations
- Async execution with FastAPI background tasks
- Caching of node specifications
- Efficient graph traversal algorithms
- Minimal serialization overhead with Pydantic
Caching:
- Node specs cached in memory after first load (see the sketch below)
- Workflow definitions cached per execution
- AI provider clients reused across executions
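The node-spec cache, for example, can be as simple as a memoized loader (a sketch; the registry loader name is assumed):

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def get_node_spec(node_type: str, subtype: str):
    """Load a node spec once per process; later lookups hit the in-memory cache."""
    return load_spec_from_registry(node_type, subtype)  # assumed registry loader
```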
6.2 Scalability
Horizontal Scaling:
- Stateless API layer (FastAPI)
- Execution state stored in Supabase (shared across instances)
- Background task processing via FastAPI BackgroundTasks
- Ready for message queue integration (future: Celery/RQ)
Resource Considerations:
- Memory: ~200MB per instance (base)
- CPU: 0.25 vCPU per instance (ECS Fargate)
- Database connections: 10 per instance (Supabase pool)
- Concurrent executions: Limited by ECS task count
6.3 Security
Authentication:
- JWT tokens from Supabase Auth
- Row-level security (RLS) for multi-tenant isolation
- API key authentication for MCP endpoints
Data Encryption:
- OAuth tokens encrypted at rest (credential_encryption service)
- TLS for all external API calls
- Environment variables for secrets (AWS SSM Parameters)
Input Validation:
- Pydantic models for all API requests
- Node spec validation before execution
- SQL injection prevention via parameterized queries
6.4 Reliability
Error Handling:
- Per-node retry with exponential backoff
- Structured error responses with error codes
- Execution state persisted after each node
- Graceful degradation for external service failures
Failure Recovery:
- Automatic retry for transient failures (network, timeout)
- Manual retry capability for failed nodes
- Workflow pause/resume for long-running executions
- Complete execution history for debugging
Monitoring & Logging:
- Structured logging with log levels (INFO, WARNING, ERROR)
- Execution events published to event system
- User-friendly logs for frontend display
- Backend developer logs with detailed diagnostics
6.5 Testing & Observability
Testing Strategy
Unit Testing:
- Runner tests: Verify node execution logic
- Graph tests: Cycle detection, topological sort (example after this list)
- Spec tests: Validation and type coercion
- Service tests: HIL service, memory service, AI providers
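For instance, a graph test for cycle detection might look like this (make_workflow is a hypothetical test helper):

```python
import pytest

def test_topo_order_rejects_cycles():
    """Graph construction must fail fast on cyclic workflows."""
    workflow = make_workflow(
        nodes=["a", "b", "c"],
        connections=[("a", "b"), ("b", "c"), ("c", "a")],  # c -> a closes the cycle
    )
    with pytest.raises(CycleError):
        WorkflowGraph(workflow).topo_order()
```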
Integration Testing:
- End-to-end workflow execution
- HIL pause/resume flows
- External action integration tests
- Database persistence verification
Test Coverage:
- Target: >= 80% code coverage
- Critical paths: >= 95% coverage (execution engine, runners)
- Edge cases: Cycle detection, error handling, retry logic
Testing Automation:
- pytest with async support
- GitHub Actions CI/CD pipeline
- Pre-deployment integration tests
Observability
Key Metrics:
- Latency: Node execution time, workflow duration
- Throughput: Workflows executed per minute, nodes per second
- Error Rates: Failed executions, failed nodes, retry counts
- Resource Utilization: Memory usage, CPU usage, database connections
Logging Strategy:
- INFO: Workflow start/end, node execution milestones
- WARNING: Retry attempts, timeout warnings
- ERROR: Execution failures, external API errors
- DEBUG: Detailed input/output data, graph construction
Application Performance Monitoring:
- Execution traces with unique trace_id
- Per-node performance tracking
- Database query performance
- External API latency tracking
Monitoring & Alerting
Dashboards:
- Real-time execution status
- Node execution timeline
- Error rate trends
- Resource utilization graphs
Alert Thresholds:
- Error rate > 5% over 5 minutes
- Average execution time > 60s
- Database connection pool exhaustion
- External API failure rate > 10%
SLIs and SLOs:
- Availability: >= 99.9% uptime
- Latency: p95 < 10s for workflow execution
- Success Rate: >= 95% successful executions
Incident Response:
- Automatic alerts via PagerDuty/Slack
- Execution logs retrieved from Supabase
- Retry failed workflows manually
- Escalate to on-call engineer if needed
7. Human-in-the-Loop (HIL) Architecture
7.1 HIL Workflow Pattern
5-Phase Execution Flow:

1. HIL Node Startup:
   - Extract configuration (interaction_type, channel_type, timeout_seconds)
   - Validate parameters against the HIL spec
   - Extract user_id from the trigger/execution context
   - Return `_hil_wait: true` to signal a pause (see the runner sketch after this list)

2. Workflow Pause:
   - ExecutionEngine detects the `_hil_wait` flag
   - Creates a record in the `hil_interactions` table
   - Creates a record in the `workflow_execution_pauses` table
   - Updates the workflow status to WAITING_FOR_HUMAN
   - Stores the complete execution context for resume

3. Interaction Request:
   - HILService sends a notification via the configured channel
   - Slack: interactive message with action buttons
   - Email: email with approval links
   - App: in-app notification with a form

4. Human Response:
   - User responds via the Slack/Email/Web interface
   - Response webhook received by the API Gateway
   - AI classification (8-factor analysis):
     - `relevant` (score >= 0.7): process the response
     - `filtered` (score <= 0.3): ignore as spam
     - `uncertain` (0.3 < score < 0.7): log for review
   - Update `hil_interactions.status` to "completed"

5. Workflow Resume:
   - API Gateway calls `/v2/executions/{id}/resume`
   - ExecutionEngine restores context from the database
   - HIL node output includes the user response data
   - Workflow continues from successor nodes
   - Update `workflow_execution_pauses.status` to "resumed"
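A sketch of the runner side of phase 1; the `_hil_interaction` payload key and the TriggerInfo field access are assumptions:

```python
class HILRunner(NodeRunner):
    def run(self, node, inputs, trigger):
        config = node.configurations
        user_id = getattr(trigger, "user_id", None)  # extracted from trigger context

        # Returning _hil_wait tells the engine to pause; HILService then
        # persists the interaction and sends the channel notification
        return {
            "_hil_wait": True,
            "_hil_interaction": {  # payload key assumed for illustration
                "interaction_type": config["interaction_type"],
                "channel_type": config["channel_type"],
                "timeout_seconds": int(config.get("timeout_seconds", 3600)),
                "message": config.get("message", ""),
                "user_id": user_id,
            },
        }
```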
7.2 HIL Configuration
Node Configuration:
```python
{
    "interaction_type": "approval",  # approval | input | selection | review
    "channel_type": "slack",         # slack | email | webhook | app
    "timeout_seconds": 3600,         # 60 to 86400 (1 minute to 24 hours)
    "message": "Please approve this request",
    "approval_options": ["approve", "reject"],
    "channel_config": {
        "channel": "#approvals",     # Slack channel
    },
    "timeout_action": "fail"         # fail | continue | default_response
}
```
7.3 Database Schema
hil_interactions:

```sql
CREATE TABLE hil_interactions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id UUID NOT NULL,
    execution_id VARCHAR(255) NOT NULL,
    node_id VARCHAR(255) NOT NULL,
    user_id VARCHAR(255) NOT NULL,
    interaction_type VARCHAR(50) NOT NULL,
    channel_type VARCHAR(50) NOT NULL,
    request_data JSONB NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',
    timeout_at TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    responded_at TIMESTAMP,
    response_data JSONB
);
```

workflow_execution_pauses:

```sql
CREATE TABLE workflow_execution_pauses (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    execution_id VARCHAR(255) NOT NULL,
    paused_node_id VARCHAR(255) NOT NULL,
    pause_reason VARCHAR(255) NOT NULL,
    resume_conditions JSONB NOT NULL,
    status VARCHAR(50) DEFAULT 'active',
    paused_at TIMESTAMP DEFAULT NOW(),
    resumed_at TIMESTAMP,
    hil_interaction_id UUID
);
```
8. Technical Debt and Future Considerations
Known Limitations
Current State:
- In-memory execution store has no persistence between restarts (mitigated by Supabase repository)
- Limited support for distributed execution (single-instance design)
- No built-in workflow versioning or rollback
- Manual OAuth token refresh (not fully automated)
Areas for Improvement
Short-Term (Next Quarter):
- Implement workflow version control with rollback
- Add distributed tracing with OpenTelemetry
- Enhance error recovery with automatic retry policies
- Implement workflow debugging tools (breakpoints, step-through)
Medium-Term (6-12 Months):
- Support for parallel execution of independent nodes
- Workflow optimization recommendations based on execution history
- Advanced monitoring dashboards with custom metrics
- Workflow testing framework for pre-deployment validation
Long-Term (12+ Months):
- Distributed execution with message queue (Celery/RabbitMQ)
- Workflow analytics and intelligence layer
- Auto-scaling based on execution volume
- Multi-region deployment support
Migration Paths
From V1 to V2:
- Gradual migration with parallel execution support
- Workflow conversion tool for V1 → V2 format
- Backward compatibility layer for V1 API endpoints
- Deprecation timeline: 6 months after V2 GA
9. Appendices
A. Glossary
- Attached Node: TOOL or MEMORY node attached to AI_AGENT node, executed as part of AI context enhancement (not as separate workflow step)
- Conversion Function: Python code snippet that transforms data during connection propagation
- Execution Context: Complete runtime state including node outputs, pending inputs, and workflow definition
- Fan-out: Executing a node multiple times with different inputs (via LOOP/FOR_EACH nodes)
- HIL (Human-in-the-Loop): Workflow pause pattern requiring human interaction before continuation
- Node Spec: Centralized specification defining node configuration schema, inputs, outputs, and validation rules
- Output Port: Named output channel from a node (e.g., "result", "true", "false", "iteration")
- Runner: Executor class responsible for running a specific node type
- Topological Sort: Graph ordering algorithm ensuring nodes execute after all dependencies
- Workflow Graph: Directed acyclic graph (DAG) representing node dependencies
B. References
Internal Documentation:
- /docs/tech-design/new_workflow_spec.md: Complete workflow data model specification
- /apps/backend/workflow_engine_v2/README.md: Service-specific setup and development guide
- /apps/backend/CLAUDE.md: Backend architecture and development patterns
- /shared/node_specs/README.md: Node specification system documentation
External Resources:
- FastAPI Documentation: https://fastapi.tiangolo.com/
- Pydantic Models: https://docs.pydantic.dev/
- Supabase Python Client: https://supabase.com/docs/reference/python
- OpenAI API: https://platform.openai.com/docs
- Anthropic Claude API: https://docs.anthropic.com/
- Google Gemini API: https://ai.google.dev/docs
Code Examples:
- /apps/backend/workflow_engine_v2/examples/: Example workflows and usage patterns
- /apps/backend/workflow_engine_v2/tests/: Comprehensive test suite with examples