Workflow Engine: Architecture & Execution Flow
1. Overview & Business Purpose
The Workflow Engine is the core execution component of the platform, designed to run automated workflows defined via a structured, Protobuf-based format. Its primary business purpose is to provide a reliable, scalable, and observable environment for executing complex processes, with a special emphasis on supporting AI-driven tasks.
Unlike traditional workflow systems, it is built to handle the intricate data flows required by modern AI Agents, such as providing specific "tools" or "memory" contexts to a node. Furthermore, it captures extremely detailed execution telemetry, enabling advanced features like AI-powered automatic debugging and process optimization.
2. High-Level Architecture
The engine is built on a modular, service-oriented architecture. A gRPC server exposes all functionality, which is internally delegated to specialized services that manage the lifecycle and execution of workflows.
3. The Lifecycle of a Workflow: From Creation to Execution
To understand the system, it's best to follow the journey of a single workflow.
3.1. Workflow Creation & Persistence
A workflow's life begins when a client sends a `CreateWorkflow` request to the gRPC server.
- Delegation: The `MainWorkflowService` receives the request and delegates it to the `WorkflowService`.
- Data Modeling: The `WorkflowService` is responsible for persistence. A key design decision is how workflows are stored: the entire Protobuf `Workflow` message, including all its nodes and connections, is serialized into a single JSON object.
- Storage: This JSON object is then saved into the `workflows` table in a `JSONB` column named `workflow_data`. This provides flexibility, as new node types or parameters can be added without requiring database schema migrations.
```python
# In services/workflow_service.py
class WorkflowService:
    def create_workflow(...):
        # ...
        # Convert protobuf to JSON for JSONB storage
        from google.protobuf.json_format import MessageToDict

        workflow_json = MessageToDict(workflow)
        db_workflow = WorkflowModel(
            id=workflow.id,
            # ...
            workflow_data=workflow_json,  # Store protobuf as JSONB
            tags=list(workflow.tags),  # Store tags as native array
        )
        db.add(db_workflow)
        db.commit()
        # ...
```
3.2. Triggering an Execution
When a client calls `ExecuteWorkflow`:
- Delegation: The request is passed from `MainWorkflowService` to `ExecutionService`.
- Record Creation: The `ExecutionService` immediately creates a new record in the `workflow_executions` table with a status of `NEW`. This provides an immutable log of every execution attempt.
- Engine Invocation: The `ExecutionService` then invokes the `EnhancedWorkflowExecutionEngine`, passing it the workflow definition and initial input data.
3.3. The Execution Engine in Action
This is the core of the system, orchestrated by the `EnhancedWorkflowExecutionEngine`. The process is deterministic and observable.
Execution Steps:
- Planning (`_calculate_execution_order`): The engine first performs a topological sort on the workflow's `ConnectionsMap` to determine the precise, dependency-aware order in which to execute the nodes.
- Iterative Execution: The engine loops through the sorted list of nodes. For each node:
  a. Data Aggregation (`_prepare_node_input_data_with_tracking`): This is a critical step. The engine inspects the `ConnectionsMap` to find all parent nodes connecting to the current node and aggregates their outputs based on the connection type. For example, data from a `MAIN` connection is merged directly into the input, while data from an `AI_TOOL` connection is nested under an `ai_tool` key. This allows nodes (especially AI Agents) to receive complex, structured inputs from multiple sources.
  b. Node Dispatch: It uses the `NodeExecutorFactory` to instantiate the correct executor class for the node's type (e.g., `AIAgentNodeExecutor`).
  c. Execution: It calls the executor's `execute()` method, passing a `NodeExecutionContext` object that contains the aggregated input data, credentials, and other metadata.
  d. Telemetry Capture: After the node returns a result, the engine captures a comprehensive snapshot of the operation, including the inputs, outputs, status, and performance metrics, and appends it to the `execution_path`.
- Completion: Once all nodes have run (or if a node fails and the error policy is to stop), the engine returns the final state to the `ExecutionService`.
- Persistence: The `ExecutionService` updates the corresponding `workflow_executions` record in the database, setting the final status and saving the entire detailed execution trace into the `run_data` JSONB column.
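The planning step can be sketched as a standard Kahn's-algorithm topological sort. This is an illustrative stand-in for `_calculate_execution_order`, assuming the connections have been flattened into a simple `{source: [targets]}` adjacency map (the real method reads the richer `ConnectionsMap` structure):

```python
from collections import defaultdict, deque

def calculate_execution_order(connections):
    """Topological sort via Kahn's algorithm (illustrative sketch).

    `connections` is an assumed simplification: {source_node: [target_nodes]}.
    """
    indegree = defaultdict(int)
    nodes = set(connections)
    for source, targets in connections.items():
        for target in targets:
            nodes.add(target)
            indegree[target] += 1

    # Start with all nodes that have no incoming connections.
    queue = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for target in connections.get(node, []):
            indegree[target] -= 1
            if indegree[target] == 0:
                queue.append(target)

    if len(order) != len(nodes):
        raise ValueError("Workflow contains a cycle")
    return order
```

Because the sort is dependency-aware, every node is guaranteed to run only after all of its parents have produced output.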
4. Core Component Deep Dive
4.1. The Pluggable Node System
The engine's functionality is defined by its library of nodes. The system is designed to be easily extended with new nodes.
- `BaseNodeExecutor` (`base.py`): This abstract class defines the contract for all nodes, requiring them to implement an `execute()` and a `validate()` method. This ensures all nodes behave predictably.
- `NodeExecutorFactory` (`factory.py`): This factory holds a registry of all available node types. On startup, it is populated with the default executors. To add a new node, a developer simply creates a new executor class and registers it with the factory.
```python
# In nodes/action_node.py - A simplified example
class ActionNodeExecutor(BaseNodeExecutor):
    def get_supported_subtypes(self) -> List[str]:
        return ["HTTP_REQUEST", "RUN_CODE"]

    def execute(self, context: NodeExecutionContext) -> NodeExecutionResult:
        if context.node.subtype == "HTTP_REQUEST":
            # ... logic to make an HTTP call ...
            return self._create_success_result(output_data={"status_code": 200})
        # ...
```
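The registry side of this pattern can be sketched as follows. The class name matches `NodeExecutorFactory`, but the method names and the `EchoExecutor` example are assumptions for illustration, not the actual `factory.py` API:

```python
class NodeExecutorFactory:
    """Minimal registry sketch: maps a node type to its executor class."""

    def __init__(self):
        self._executors = {}

    def register(self, node_type, executor_cls):
        # Called once per executor, e.g. from a default-registration function.
        self._executors[node_type] = executor_cls

    def create(self, node_type):
        # Instantiate the executor registered for this node type.
        if node_type not in self._executors:
            raise ValueError(f"No executor registered for node type: {node_type}")
        return self._executors[node_type]()


class EchoExecutor:
    """Trivial stand-in executor used to demonstrate registration."""

    def execute(self, input_data):
        return input_data


factory = NodeExecutorFactory()
factory.register("ECHO_NODE", EchoExecutor)
```

The engine never needs to know concrete executor classes; dispatching through the registry is what makes the node library pluggable.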
4.2. The AI Agent Architecture Revolution
The workflow engine has undergone a fundamental transformation in how it handles AI Agent nodes, moving from hardcoded roles to a flexible, provider-based architecture.
Legacy Approach (Before v2.0) ❌
Previously, AI agents were defined by rigid, hardcoded subtypes:
- `AI_ROUTER_AGENT` - Limited to routing decisions
- `AI_TASK_ANALYZER` - Only capable of task analysis
- `AI_DATA_INTEGRATOR` - Fixed data integration logic
- `AI_REPORT_GENERATOR` - Restricted to report generation
This approach required new code for each AI role and limited customization capabilities.
Provider-Based Architecture (v2.0+) ✅
The new architecture introduces three universal AI agent providers where functionality is entirely defined by system prompts:
```python
# In nodes/ai_agent_node.py - New implementation
class AIAgentNodeExecutor(BaseNodeExecutor):
    def get_supported_subtypes(self) -> List[str]:
        return [
            "GEMINI_NODE",  # Google Gemini provider
            "OPENAI_NODE",  # OpenAI GPT provider
            "CLAUDE_NODE",  # Anthropic Claude provider
        ]

    def execute(self, context: NodeExecutionContext) -> NodeExecutionResult:
        subtype = context.node.subtype
        # (logs and start_time are initialized earlier in the full implementation)
        if subtype == "GEMINI_NODE":
            return self._execute_gemini_agent(context, logs, start_time)
        elif subtype == "OPENAI_NODE":
            return self._execute_openai_agent(context, logs, start_time)
        elif subtype == "CLAUDE_NODE":
            return self._execute_claude_agent(context, logs, start_time)
```
Key Benefits:
- Unlimited Functionality: Any AI task can be achieved through system prompts
- Easy Customization: Simply modify the system prompt parameter
- Provider Optimization: Leverage unique capabilities of each AI provider
- Simplified Codebase: Three providers instead of dozens of hardcoded roles
- Rapid Experimentation: Test new AI behaviors instantly
System Prompt Examples
Data Analysis Agent (Gemini):
```json
{
  "type": "AI_AGENT_NODE",
  "subtype": "GEMINI_NODE",
  "parameters": {
    "system_prompt": "You are a senior data analyst. Analyze datasets and provide statistical insights, trend analysis, and business recommendations in structured JSON format.",
    "model_version": "gemini-pro",
    "temperature": 0.3
  }
}
```
Customer Service Router (OpenAI):
```json
{
  "type": "AI_AGENT_NODE",
  "subtype": "OPENAI_NODE",
  "parameters": {
    "system_prompt": "You are an intelligent customer service routing system. Analyze inquiries and route to appropriate departments (billing/technical/sales/general) with confidence scoring.",
    "model_version": "gpt-4",
    "temperature": 0.1
  }
}
```
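As a sketch of how such parameters might feed a provider call, the executor only needs to combine the node's `system_prompt` with its aggregated input. The chat-message shape below is a common chat-completion convention assumed for illustration, not the engine's actual request format:

```python
import json

def build_chat_messages(parameters, input_data):
    """Turn node parameters plus aggregated input into a chat payload.

    Illustrative only: real executors build provider-specific requests;
    the role/content message list here is an assumed common convention.
    """
    return [
        # The system prompt entirely defines the agent's behavior.
        {"role": "system", "content": parameters["system_prompt"]},
        # The node's aggregated input becomes the user message.
        {"role": "user", "content": json.dumps(input_data)},
    ]
```

Swapping the `system_prompt` parameter is all it takes to repurpose the same executor for a different task.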
4.3. The `ConnectionsMap`: A System for AI Data Flows
A standout feature is the `ConnectionsMap`, which goes beyond simple linear connections. It supports 13 distinct connection types, allowing a workflow to model sophisticated data flows.
- Purpose: Different connection types allow a node to understand the semantic meaning of its inputs. An AI Agent node, for example, can distinguish between its main data input (`MAIN`), a tool it can use (`AI_TOOL`), and a memory source it can query (`AI_MEMORY`).
- Implementation: The `_prepare_node_input_data_with_tracking` method in the execution engine is responsible for interpreting these types and structuring the input data accordingly.
Example of a `ConnectionsMap` for a provider-based AI Agent:

```json
"Customer Analysis Agent": {
  "connection_types": {
    "ai_tool": {
      "connections": [{"node": "Customer Database Tool", "type": "AI_TOOL"}]
    },
    "ai_memory": {
      "connections": [{"node": "Customer Interaction History", "type": "AI_MEMORY"}]
    },
    "main": {
      "connections": [{"node": "Data Ingestion", "type": "MAIN"}]
    }
  }
}
```
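The aggregation behavior this map drives can be sketched as follows, mirroring the `MAIN`/`AI_TOOL`/`AI_MEMORY` semantics described above. The function name and exact nesting are illustrative assumptions, not the engine's actual implementation:

```python
def prepare_node_input(parent_outputs, connection_types):
    """Aggregate parent outputs by connection type (illustrative sketch).

    parent_outputs:   {parent_node_name: output_dict}
    connection_types: the "connection_types" map for the current node
    """
    node_input = {}
    for conn_type, spec in connection_types.items():
        for conn in spec["connections"]:
            output = parent_outputs.get(conn["node"], {})
            if conn["type"] == "MAIN":
                # MAIN data is merged directly into the node's input.
                node_input.update(output)
            else:
                # Semantic inputs (ai_tool, ai_memory, ...) are nested
                # under their connection-type key, grouped by parent node.
                node_input.setdefault(conn_type, {})[conn["node"]] = output
    return node_input
```

An AI Agent executor can then look up `node_input["ai_tool"]` or `node_input["ai_memory"]` explicitly instead of guessing which parent produced what.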
5. Extensibility
5.1. Traditional Node Extension
Adding new non-AI capabilities to the engine follows the established pattern:
- Create a New Node Executor: Write a new Python class in the `nodes/` directory that inherits from `BaseNodeExecutor`.
- Implement the Logic: Implement the `execute()` and `validate()` methods.
- Register the Executor: Add the new executor to the `register_default_executors()` function in `nodes/factory.py`.
5.2. AI Agent Extension (Revolutionary Approach)
With the provider-based architecture, adding new AI capabilities is dramatically simpler:
No Code Required: Simply create a new workflow with a different `system_prompt`:
```json
{
  "type": "AI_AGENT_NODE",
  "subtype": "CLAUDE_NODE",
  "parameters": {
    "system_prompt": "You are a cybersecurity analyst. Review code for vulnerabilities, classify risks by CVSS scoring, and provide detailed remediation steps with secure code examples.",
    "model_version": "claude-3-opus",
    "temperature": 0.2
  }
}
```
Adding New AI Providers: To add support for a new AI provider (e.g., `LLAMA_NODE`):
- Add the new subtype to `get_supported_subtypes()` in `AIAgentNodeExecutor`
- Implement a new `_execute_llama_agent()` method
- Update the protobuf schema and regenerate the bindings
This approach has revolutionized development velocity: new AI functionalities can be created in minutes rather than hours or days.