Workflow Scheduler Technical Design
1. Executive Summaryโ
workflow_scheduler is the trigger management and scheduling core of the Workflow System, responsible for managing workflow trigger conditions and coordinating workflow execution with the workflow_engine.
Core Responsibilitiesโ
- Trigger Management: Manage Cron, Manual, Webhook, Email, GitHub, Slack, and Google Calendar triggers
- Schedule Monitoring: Continuously monitor trigger conditions to ensure timely response
- Deployment Management: Manage workflow deployment status and trigger configuration
- Schedule Coordination: Call workflow_engine via HTTP when trigger conditions are met
Technology Stackโ
- Web Framework: FastAPI (Python 3.11+)
- Scheduler: APScheduler for cron triggers
- Database: Supabase PostgreSQL with RLS
- Cache/Lock: Redis for distributed locking
- HTTP Client: httpx for async HTTP calls
- Dependency Management: uv for Python packages
2. System Architectureโ
2.1 High-Level Architectureโ
2.2 Service Communicationโ
- Port: 8003 (HTTP/REST)
- Protocol: FastAPI with OpenAPI documentation
- Health Check:
curl -f http://localhost:8003/health - Start Period: 60s
Key Endpoints:
/api/v1/deployment/workflows/{workflow_id}/deploy- Deploy workflow/api/v1/deployment/workflows/{workflow_id}/undeploy- Undeploy workflow/api/v1/triggers/workflows/{workflow_id}/manual- Manual trigger/api/v1/github/webhook- GitHub webhook handler/api/v1/slack/events- Slack event handler
3. Core Componentsโ
3.1 DeploymentServiceโ
Responsibility: Manage the complete workflow deployment lifecycle
Deployment Status Flow:
UNDEPLOYED โ DEPLOYING โ DEPLOYED
โ
DEPLOYMENT_FAILED
Core Operations:
-
deploy_workflow(workflow_id, workflow_spec):
- Validate workflow definition (nodes, triggers, connections)
- Extract trigger specifications from workflow nodes
- Register triggers with TriggerManager (in-memory)
- Register triggers with TriggerIndexManager (database)
- Update workflow deployment status in database
- Create deployment history record
-
undeploy_workflow(workflow_id):
- Unregister triggers from TriggerManager
- Remove trigger index entries
- Update workflow status to UNDEPLOYED
- Create deployment history record
-
deploy_workflow_from_database(workflow_id):
- Fetch workflow spec from database
- Call deploy_workflow with fetched spec
Auto-Resolution Features:
- GitHub Integration: Automatically resolves
installation_idfrom user's OAuth token - Slack Integration: Automatically resolves
workspace_idand converts channel names to IDs
3.2 TriggerManagerโ
Responsibility: Central manager for all trigger types and their lifecycle
Supported Trigger Types (from shared.models.node_enums.TriggerSubtype):
MANUAL: User-initiated triggersCRON: Time-based scheduled triggersWEBHOOK: HTTP endpoint triggersSLACK: Slack event triggersEMAIL: Email monitoring triggersGITHUB: GitHub webhook triggersGOOGLE_CALENDAR: Google Calendar event triggers
Core Operations:
-
register_triggers(workflow_id, trigger_specs):
- Create trigger instances from specifications
- Start each trigger (setup monitoring/scheduling)
- Store active triggers in memory
- Return success/failure status
-
unregister_triggers(workflow_id):
- Stop all triggers for workflow
- Cleanup trigger resources
- Remove from in-memory registry
-
restore_active_triggers():
- Called on service startup
- Query trigger_index table for active triggers
- Register triggers for all deployed workflows
Trigger Factory Pattern:
# Register trigger classes
trigger_manager.register_trigger_class(TriggerSubtype.CRON, CronTrigger)
trigger_manager.register_trigger_class(TriggerSubtype.MANUAL, ManualTrigger)
trigger_manager.register_trigger_class(TriggerSubtype.WEBHOOK, WebhookTrigger)
# Create trigger from specification
trigger = trigger_class(workflow_id, trigger_config)
await trigger.start()
3.3 TriggerIndexManagerโ
Responsibility: Manage trigger indexing for fast event routing and matching
Database Table: trigger_index
Schema Design:
CREATE TABLE trigger_index (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL,
trigger_type TEXT NOT NULL, -- "TRIGGER"
trigger_subtype TEXT NOT NULL, -- "CRON", "SLACK", "GITHUB", etc.
trigger_config JSONB NOT NULL, -- Full trigger configuration
index_key TEXT, -- Fast lookup key
deployment_status TEXT DEFAULT 'active', -- 'active', 'inactive', 'pending', 'failed'
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_trigger_index_key ON trigger_index(trigger_subtype, index_key)
WHERE index_key IS NOT NULL;
Index Key Strategy (Unified Fast Lookup):
- CRON:
cron_expression(e.g., "0 9 * * MON-FRI") - WEBHOOK:
webhook_path(e.g., "/webhook/workflow-123") - SLACK:
workspace_id(e.g., "T1234567890") - EMAIL:
email_filter(e.g., "support@example.com") - GITHUB:
repository(e.g., "owner/repo-name") - MANUAL: No index_key (direct workflow_id lookup)
Two-Phase Matching:
- Coarse Filtering: Fast lookup using index_key
- Detailed Matching: Apply filters from trigger_config JSONB (channels, event types, branches, paths)
3.4 EventRouterโ
Responsibility: Fast event routing using database indexes
Core Routing Methods:
-
route_cron_event(cron_expression, timezone):
- Query:
SELECT * FROM trigger_index WHERE trigger_subtype='CRON' AND index_key=? - Returns: List of matching workflows
- Query:
-
route_webhook_event(path, method, headers, payload):
- Query:
SELECT * FROM trigger_index WHERE trigger_subtype='WEBHOOK' AND index_key=? - Additional validation: Check allowed_methods from config
- Query:
-
route_github_event(event_type, delivery_id, payload):
- Extract repository from payload
- Query:
SELECT * FROM trigger_index WHERE trigger_subtype='GITHUB' AND index_key=repository - Detailed validation: Check event_config, branches, paths, actions
-
route_slack_event(event_data):
- Extract workspace_id from event
- Query:
SELECT * FROM trigger_index WHERE trigger_subtype='SLACK' AND (index_key=workspace_id OR index_key IS NULL) - Detailed validation: Check event_types, channels, users, mention_required
Performance Optimization:
- Database index on (trigger_subtype, index_key)
- JSONB containment operators for config filtering
- Connection pooling with async database sessions
3.5 BaseTrigger (Abstract Base Class)โ
Responsibility: Common trigger functionality and workflow execution interface
Key Features:
-
Smart Resume Detection:
- Checks for paused workflow executions
- Automatically resumes HIL workflows instead of starting new execution
- Query:
/v2/workflows/{workflow_id}/executionswith status filter
-
Workflow Execution:
- Calls workflow_engine:
POST /v2/workflows/{workflow_id}/execute - Auto-resolves workflow owner user_id from database
- Supports JWT token forwarding for authenticated execution
- Timeout: 2 hours (for human-in-the-loop support)
- Calls workflow_engine:
-
Jitter Calculation:
- Hash-based jitter for distributed execution
- Ensures consistent but spread-out timing
- Range: 0-30 seconds
4. Trigger Type Implementationsโ
4.1 CronTriggerโ
Technology: Python APScheduler with AsyncIOScheduler
Configuration:
{
"cron_expression": "0 9 * * MON-FRI",
"timezone": "America/New_York",
"enabled": true
}
Implementation:
class CronTrigger(BaseTrigger):
async def start(self):
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger as APSCronTrigger
self.scheduler = AsyncIOScheduler()
self.scheduler.add_job(
func=self._execute_workflow,
trigger=APSCronTrigger.from_crontab(self.config['cron_expression']),
timezone=self.config.get('timezone', 'UTC'),
id=f"cron_{self.workflow_id}"
)
self.scheduler.start()
Features:
- Standard cron expression syntax
- Timezone support
- Hash-based jitter for load distribution
- Distributed lock prevents duplicate execution
4.2 ManualTriggerโ
Configuration:
{
"enabled": true,
"require_confirmation": false
}
API Endpoint:
POST /api/v1/triggers/workflows/{workflow_id}/manual
Authorization: Bearer {jwt_token}
Implementation:
class ManualTrigger(BaseTrigger):
async def trigger_manual(self, user_id: str, access_token: Optional[str] = None):
trigger_data = {
"trigger_type": "MANUAL",
"user_id": user_id,
"trigger_time": datetime.now().isoformat()
}
return await self._trigger_workflow(trigger_data, access_token)
Features:
- Immediate execution
- User identity tracking
- JWT token forwarding
- Optional confirmation requirement
4.3 WebhookTriggerโ
Configuration:
{
"webhook_path": "/webhook/my-workflow",
"allowed_methods": ["POST", "PUT"],
"enabled": true
}
Webhook URL Format:
https://api.example.com/api/v1/public/webhook/{workflow_id}
https://api.example.com/api/v1/public/webhook/custom-path
Implementation:
class WebhookTrigger(BaseTrigger):
async def process_webhook(self, request_data: dict):
trigger_data = {
"headers": request_data["headers"],
"body": request_data["body"],
"query_params": request_data["query_params"],
"method": request_data["method"],
"path": request_data["path"]
}
return await self._trigger_workflow(trigger_data)
Features:
- Unique webhook path per workflow
- HTTP method filtering
- Header and query parameter capture
- Request body forwarding
4.4 SlackTriggerโ
Configuration:
{
"workspace_id": "T1234567890",
"channels": ["C09D2JW6814", "C09D2JW6815"],
"event_types": ["message", "app_mention"],
"mention_required": false,
"ignore_bots": true
}
Auto-Resolution:
workspace_id: Auto-resolved from user's Slack OAuth token during deploymentchannels: Channel names converted to IDs (e.g., "general" โ "C09D2JW6814")
Event Processing:
- Slack platform sends event to API Gateway
- API Gateway routes to EventRouter
- EventRouter queries trigger_index with workspace_id
- Detailed filtering: channels, event_types, users, mentions
- Matching workflows triggered
Features:
- Multi-channel support
- Event type filtering (message, app_mention, reaction_added, etc.)
- User and bot filtering
- Mention detection
- Thread requirement support
4.5 GitHubTriggerโ
Configuration:
{
"github_app_installation_id": "12345678",
"repository": "owner/repo-name",
"event_config": {
"push": {
"branches": ["main", "develop"],
"paths": ["src/**", "*.md"]
},
"pull_request": {
"actions": ["opened", "synchronize"],
"branches": ["main"]
}
}
}
Auto-Resolution:
github_app_installation_id: Auto-resolved from user's GitHub OAuth token during deployment
Event Processing:
- GitHub sends webhook to API Gateway
/api/v1/github/webhook - Webhook signature verification
- EventRouter queries trigger_index with repository name
- Detailed filtering: event types, branches, paths, actions
- Matching workflows triggered
Advanced Filtering:
- Branch patterns:
["main", "release/*", "feature/*"] - Path patterns:
["src/**/*.py", "docs/*.md"] - Action filters:
["opened", "synchronize", "reopened"] - Author filters: Regex patterns for commit authors
Features:
- GitHub App integration with installation_id
- Multi-event support (push, pull_request, issues, release, etc.)
- Complex filtering (branches, paths, actions, authors)
- Webhook event storage for debugging
4.6 EmailTriggerโ
Configuration:
{
"email_filter": "support@example.com",
"folder": "INBOX",
"mark_as_read": true,
"check_interval": 60
}
Implementation:
class EmailTrigger(BaseTrigger):
async def start(self):
asyncio.create_task(self._monitor_emails())
async def _monitor_emails(self):
while True:
# Check for new emails via IMAP
# Apply email_filter
# Trigger matching workflows
await asyncio.sleep(self.check_interval)
Features:
- IMAP-based email monitoring
- Sender, subject, content filtering
- Attachment handling
- Auto-mark as read
- Configurable check interval
5. Deployment Workflowsโ
5.1 Workflow Deployment Flowโ
5.2 Workflow Undeployment Flowโ
5.3 Service Startup Trigger Restorationโ
6. Trigger Execution Flowsโ
6.1 Cron Trigger Executionโ
6.2 Manual Trigger Executionโ
6.3 Webhook Trigger Executionโ
6.4 GitHub Webhook Executionโ
6.5 Slack Event Executionโ
7. Human-in-the-Loop (HIL) Integrationโ
7.1 HIL Workflow Resume Patternโ
The workflow_scheduler implements intelligent resume detection for Human-in-the-Loop workflows:
Smart Resume Logic:
- When trigger fires (e.g., Slack message received)
- Check for paused executions of the workflow
- If paused execution exists, resume it with trigger data as human response
- Otherwise, start new execution
Resume Detection Implementation:
async def _check_for_resume_opportunity(self, trigger_data: dict) -> dict:
# Query workflow_engine for paused executions
engine_url = f"{settings.workflow_engine_url}/v2/workflows/{self.workflow_id}/executions"
response = await client.get(engine_url)
executions = response.json().get("workflows", [])
paused_executions = [e for e in executions if e["status"] == "PAUSED"]
if paused_executions:
most_recent = max(paused_executions, key=lambda x: x["started_at"])
return {
"should_resume": True,
"execution_id": most_recent["execution_id"]
}
return {"should_resume": False}
Resume Execution:
async def _resume_paused_workflow(self, execution_id: str, trigger_data: dict):
engine_url = f"{settings.workflow_engine_url}/v2/executions/{execution_id}/resume"
resume_payload = {
"resume_data": trigger_data,
"interaction_id": trigger_data.get("interaction_id")
}
response = await self._client.post(engine_url, json=resume_payload)
7.2 HIL Use Casesโ
Slack HIL Workflow:
- Workflow pauses at HIL node
- HIL node sends Slack message: "Please approve this deployment"
- User responds in Slack: "approved"
- Slack event triggers workflow
- Scheduler detects paused execution and resumes with response
- Workflow continues with approval
Email HIL Workflow:
- Workflow pauses at HIL node
- HIL node sends email: "Review this document"
- User replies to email
- Email trigger fires
- Scheduler resumes paused workflow with email content
8. Configuration & Environmentโ
8.1 Environment Variablesโ
# Core Service
PORT="8003"
HOST="0.0.0.0"
DEBUG="false"
# External Services
WORKFLOW_ENGINE_URL="http://workflow-engine:8002"
API_GATEWAY_URL="http://api-gateway:8000"
# Database
SUPABASE_URL="https://your-project.supabase.co"
SUPABASE_SECRET_KEY="service-role-key"
SUPABASE_PUB_KEY="anon-key"
# Redis
REDIS_URL="redis://localhost:6379/1"
# APScheduler
SCHEDULER_TIMEZONE="UTC"
SCHEDULER_MAX_WORKERS="10"
# GitHub Integration
GITHUB_APP_ID="123456"
GITHUB_APP_PRIVATE_KEY="-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----"
GITHUB_WEBHOOK_SECRET="webhook-secret"
GITHUB_CLIENT_ID="github-oauth-client-id"
# Slack Integration
SLACK_BOT_TOKEN="xoxb-..."
SLACK_CLIENT_ID="slack-client-id"
SLACK_CLIENT_SECRET="slack-client-secret"
SLACK_SIGNING_SECRET="slack-signing-secret"
# Google OAuth
GOOGLE_CLIENT_ID="google-client-id"
GOOGLE_CLIENT_SECRET="google-client-secret"
# Distributed Locks
LOCK_TIMEOUT="300"
LOCK_RETRY_DELAY="0.1"
# Logging
LOG_LEVEL="INFO"
LOG_FORMAT="text"
8.2 Service Configuration (core/config.py)โ
class Settings(BaseSettings):
service_name: str = "workflow_scheduler"
host: str = "0.0.0.0"
port: int = 8003
debug: bool = False
workflow_engine_url: str = "http://localhost:8002"
api_gateway_url: str = "http://localhost:8000"
supabase_url: str = ""
supabase_secret_key: str = ""
redis_url: str = "redis://localhost:6379/1"
scheduler_timezone: str = "UTC"
scheduler_max_workers: int = 10
9. Database Schemaโ
9.1 trigger_index Tableโ
CREATE TABLE trigger_index (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
trigger_type TEXT NOT NULL,
trigger_subtype TEXT NOT NULL,
trigger_config JSONB NOT NULL,
index_key TEXT,
deployment_status TEXT DEFAULT 'active',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
CONSTRAINT trigger_index_deployment_status_check
CHECK (deployment_status IN ('active', 'inactive', 'pending', 'failed'))
);
CREATE INDEX idx_trigger_index_key
ON trigger_index(trigger_subtype, index_key)
WHERE index_key IS NOT NULL;
CREATE INDEX idx_trigger_index_lookup
ON trigger_index(trigger_subtype, index_key, deployment_status)
WHERE index_key IS NOT NULL;
9.2 github_webhook_events Tableโ
CREATE TABLE github_webhook_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
delivery_id TEXT UNIQUE NOT NULL,
event_type TEXT NOT NULL,
installation_id BIGINT,
repository_id BIGINT,
payload JSONB,
processed_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_github_webhook_events_delivery
ON github_webhook_events(delivery_id);
CREATE INDEX idx_github_webhook_events_repo
ON github_webhook_events(repository_id, event_type);
9.3 github_installations Tableโ
CREATE TABLE github_installations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id),
installation_id BIGINT UNIQUE NOT NULL,
account_id BIGINT NOT NULL,
account_login TEXT NOT NULL,
account_type TEXT NOT NULL,
repositories JSONB,
permissions JSONB,
access_token_expires_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_github_installations_user
ON github_installations(user_id);
10. Monitoring & Observabilityโ
10.1 Health Check Endpointsโ
@app.get("/health")
async def health_check():
# Check TriggerManager initialization
# Quick Redis connectivity test (2s timeout)
return {"service": "workflow_scheduler", "status": "healthy"}
@app.get("/health/detailed")
async def detailed_health_check():
# Check lock_manager (Redis)
# Check trigger_manager (active triggers)
return {
"service": "workflow_scheduler",
"status": "healthy",
"components": {
"lock_manager": "healthy",
"trigger_manager": {...}
}
}
10.2 Metrics Endpointโ
@app.get("/metrics")
async def get_metrics():
return {
"service": "workflow_scheduler",
"total_workflows": 10,
"total_triggers": 25,
"trigger_types": {
"CRON": 5,
"SLACK": 8,
"GITHUB": 7,
"MANUAL": 5
}
}
10.3 Key Metricsโ
-
Deployment Metrics:
- Total deployed workflows
- Deployment success/failure rate
- Average deployment time
-
Trigger Metrics:
- Active triggers by type
- Trigger execution count
- Trigger execution latency
- Failed trigger executions
-
Event Router Metrics:
- Event routing latency
- Matching workflow count per event
- Database query performance
-
System Metrics:
- Redis connection status
- Database connection pool status
- Memory usage
- CPU usage
10.4 OpenTelemetry Integrationโ
# Automatic tracing for all HTTP requests
app.add_middleware(TrackingMiddleware)
app.add_middleware(MetricsMiddleware, service_name="workflow-scheduler")
# OTLP endpoint configuration
environment = os.getenv("ENVIRONMENT", "development")
if environment in ["production", "staging"]:
otlp_endpoint = "http://localhost:4317" # AWS OTEL Collector sidecar
else:
otlp_endpoint = "http://otel-collector:4317" # Docker Compose
11. Security Considerationsโ
11.1 Authentication & Authorizationโ
- Internal API: Service-to-service authentication via API Gateway
- OAuth Integrations: User-specific OAuth tokens stored in oauth_tokens table
- GitHub Webhooks: Signature verification using GITHUB_WEBHOOK_SECRET
- Slack Events: Signature verification using SLACK_SIGNING_SECRET
- Manual Triggers: JWT token authentication and forwarding
11.2 Data Securityโ
- Sensitive Configuration: Encrypted storage in Supabase
- OAuth Tokens: Stored with encryption in oauth_tokens table
- Environment Secrets: AWS SSM Parameters for production
- Network Security: VPC isolation, security groups, TLS encryption
11.3 Rate Limiting & Abuse Preventionโ
- Distributed Locks: Prevent duplicate cron executions
- Jitter: Spread out concurrent executions
- Timeout Configuration: 2-hour timeout for HIL workflows
- Event Deduplication: Track processed webhook delivery IDs
12. Development & Deploymentโ
12.1 Development Commandsโ
# Install dependencies
cd apps/backend/workflow_scheduler
uv sync
# Run service locally
python -m workflow_scheduler.main
# Run tests
pytest tests/
# Test trigger system
python test_trigger_system.py
12.2 Docker Deploymentโ
# Build for AWS ECS (AMD64)
docker build --platform linux/amd64 \
-f apps/backend/workflow_scheduler/Dockerfile \
-t workflow-scheduler .
# Run with Docker Compose
cd apps/backend
docker-compose up workflow-scheduler
12.3 Project Structureโ
workflow_scheduler/
โโโ main.py # FastAPI app entry point
โโโ core/
โ โโโ config.py # Configuration management
โ โโโ database.py # Database connections
โ โโโ supabase_client.py # Supabase client
โโโ api/
โ โโโ deployment.py # Deployment endpoints
โ โโโ triggers.py # Trigger management endpoints
โ โโโ github.py # GitHub webhook handler
โ โโโ slack.py # Slack event handler
โ โโโ google_calendar.py # Google Calendar integration
โ โโโ auth.py # OAuth callbacks
โโโ services/
โ โโโ deployment_service.py # Deployment orchestration
โ โโโ trigger_manager.py # Trigger lifecycle management
โ โโโ trigger_index_manager.py # Database indexing
โ โโโ event_router.py # Fast event routing
โ โโโ lock_manager.py # Distributed locking
โ โโโ notification_service.py # Notifications
โ โโโ direct_db_service.py # Direct database access
โโโ triggers/
โ โโโ base.py # BaseTrigger abstract class
โ โโโ cron_trigger.py # Cron implementation
โ โโโ manual_trigger.py # Manual implementation
โ โโโ webhook_trigger.py # Webhook implementation
โ โโโ slack_trigger.py # Slack implementation
โ โโโ github_trigger.py # GitHub implementation
โ โโโ email_trigger.py # Email implementation
โโโ dependencies.py # FastAPI dependency injection
โโโ tests/ # Unit tests
โโโ pyproject.toml # Python project config
13. Key Design Decisionsโ
13.1 Unified Index Key Designโ
Decision: Use single index_key column for all trigger types instead of type-specific columns
Rationale:
- Simplifies database schema
- Reduces index maintenance
- Supports future trigger types without schema changes
- Enables consistent query patterns
Trade-offs:
- Requires type-specific index_key calculation logic
- JSONB config required for detailed filtering
13.2 Two-Phase Matchingโ
Decision: Coarse filtering with index_key, detailed filtering with trigger_config
Rationale:
- Fast initial filtering using database indexes
- Flexible detailed filtering without schema changes
- Supports complex trigger configurations
Trade-offs:
- Additional CPU overhead for detailed matching
- Cannot index all filter criteria
13.3 In-Memory + Database Hybridโ
Decision: Active triggers in memory (TriggerManager) + persistent index (TriggerIndexManager)
Rationale:
- Fast trigger execution without database queries
- Persistent state for service restarts
- Enables fast event routing via database indexes
Trade-offs:
- Memory usage scales with number of workflows
- Requires restoration on service startup
13.4 Auto-Resolution of Integration Credentialsโ
Decision: Automatically resolve OAuth credentials during deployment
Rationale:
- Simplified user experience (no manual credential entry)
- Ensures credentials are up-to-date
- Reduces configuration errors
Implementation:
- GitHub: Resolve installation_id from oauth_tokens
- Slack: Resolve workspace_id and channel IDs from oauth_tokens
13.5 Smart HIL Resume Detectionโ
Decision: Automatically detect and resume paused workflows on trigger
Rationale:
- Enables seamless HIL user experience
- Avoids duplicate workflow executions
- Supports conversational workflows
Trade-offs:
- Additional latency for paused execution check
- Complexity in matching trigger to paused execution
14. Future Enhancementsโ
14.1 Performance Optimizationsโ
- Redis caching for frequently matched triggers
- Pre-computed filter indexes for complex queries
- Batch event processing for high-volume triggers
- Connection pooling optimization
14.2 Feature Additionsโ
- Trigger condition expressions (e.g., "time == 9am AND day == Monday")
- Trigger dependencies (e.g., "trigger A must complete before trigger B")
- Trigger rate limiting per workflow
- Advanced HIL matching (context-aware resume)
- Trigger execution history and analytics
14.3 Scalabilityโ
- Multi-region deployment support
- Horizontal scaling with leader election
- Trigger sharding by workflow_id
- Event queue for high-volume processing
Appendicesโ
A. Glossaryโ
- Trigger: Condition that initiates workflow execution
- Deployment: Process of activating workflow and its triggers
- Index Key: Fast lookup field for trigger matching
- Event Router: Service for matching events to workflows
- HIL: Human-in-the-Loop, workflows requiring human interaction
- Jitter: Random delay to distribute execution load
- Distributed Lock: Redis-based lock to prevent duplicate execution
B. Referencesโ
- Workflow Specification:
docs/tech-design/new_workflow_spec.md - Node Specifications:
shared/node_specs/ - API Gateway Documentation:
apps/backend/api-gateway/docs/ - Supabase Migrations:
supabase/migrations/ - CLAUDE.md:
apps/backend/CLAUDE.md