
API Gateway Technical Design

1. Executive Summary

The API Gateway is the primary entry point for the Workflow Agent Team system. Built on FastAPI, it implements a three-layer API architecture that provides unified authentication, rate limiting, and request routing for three client types: public consumers, web/mobile applications, and LLM clients.

Key Architectural Decisions

  • Three-Layer API Architecture: Separate public, app, and MCP APIs, each with its own authentication pattern
  • HTTP/REST Migration: Complete migration from gRPC to HTTP/REST for simpler inter-service communication
  • Supabase Integration: Supabase provides authentication and database access with Row Level Security (RLS)
  • Redis Caching: Redis-based rate limiting and session state caching for performance
  • FastAPI Framework: Async-first architecture with a comprehensive middleware stack

Technology Stack

  • Framework: FastAPI 0.115.5 with async/await support
  • Authentication: Supabase Auth with JWT token validation
  • Database: Supabase PostgreSQL with Row Level Security (RLS)
  • Cache: Redis for rate limiting and session state
  • HTTP Client: httpx for async service-to-service communication
  • Observability: OpenTelemetry with custom telemetry middleware
  • Deployment: Docker + AWS ECS Fargate

2. System Architecture

2.1 High-Level Architecture

┌───────────────────────────────────────────────────────────────────────┐
│                             Client Layer                              │
├─────────────────┬─────────────────┬─────────────────┬─────────────────┤
│  Web Frontend   │   Mobile Apps   │  External APIs  │   LLM Clients   │
│    (Next.js)    │  (iOS/Android)  │  (Third-party)  │  (Claude/GPT)   │
└────────┬────────┴────────┬────────┴────────┬────────┴────────┬────────┘
         │                 │                 │                 │
         │ JWT Token       │ JWT Token       │ No Auth         │ API Key
         │                 │                 │                 │
         ▼                 ▼                 ▼                 ▼
┌───────────────────────────────────────────────────────────────────────┐
│                   API Gateway (FastAPI - Port 8000)                   │
├───────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │  Middleware Stack                                               │  │
│  │  1. CORS Middleware                                             │  │
│  │  2. Tracking Middleware (trace_id generation)                   │  │
│  │  3. Metrics Middleware (OpenTelemetry)                          │  │
│  │  4. Rate Limit Middleware (Redis sliding window)                │  │
│  │  5. Unified Auth Middleware (layer-based auth)                  │  │
│  │  6. Request Logging Middleware (process time tracking)          │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                                                       │
│  ┌────────────────────┬────────────────────┬───────────────────────┐  │
│  │  Public API        │  App API           │  MCP API              │  │
│  │  /api/v1/public    │  /api/v1/app       │  /api/v1/mcp          │  │
│  │                    │                    │                       │  │
│  │  ∙ No Auth         │  ∙ JWT Auth        │  ∙ API Key Auth       │  │
│  │  ∙ Rate Limited    │  ∙ RLS Enabled     │  ∙ Scoped Permissions │  │
│  │  ∙ Health/Status   │  ∙ Sessions        │  ∙ Tool Discovery     │  │
│  │  ∙ Docs/Webhook    │  ∙ Chat/Workflows  │  ∙ Tool Invocation    │  │
│  └────────────────────┴────────────────────┴───────────────────────┘  │
└──────────┬────────────────────────┬────────────────────────┬──────────┘
           │                        │                        │
           │ HTTP/REST              │ HTTP/REST              │ HTTP/REST
           ▼                        ▼                        ▼
┌─────────────────────┐  ┌─────────────────────┐  ┌─────────────────────┐
│   Workflow Agent    │  │   Workflow Engine   │  │ Workflow Scheduler  │
│     (Port 8001)     │  │     (Port 8002)     │  │     (Port 8003)     │
│    AI Generation    │  │      Execution      │  │ Trigger Management  │
└──────────┬──────────┘  └──────────┬──────────┘  └──────────┬──────────┘
           │                        │                        │
           └────────────────────────┼────────────────────────┘
                                    │
                                    ▼
                        ┌───────────────────────┐
                        │       Supabase        │
                        │  - PostgreSQL + RLS   │
                        │  - Auth + JWT         │
                        │  - Vector Store       │
                        └───────────┬───────────┘
                                    │
                        ┌───────────┴───────────┐
                        │         Redis         │
                        │  - Rate Limiting      │
                        │  - Session Cache      │
                        └───────────────────────┘

2.2 Component Architecture

API Layer Organization

apps/backend/api-gateway/
├── app/
│   ├── main.py                                # Application factory with middleware stack
│   ├── core/                                  # Core infrastructure
│   │   ├── config.py                          # Settings with Pydantic validation
│   │   ├── database.py                        # Supabase integration with RLS
│   │   ├── database_direct.py                 # Direct PostgreSQL for high performance
│   │   └── events.py                          # Lifespan events and health checks
│   ├── api/                                   # Three-layer API structure
│   │   ├── public/                            # Public API endpoints
│   │   │   ├── health.py                      # Service health checks
│   │   │   ├── status.py                      # System status information
│   │   │   ├── docs.py                        # API documentation
│   │   │   ├── auth.py                        # OAuth callback handlers
│   │   │   ├── workflows.py                   # Public workflow information
│   │   │   ├── webhooks.py                    # Webhook receivers
│   │   │   └── validation.py                  # Workflow validation service
│   │   ├── app/                               # App API endpoints (JWT auth)
│   │   │   ├── sessions.py                    # Session management with RLS
│   │   │   ├── chat.py                        # SSE streaming chat interface
│   │   │   ├── workflows.py                   # Workflow CRUD operations
│   │   │   ├── executions.py                  # Execution management and logs
│   │   │   └── integrations.py                # OAuth integration management
│   │   └── mcp/                               # MCP API endpoints (API key auth)
│   │       ├── tools.py                       # Tool discovery and invocation
│   │       ├── slack_tools.py                 # Slack MCP integration
│   │       ├── notion_tools.py                # Notion MCP integration
│   │       ├── gmail_tools.py                 # Gmail MCP integration
│   │       └── github_tools.py                # GitHub MCP integration
│   ├── middleware/                            # Middleware components
│   │   ├── auth.py                            # Unified authentication middleware
│   │   ├── rate_limit.py                      # Redis-based rate limiting
│   │   └── validation.py                      # Request validation
│   ├── services/                              # Business logic services
│   │   ├── auth_service.py                    # Supabase authentication
│   │   ├── workflow_agent_http_client.py      # HTTP client for workflow agent
│   │   ├── workflow_engine_http_client.py     # HTTP client for workflow engine
│   │   ├── workflow_scheduler_http_client.py  # HTTP client for scheduler
│   │   ├── state_manager.py                   # Session state management
│   │   ├── response_processor.py              # SSE response processing
│   │   └── cache.py                           # Redis cache operations
│   ├── models/                                # Pydantic data models
│   │   └── __init__.py                        # Shared models
│   ├── utils/                                 # Utility functions
│   │   ├── logger.py                          # Structured logging
│   │   ├── sse.py                             # Server-Sent Events utilities
│   │   └── node_converter.py                  # Node format conversion
│   ├── dependencies.py                        # FastAPI dependency injection
│   └── exceptions.py                          # Custom exception handlers
├── tests/                                     # Test suite
│   ├── test_basic.py                          # Core functionality tests
│   ├── test_integration.py                    # Integration tests
│   └── interactive_chat_client.py             # Manual testing client
├── pyproject.toml                             # uv package configuration
├── Dockerfile                                 # Multi-stage Docker build
└── .env.example                               # Environment template

3. Data Architecture

3.1 Data Models

Core Pydantic Models

Authentication Models (app/middleware/auth.py):

from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, Field

class AuthResult(BaseModel):
    success: bool
    user: Optional[Dict[str, Any]] = None    # Supabase user data
    client: Optional[Dict[str, Any]] = None  # MCP client data
    token: Optional[str] = None
    error: Optional[str] = None

class MCPApiKey(BaseModel):
    id: str
    client_name: str
    scopes: List[str]         # e.g. ["tools:read", "tools:execute", "health:check"]
    rate_limit_tier: str      # "standard", "premium", or "development"
    active: bool
    created_at: datetime
    expires_at: Optional[datetime] = None  # timezone-aware; compared against UTC "now"

Session Models:

class SessionCreate(BaseModel):
    action: str                                            # "create", "edit", or "copy"
    workflow_id: Optional[str] = None
    meta_data: Dict[str, Any] = Field(default_factory=dict)

class SessionResponse(BaseModel):
    session_id: str
    user_id: str
    created_at: str
    updated_at: str

Chat Models:

class ChatRequest(BaseModel):
    session_id: str
    user_message: str
    workflow_context: Optional[Dict[str, Any]] = None

class ConversationResponse(BaseModel):
    session_id: str
    response_type: str  # "MESSAGE", "STATUS", "ERROR", or "FINAL"
    is_final: bool
    message: Optional[str] = None
    status: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None

Workflow Models:

class WorkflowCreateRequest(BaseModel):
    user_id: str
    name: str
    description: Optional[str] = None
    nodes: List[Dict[str, Any]]
    connections: Union[List[Dict[str, str]], Dict[str, Any]]  # Supports both list and dict formats
    triggers: Optional[List[str]] = None
    settings: Optional[Dict[str, Any]] = None
    tags: Optional[List[str]] = None

class WorkflowResponse(BaseModel):
    workflow_id: str
    name: str
    description: Optional[str] = None
    nodes: List[Dict[str, Any]]
    connections: List[Dict[str, str]]
    triggers: List[str]
    created_by: str
    created_time_ms: int

3.2 Data Flow

Session and Chat Data Flow

Workflow Execution Flow

4. Implementation Details

4.1 Core Components

FastAPI Application Factory (app/main.py)

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

# settings, lifespan, and the three routers are imported from the app package

def create_application() -> FastAPI:
    """Build the FastAPI app (application factory pattern)"""
    app = FastAPI(
        title="Workflow Agent API Gateway",
        description="Three-layer API architecture: Public, App, MCP",
        version="2.0.0",
        lifespan=lifespan,  # Async startup/shutdown handler
        debug=settings.DEBUG,
    )

    # CORS: all origins allowed for development; restrict allow_origins in production
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Middleware stack (order matters!). Starlette runs the most recently added
    # middleware first, so register these in reverse of the order listed:
    # 1. Tracking middleware (generates trace_id)
    # 2. Metrics middleware (OpenTelemetry)
    # 3. Rate limiting middleware
    # 4. Authentication middleware
    # 5. Request logging middleware

    # Register routes
    app.include_router(public_router, prefix="/api/v1/public", tags=["Public API"])
    app.include_router(app_router, prefix="/api/v1/app", tags=["App API"])
    app.include_router(mcp_router, prefix="/api/v1/mcp", tags=["MCP API"])

    return app

Lifespan Events (app/core/events.py):

from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown events"""
    # Startup: connect to the workflow services
    await workflow_agent_client.connect()
    await workflow_engine_client.connect()
    try:
        yield
    finally:
        # Shutdown: close connections even if the app exits with an error
        await workflow_agent_client.close()
        await workflow_engine_client.close()

4.2 Three-Layer Authentication

Unified Authentication Middleware (app/middleware/auth.py)

from fastapi import Request
from fastapi.responses import JSONResponse

async def unified_auth_middleware(request: Request, call_next):
    """Apply the authentication scheme that matches the request's API layer"""
    path = request.url.path

    # Public API: no authentication, rate limited only
    if path.startswith("/api/v1/public/"):
        return await call_next(request)

    # MCP API: API key authentication
    if path.startswith("/api/v1/mcp/"):
        if not settings.MCP_API_KEY_REQUIRED:
            return await call_next(request)

        auth_result = await authenticate_mcp_client(request)
        if not auth_result.success:
            return JSONResponse(
                status_code=401,
                content={
                    "error": "unauthorized",
                    "message": f"MCP authentication failed: {auth_result.error}",
                },
            )

        request.state.client = auth_result.client
        request.state.auth_type = "mcp_api_key"

    # App API: Supabase JWT authentication
    elif path.startswith("/api/v1/app/"):
        if not settings.SUPABASE_AUTH_ENABLED:
            return await call_next(request)

        auth_result = await authenticate_supabase_user(request)
        if not auth_result.success:
            return JSONResponse(
                # 400 for a structurally malformed token, 401 for everything else
                status_code=400 if auth_result.error == "malformed_token" else 401,
                content={
                    "error": "unauthorized",
                    "message": f"Authentication failed: {auth_result.error}",
                },
            )

        request.state.user = auth_result.user
        request.state.user_id = auth_result.user.get("sub")
        request.state.access_token = auth_result.token
        request.state.auth_type = "supabase"

    # Authenticated MCP/App requests continue here; any other path passes through
    return await call_next(request)

Supabase JWT Validation:

async def authenticate_supabase_user(request: Request) -> AuthResult:
    """Validate the Supabase JWT from the Authorization header"""
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        return AuthResult(success=False, error="missing_token")

    token = auth_header.split(" ", 1)[1].strip()

    # Validate JWT format (3 segments separated by dots) before the network call
    if not _validate_jwt_format_middleware(token):
        return AuthResult(success=False, error="malformed_token")

    # Verify with Supabase
    user_data = await verify_supabase_token(token)
    if not user_data:
        return AuthResult(success=False, error="invalid_token")

    return AuthResult(success=True, user=user_data, token=token)
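
`_validate_jwt_format_middleware` itself is not shown. A minimal stdlib sketch of such a pre-check, assuming it only needs to reject structurally invalid tokens before the Supabase round trip; the name `validate_jwt_format` is hypothetical, and the check deliberately verifies neither the signature nor any claims:

```python
import base64
import binascii
import re

# One unpadded base64url segment (RFC 4648 section 5 alphabet)
_B64URL = re.compile(r"[A-Za-z0-9_-]+")


def validate_jwt_format(token: str) -> bool:
    """Structural JWT check: exactly three non-empty base64url segments.

    The header and payload must also decode as base64url. The signature
    and claims are NOT verified; that is left to Supabase.
    """
    parts = token.split(".")
    if len(parts) != 3 or not all(_B64URL.fullmatch(p) for p in parts):
        return False
    for segment in parts[:2]:
        try:
            # Restore the padding that JWTs strip before decoding
            base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))
        except (binascii.Error, ValueError):
            return False
    return True
```

Because it never touches the network, a check like this stops obviously bad tokens from costing a Supabase call; real verification still happens in `verify_supabase_token`.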

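MCP API Key Authentication:

from datetime import datetime, timezone
from typing import List, Optional

class MCPAuthenticator:
    """API key management and verification"""

    # self.api_keys maps raw API key strings to MCPApiKey records loaded from configuration

    def verify_api_key(self, api_key: str) -> Optional[MCPApiKey]:
        """Return the key record if it exists, is active, and has not expired"""
        api_key_obj = self.api_keys.get(api_key)
        if not api_key_obj or not api_key_obj.active:
            return None

        # expires_at must be timezone-aware; comparing a naive datetime with an aware one raises TypeError
        if api_key_obj.expires_at and datetime.now(timezone.utc) > api_key_obj.expires_at:
            return None

        return api_key_obj

    def has_required_scopes(self, user_scopes: List[str], required_scopes: List[str]) -> bool:
        """True if the key has the "admin" scope or at least one of the required scopes"""
        if "admin" in user_scopes:
            return True
        return any(scope in user_scopes for scope in required_scopes)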
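
Section 6.3 lists API key hashing as a future enhancement, and section 7.1 notes that keys currently sit unencrypted in configuration. One way the lookup above could work against digests instead of plaintext keys is sketched below. `StoredKey`, `HashedKeyStore`, and `digest` are hypothetical names, and the approach assumes keys are long random strings, for which a fast unsalted SHA-256 digest is generally considered adequate (user-chosen secrets would need a slow, salted KDF):

```python
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional


def digest(api_key: str) -> str:
    """Hex SHA-256 of the key; only this value is persisted."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()


@dataclass
class StoredKey:
    key_id: str
    key_sha256: str
    scopes: List[str] = field(default_factory=list)
    active: bool = True
    expires_at: Optional[datetime] = None  # timezone-aware if set


class HashedKeyStore:
    def __init__(self, keys: List[StoredKey]):
        # Index records by digest so plaintext keys never need to be kept
        self._by_digest: Dict[str, StoredKey] = {k.key_sha256: k for k in keys}

    def verify(self, api_key: str, now: Optional[datetime] = None) -> Optional[StoredKey]:
        """Return the active, unexpired record matching api_key, or None."""
        record = self._by_digest.get(digest(api_key))
        if record is None or not record.active:
            return None
        now = now or datetime.now(timezone.utc)
        if record.expires_at is not None and now > record.expires_at:
            return None
        return record
```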

4.3 Rate Limiting System

Redis-Based Sliding Window (app/middleware/rate_limit.py)

Rate Limit Configuration:

class RateLimitConfig:
    # Public API: per client IP address
    PUBLIC_LIMITS = {
        "global": "1000/hour",
        "/api/v1/public/health": "100/minute",
        "/api/v1/public/status": "60/minute",
    }

    # App API: per authenticated user
    APP_LIMITS = {
        "authenticated_user": "10000/hour",
        "/api/v1/app/chat/stream": "100/hour",
        "/api/v1/app/workflows/execute": "100/hour",
    }

    # MCP API: per API key
    MCP_LIMITS = {
        "api_client": "50000/hour",
        "/api/v1/mcp/invoke": "1000/hour",
    }
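
`check_rate_limit` relies on a `self._parse_limit` helper that the source does not show. A possible standalone version, assuming limits are always written as `<count>/<unit>` using the units in `RateLimitConfig` (plus second and day); `parse_limit` is a hypothetical name:

```python
from typing import Tuple

# Window length in seconds for each supported unit
_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def parse_limit(limit_str: str) -> Tuple[int, int]:
    """Turn "100/minute" into (100, 60): (max requests, window in seconds)."""
    count_part, sep, unit = limit_str.partition("/")
    if not sep:
        raise ValueError(f"expected '<count>/<unit>', got {limit_str!r}")
    unit = unit.strip().lower().rstrip("s")  # tolerate plurals such as "minutes"
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"unknown rate-limit unit {unit!r}")
    count = int(count_part)
    if count <= 0:
        raise ValueError("count must be positive")
    return count, _UNIT_SECONDS[unit]
```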

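Sliding Window Algorithm:

import math
import time
import uuid
from typing import Any, Dict, Tuple

from fastapi import Request

async def check_rate_limit(
    self, key: str, limit_str: str, request: Request
) -> Tuple[bool, Dict[str, Any]]:
    """Redis sliding-window rate limiting: one sorted-set member per request"""
    count, window_seconds = self._parse_limit(limit_str)
    now = time.time()
    window_start = now - window_seconds

    # MULTI/EXEC pipeline so the four commands run atomically. Assumes
    # redis_client is a redis.asyncio client; a sync client would block the loop.
    pipe = redis_client.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # Drop entries outside the window
    pipe.zcard(key)                              # Count requests still in the window
    # Unique member per request: "<second>:<hash(url)>" collided for repeat calls in
    # the same second, so ZADD overwrote instead of adding and the limit undercounted
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
    pipe.expire(key, window_seconds + 10)        # Idle keys expire on their own

    results = await pipe.execute()
    current_count = results[1]  # ZCARD result, taken before this request was added

    # Note: the request is recorded even when it is rejected
    allowed = current_count < count

    return allowed, {
        "current_count": current_count,
        "limit": count,
        "window_seconds": window_seconds,
        # By this time every entry counted above has left the window
        "reset_time": math.ceil(now + window_seconds),
    }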
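
For reasoning about behavior or for unit tests, the same sliding-window-log idea can be expressed without Redis. This sketch keeps a deque of timestamps per key and evicts entries at or before `now - window`, mirroring `ZREMRANGEBYSCORE` followed by `ZCARD`. It is process-local, so it cannot replace Redis across multiple ECS tasks; the class name is hypothetical, and unlike the Redis version it does not record rejected requests:

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLog:
    """In-memory sliding-window log limiter (single process only)."""

    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock  # injectable for deterministic tests
        self._log: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        log = self._log[key]
        # Evict timestamps that have slid out of the window
        while log and log[0] <= now - self.window:
            log.popleft()
        if len(log) >= self.limit:
            return False  # rejected requests are not logged
        log.append(now)
        return True
```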

4.4 Service Orchestration

Workflow Agent HTTP Client (app/services/workflow_agent_http_client.py)

SSE Streaming Communication:

import json
from typing import Any, AsyncGenerator, Dict, Optional

import httpx

async def process_conversation_stream(
    self,
    session_id: str,
    user_message: str,
    user_id: str = "anonymous",
    workflow_context: Optional[Dict[str, Any]] = None,
    access_token: Optional[str] = None,
    trace_id: Optional[str] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Stream conversation processing from the Workflow Agent over HTTP SSE"""

    request_data = {
        "session_id": session_id,
        "user_id": user_id,
        "user_message": user_message,
        "access_token": access_token or "",
    }

    if workflow_context:
        request_data["workflow_context"] = {
            "origin": workflow_context.get("origin", "create"),
            "source_workflow_id": workflow_context.get("source_workflow_id", ""),
        }

    # http2=True requires the optional h2 package (httpx[http2]). A new AsyncClient
    # per call opens a fresh connection pool each time, so connections are not reused.
    async with httpx.AsyncClient(timeout=self.timeout, http2=True) as client:
        async with client.stream(
            "POST",
            f"{self.base_url}/process-conversation",
            json=request_data,
            headers={
                "Accept": "text/event-stream",
                "X-Trace-ID": trace_id or "",
            },
        ) as response:
            response.raise_for_status()

            # Assumes each event arrives as a single "data: <json>" line
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[len("data: "):])
                    yield data
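
The client above decodes each `data: ` line on its own, which holds only if the Workflow Agent emits every event as a single data line. In the Server-Sent Events format, an event may span several `data:` lines and ends at a blank line. A sketch of buffered parsing over already-split lines, assuming every event payload is JSON; `iter_sse_json` is a hypothetical helper, and an async version over `response.aiter_lines()` would follow the same steps:

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def iter_sse_json(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield one decoded JSON object per Server-Sent Event.

    data: lines are buffered until a blank line ends the event; several
    data: lines in one event are joined with newlines; one space after the
    colon is dropped; comment lines (leading ":") are ignored.
    """
    buffer: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event, if any
            if buffer:
                yield json.loads("\n".join(buffer))
                buffer = []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "data":
            buffer.append(value)
        # event:, id:, and retry: fields are ignored in this sketch
    # A trailing event with no terminating blank line is discarded, as in the SSE rules
```

Discarding an unterminated final event matches how browsers treat a stream that ends mid-event.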

Timeout Configuration:

self.timeout = httpx.Timeout(
    timeout=1800.0,  # Default for any phase not set below (here only pool); httpx has no total-request deadline
    connect=15.0,    # 15 seconds to establish a connection
    read=600.0,      # 10 minutes max wait between received chunks
    write=60.0,      # 60 seconds max to send each chunk of the request
)

Workflow Engine HTTP Client (app/services/workflow_engine_http_client.py)

Workflow Creation:

import uuid
from typing import Any, Dict, List, Optional, Union

import httpx

async def create_workflow(
    self,
    name: str,
    nodes: List[Dict[str, Any]],
    connections: Union[List[Dict], Dict],
    user_id: str = "anonymous",
    trace_id: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Create a workflow in Workflow Engine v2"""

    # Convert nodes to the v2 format ("parameters" is used when "configurations" is absent)
    v2_nodes = [
        {
            "id": node.get("id") or node.get("name"),
            "name": node.get("name"),
            "type": node.get("type"),
            "subtype": node.get("subtype"),
            "configurations": node.get("configurations") or node.get("parameters", {}),
            "attached_nodes": node.get("attached_nodes"),
        }
        for node in nodes
    ]

    v2_payload = {
        "workflow_id": str(uuid.uuid4()),
        "name": name,
        "created_by": user_id,
        "nodes": v2_nodes,
        "connections": _convert_connections(connections),  # Helper not shown; accepts both formats
        # Every TRIGGER node is registered as a workflow trigger
        "triggers": [n["id"] for n in v2_nodes if n["type"] == "TRIGGER"],
    }

    async with httpx.AsyncClient(timeout=self.query_timeout) as client:
        response = await client.post(
            f"{self.base_url}/v2/workflows",
            json=v2_payload,
            headers={"X-Trace-ID": trace_id} if trace_id else {},
        )
        response.raise_for_status()
        return response.json()

Workflow Execution:

async def execute_workflow(
    self,
    workflow_id: str,
    user_id: str,
    input_data: Optional[Dict[str, Any]] = None,
    async_execution: bool = False,
    access_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Execute a workflow, forwarding the user's token so RLS applies"""

    headers = {}
    if access_token:
        headers["Authorization"] = f"Bearer {access_token}"

    v2_payload = {
        "trigger_type": "manual",
        "trigger_data": input_data or {},
        "async_execution": async_execution,
        "user_id": user_id,
    }

    client = await self._get_client()
    response = await client.post(
        f"{self.base_url}/v2/workflows/{workflow_id}/execute",
        json=v2_payload,
        headers=headers,
        timeout=self.execute_timeout,  # 5 minutes for long-running workflows
    )
    response.raise_for_status()
    return response.json()

5. System Interactions

5.1 Internal Service Communication

HTTP/REST Communication Pattern:

  • All services migrated from gRPC to HTTP/REST
  • Service discovery via environment variables
  • Connection pooling with httpx for performance
  • Trace ID propagation for distributed tracing
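
A minimal sketch of the last two points: resolving downstream base URLs from environment variables and forwarding the trace ID on outbound calls. The variable names `WORKFLOW_AGENT_URL`, `WORKFLOW_ENGINE_URL`, and `WORKFLOW_SCHEDULER_URL` are assumptions (the defaults reuse the ports listed under Service Endpoints), and minting a fresh UUID when no trace ID is present is one possible policy, not necessarily the gateway's:

```python
import os
import uuid
from typing import Dict, Mapping, Optional

# Hypothetical variable names; defaults reuse the documented service ports
SERVICE_DEFAULTS: Dict[str, str] = {
    "WORKFLOW_AGENT_URL": "http://localhost:8001",
    "WORKFLOW_ENGINE_URL": "http://localhost:8002",
    "WORKFLOW_SCHEDULER_URL": "http://localhost:8003",
}


def service_urls(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Resolve each downstream base URL from the environment, without a trailing slash."""
    return {name: env.get(name, default).rstrip("/") for name, default in SERVICE_DEFAULTS.items()}


def outbound_headers(trace_id: Optional[str]) -> Dict[str, str]:
    """Forward the request's trace ID, or start a new one if none was assigned."""
    return {"X-Trace-ID": trace_id or str(uuid.uuid4())}
```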

Service Endpoints:

Workflow Agent (8001):
  POST /process-conversation       # SSE streaming workflow generation
  GET  /health                     # Health check

Workflow Engine (8002):
  POST /v2/workflows               # Create workflow
  GET  /v2/workflows/{id}          # Get workflow
  POST /v2/workflows/{id}/execute  # Execute workflow
  GET  /v2/executions/{id}         # Execution status
  GET  /v2/executions/{id}/logs    # Execution logs

Workflow Scheduler (8003):
  POST /triggers                   # Create trigger
  GET  /triggers/{id}              # Get trigger status

5.2 External Integrations

Supabase Integration

Authentication Flow:

# JWT token verification
async def verify_supabase_token(token: str) -> Optional[Dict[str, Any]]:
    """Verify a JWT with Supabase Auth; return None if it is invalid"""
    try:
        # With the synchronous supabase-py client, get_user() is a blocking network
        # call; inside an async function it stalls the event loop unless offloaded
        user_response = supabase.auth.get_user(token)
        if user_response.user:
            return {
                "sub": user_response.user.id,
                "email": user_response.user.email,
                "email_confirmed": user_response.user.email_confirmed_at is not None,
            }
    except Exception as e:
        log_error(f"Token verification failed: {e}")
    return None

Row Level Security (RLS):

class SupabaseRepository:
    """RLS-enabled repository pattern"""

    def create(self, data: dict, access_token: Optional[str] = None) -> Optional[dict]:
        """Insert a row; with a user token, RLS policies decide what is allowed"""
        if access_token:
            # User-scoped operation with RLS
            client = get_user_supabase_client(access_token)
        else:
            # Admin operation with the service role key, bypassing RLS
            client = self.client
        response = client.table(self.table_name).insert(data).execute()
        # execute() returns an APIResponse; the inserted rows are in .data
        return response.data[0] if response.data else None

Performance Optimization:

# Header-based auth (fast) vs session-based auth (slow)
# ❌ SLOW: avoid, because this makes an HTTP call on every request
#    (set_session's second argument is the refresh token)
client.auth.set_session(access_token, access_token)

# ✅ FAST: pass the token in the Authorization header
client.headers["Authorization"] = f"Bearer {access_token}"

OAuth Integration Management

Supported Integrations:

  • Slack: OAuth for workspace integration
  • Notion: OAuth for page/database access
  • Google: OAuth for Gmail and Calendar
  • GitHub: OAuth for repository operations

OAuth Callback Handler (app/api/public/auth.py):

@router.get("/auth/{provider}/callback")
async def oauth_callback(
    provider: str,
    code: str,
    state: str,
    request: Request,
):
    """Handle OAuth callbacks from external providers"""
    # 1. Exchange the authorization code for an access token
    # 2. Store the tokens in the user_integrations table
    # 3. Redirect to the frontend success page

Integration Storage (Supabase user_integrations table):

CREATE TABLE user_integrations (
    id UUID PRIMARY KEY,
    user_id UUID REFERENCES auth.users NOT NULL,
    provider TEXT NOT NULL,
    access_token TEXT NOT NULL,
    refresh_token TEXT,
    expires_at TIMESTAMPTZ,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
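
The `expires_at` and `refresh_token` columns support the token expiration and refresh handling listed under Security. A hedged sketch of the decision a caller might make before using a stored provider token; `token_action`, its three return values, and the five-minute early-refresh skew are all hypothetical:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def token_action(
    expires_at: Optional[datetime],
    has_refresh_token: bool,
    now: Optional[datetime] = None,
    skew: timedelta = timedelta(minutes=5),
) -> str:
    """Return "use", "refresh", or "reconnect" for a stored provider token.

    A NULL expires_at is treated as non-expiring. The skew triggers a refresh
    slightly early so the token cannot lapse in the middle of a provider call.
    """
    if expires_at is None:
        return "use"
    now = now or datetime.now(timezone.utc)
    if now < expires_at - skew:
        return "use"
    # Expired or about to expire: refresh if possible, otherwise the user must re-authorize
    return "refresh" if has_refresh_token else "reconnect"
```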

6. Non-Functional Requirements

6.1 Performance

Targets:

  • API Gateway response time: < 50ms (excluding downstream services)
  • SSE streaming latency: < 100ms first byte
  • Rate limit check: < 5ms (Redis)
  • Supabase query: < 100ms with RLS

Optimization Strategies:

  • Connection pooling for HTTP clients (max 20 connections)
  • HTTP/2 multiplexing for service communication
  • Redis caching for rate limiting and session state
  • Async/await throughout the stack
  • Header-based Supabase auth (not session-based)

Caching Strategy:

# Redis TTL configuration
CACHE_TTL_JWT_TOKEN = 1800 # 30 minutes
CACHE_TTL_USER_INFO = 900 # 15 minutes
CACHE_TTL_RATE_LIMIT = 3600 # 1 hour
CACHE_TTL_SESSION_STATE = 7200 # 2 hours
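
One subtlety with `CACHE_TTL_JWT_TOKEN`: a cached verification result should not outlive the token itself. This sketch assumes results are written to the cache only after Supabase has verified the token; it caps the TTL at the token's `exp` claim and keys the entry on a SHA-256 digest so raw bearer tokens never appear in Redis keys. The helper names and the `jwt:` key prefix are hypothetical:

```python
import base64
import hashlib
import json
import time
from typing import Optional, Tuple

CACHE_TTL_JWT_TOKEN = 1800  # 30 minutes, as configured above


def _unverified_exp(token: str) -> Optional[int]:
    """Read the exp claim WITHOUT checking the signature (only used to size the TTL)."""
    try:
        payload = token.split(".")[1]
        payload += "=" * (-len(payload) % 4)
        return int(json.loads(base64.urlsafe_b64decode(payload))["exp"])
    except (IndexError, ValueError, KeyError, TypeError):
        return None


def jwt_cache_entry(token: str, now: Optional[float] = None) -> Optional[Tuple[str, int]]:
    """(cache_key, ttl_seconds) for an already-verified token, or None if it must not be cached."""
    now = time.time() if now is None else now
    ttl = CACHE_TTL_JWT_TOKEN
    exp = _unverified_exp(token)
    if exp is not None:
        ttl = min(ttl, int(exp - now))  # never outlive the token itself
    if ttl <= 0:
        return None
    # Digest the token so raw bearer credentials never appear in Redis keys
    return "jwt:" + hashlib.sha256(token.encode("utf-8")).hexdigest(), ttl
```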

6.2 Scalability

Horizontal Scaling:

  • Stateless application design
  • Redis for shared state (rate limits, session cache)
  • Supabase connection pooling
  • AWS ECS auto-scaling based on CPU/memory

Load Distribution:

        AWS Application Load Balancer
                      │
    ┌───────────┬─────┴─────┬───────────┐
    │           │           │           │
ECS Task    ECS Task    ECS Task    ECS Task
(API GW)    (API GW)    (API GW)    (API GW)
    │           │           │           │
    └───────────┴─────┬─────┴───────────┘
                      │
           Shared Redis & Supabase

Resource Considerations:

  • CPU: 0.5 vCPU per task (ECS Fargate)
  • Memory: 1GB per task
  • Redis: Single instance (ElastiCache for production)
  • Concurrent connections: ~100 per task with connection pooling

6.3 Security

Authentication Measures:

  • JWT token validation with Supabase
  • API key hashing and secure storage (future enhancement)
  • Token expiration and refresh handling
  • Rate limiting per user/IP/API key

Authorization Approach:

  • Row Level Security (RLS) for user data isolation
  • API key scopes for fine-grained MCP permissions
  • Admin service role key for system operations

Data Protection:

  • HTTPS/TLS for all external communication
  • Secure headers (CORS, CSP)
  • JWT tokens in Authorization header (not cookies)
  • No sensitive data in logs or traces

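Input Validation:

from typing import Any, Dict, Optional

from fastapi import Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

# Pydantic models for request validation
class ChatRequest(BaseModel):
    session_id: str = Field(..., min_length=1, max_length=255)
    user_message: str = Field(..., min_length=1, max_length=10000)
    workflow_context: Optional[Dict[str, Any]] = None

# Middleware validation
@app.middleware("http")
async def validation_middleware(request: Request, call_next):
    # Reject structurally malformed JWTs before any Supabase call
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header.split(" ", 1)[1].strip()
        if not _validate_jwt_format_middleware(token):
            return JSONResponse(status_code=400, content={"error": "malformed_token"})
    return await call_next(request)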

6.4 Reliability

Error Handling:

# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    log_exception(f"Unhandled exception: {type(exc).__name__}: {exc}")
    return JSONResponse(
        status_code=500,
        content={
            "error": "internal_server_error",
            "message": "Internal server error occurred",
            "trace_id": getattr(request.state, "trace_id", None),
        },
    )

# Structured error responses
{
    "error": "error_type",
    "message": "Human-readable description",
    "error_code": "SPECIFIC_ERROR_CODE",
    "details": {...},
    "trace_id": "uuid"
}
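
Exception handlers and middleware could share a single builder for this envelope so every error carries the same keys. A small sketch; `error_body` is a hypothetical helper, and omitting unset optional fields is a choice of this sketch rather than something the source specifies:

```python
from typing import Any, Dict, Optional


def error_body(
    error: str,
    message: str,
    error_code: Optional[str] = None,
    details: Optional[Dict[str, Any]] = None,
    trace_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the structured error envelope, leaving out optional fields that are unset."""
    body: Dict[str, Any] = {"error": error, "message": message}
    if error_code is not None:
        body["error_code"] = error_code
    if details:
        body["details"] = details
    if trace_id is not None:
        body["trace_id"] = trace_id
    return body
```

A handler would then return something like `JSONResponse(status_code=401, content=error_body("unauthorized", msg, trace_id=trace_id))`.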

Retry Mechanisms:

  • HTTP client automatic retries (3 attempts) for network errors
  • Exponential backoff for transient failures
  • Circuit breaker pattern for failing downstream services (future)
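
A sketch of the retry policy described above: three attempts with exponential backoff and full jitter. `retry_async` and its defaults are hypothetical; with httpx, `retry_on` would more likely be `(httpx.TransportError,)`, which covers connection failures and timeouts:

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    op: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.2,
    max_delay: float = 5.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await op(), retrying the listed exceptions with exponential backoff and full jitter."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Backoff ceiling doubles each time (0.2s, 0.4s, ...), capped at max_delay;
            # sleeping a random fraction of it spreads out retries from many callers
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

Retries are safe only for idempotent calls: re-sending `POST /v2/workflows/{id}/execute` after a read timeout could start a workflow twice, so such calls would need deduplication or no automatic retry.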

Health Checks:

@app.get("/health")
async def health_check():
    """Aggregate health of the gateway and its dependencies"""
    checks = {
        "api_gateway": True,
        "workflow_agent": await check_workflow_agent_health(),
        "workflow_engine": await check_workflow_engine_health(),
        "supabase": await check_supabase_connection(),
        "redis": await check_redis_connection(),
    }

    # Responds 200 even when degraded, so the ECS curl -f check only fails on errors or timeouts
    status = "healthy" if all(checks.values()) else "degraded"
    return {"status": status, "checks": checks}
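
The handler above awaits each dependency check in turn, so a single slow dependency adds directly to response time and could push the endpoint past the 5-second ECS health-check timeout. A sketch that runs the checks concurrently and bounds each one; `run_health_checks` and the 2-second default are hypothetical:

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_health_checks(
    checks: Dict[str, Callable[[], Awaitable[bool]]],
    timeout: float = 2.0,
) -> Dict[str, object]:
    """Run all dependency checks concurrently; each is bounded by `timeout` seconds.

    A check that raises or times out is reported as unhealthy instead of
    failing the whole endpoint.
    """
    async def guarded(check: Callable[[], Awaitable[bool]]) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout))
        except Exception:
            return False

    names = list(checks)
    results = await asyncio.gather(*(guarded(checks[name]) for name in names))
    return {
        "status": "healthy" if all(results) else "degraded",
        "checks": dict(zip(names, results)),
    }
```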

AWS ECS Health Check Configuration:

Health Check Command: curl -f http://localhost:8000/health || exit 1
Start Period: 120 seconds
Interval: 30 seconds
Timeout: 5 seconds
Retries: 3

6.5 Testing & Observability

Testing Strategy

Unit Testing:

# Run the full test suite
uv run pytest tests/ -v

# Current test coverage (12 tests)
tests/test_basic.py:
  - Root endpoint response validation
  - Health check availability
  - Version endpoint verification
  - Authentication requirement for App API
  - Authentication requirement for MCP API
  - CORS headers validation
  - Request ID generation
  - Process time tracking
  - Application factory pattern
  - Configuration loading

Integration Testing:

# Test authentication flow
async def test_supabase_auth_flow():
    # Obtain a JWT token from Supabase
    token = await get_test_jwt_token()

    # Create an authenticated session
    response = await client.post(
        "/api/v1/app/sessions",
        headers={"Authorization": f"Bearer {token}"},
        json={"action": "create"},
    )
    assert response.status_code == 200

Manual Testing:

# Interactive chat client
python tests/interactive_chat_client.py

# Health check
curl http://localhost:8000/health

# Authenticated workflow creation
curl -X POST http://localhost:8000/api/v1/app/workflows \
-H "Authorization: Bearer ${JWT_TOKEN}" \
-H "Content-Type: application/json" \
-d @workflow.json

Observability

OpenTelemetry Integration:

# Automatic trace generation
setup_telemetry(
app,
service_name="api-gateway",
service_version="2.0.0",
otlp_endpoint="http://otel-collector:4317"
)

# Custom span attributes
request.state.trace_id = generate_trace_id()
request.state.user_id = user.get("sub")

Key Metrics:

  • Request rate per API layer (public/app/mcp)
  • Response time percentiles (p50, p95, p99)
  • Error rate by error type
  • Rate limit rejections per user/IP
  • Concurrent SSE streams
  • Downstream service latency

Logging Strategy:

# Structured logging with emoji indicators
logger.info(f"📨 {method} {path} [Trace:{trace_id}]")  # Request start
logger.info(f"✅ Supabase auth successful: {user_email}")  # Auth success
logger.warning(f"🚫 Rate limit exceeded: {path}")  # Rate limit
logger.error(f"❌ HTTP error: {status_code}")  # Errors
logger.info(f"📤 {method} {path} -> {status} [{time}ms]")  # Response

Log Levels:

  • DEBUG: Development with detailed request/response bodies
  • INFO: Production with request flow and key events
  • WARNING: Rate limits, auth failures, degraded service
  • ERROR: Exceptions, service failures, data errors

Monitoring & Alerting

Dashboard Metrics:

API Gateway Dashboard:
- Request rate by API layer
- Authentication success/failure rate
- Rate limit hit rate
- Response time histogram
- Error rate by status code
- Active SSE streams
- Downstream service health

Alert Thresholds:

  • Error rate > 5% for 5 minutes → Page on-call
  • Response time p95 > 500ms for 10 minutes → Warning
  • Rate limit rejection > 20% for 5 minutes → Warning
  • Health check failure > 3 consecutive → Page on-call
  • Redis connection failure → Immediate page

SLIs/SLOs:

Service Level Indicators:
- Availability: 99.9% uptime
- Latency: 95% of requests < 200ms
- Error rate: < 1% of requests
- Authentication success: > 99%

Service Level Objectives:
- Monthly uptime: 99.9% (43 minutes downtime/month)
- P95 latency: < 200ms for App API
- P99 latency: < 500ms for App API
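
The 43-minute figure assumes a 30-day month: the error budget is (1 - SLO) × days × 24 × 60 minutes, so 0.001 × 43,200 = 43.2 minutes. As a quick check (the function name is illustrative):

```python
def downtime_budget_minutes(slo: float, days: float = 30.0) -> float:
    """Minutes of allowed downtime for an availability SLO over `days` days."""
    return (1.0 - slo) * days * 24 * 60


print(round(downtime_budget_minutes(0.999), 1))  # 43.2
```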

7. Technical Debt and Future Considerations

7.1 Known Limitations

Authentication & Security:

  • API keys stored in configuration (not encrypted)
  • No API key rotation mechanism
  • Basic scope validation (no complex RBAC)
  • Rate limiting per-layer only (no per-endpoint customization in code)

Performance:

  • Single Redis instance (no cluster/replication)
  • No circuit breaker pattern for downstream services
  • Limited connection pooling configuration
  • No request queuing for overload scenarios

Observability:

  • Basic OpenTelemetry setup (no custom spans)
  • Limited distributed tracing across services
  • No performance profiling in production
  • Alert configuration not in code

7.2 Areas for Improvement

Short-term (1-3 months):

  1. Implement API key encryption and secure storage
  2. Add circuit breaker pattern for service resilience
  3. Enhanced OpenTelemetry with custom spans and metrics
  4. Automated integration test suite with real services
  5. Pre-commit hooks enforcement (already configured)

Medium-term (3-6 months):

  1. Redis Cluster for high availability
  2. Advanced rate limiting with burst handling
  3. Role-Based Access Control (RBAC) system
  4. Request/response caching layer
  5. GraphQL API layer for flexible queries

Long-term (6-12 months):

  1. Multi-region deployment with global load balancing
  2. Service mesh integration (Istio/Linkerd)
  3. Advanced security (mTLS, certificate rotation)
  4. ML-based anomaly detection for security
  5. Self-healing infrastructure automation

7.3 Migration Paths

gRPC to HTTP/REST Migration (Completed):

  • ✅ All services migrated to HTTP/REST
  • ✅ Connection pooling implemented
  • ✅ Trace ID propagation working
  • ✅ Performance comparable to gRPC

Supabase RLS Optimization:

  • Current: Header-based authentication (fast)
  • Previous: Session-based authentication (3+ seconds)
  • Result: 20% performance improvement

Future: GraphQL Layer:

type Query {
  workflows(filter: WorkflowFilter): [Workflow!]!
  workflow(id: ID!): Workflow
  executions(workflowId: ID!, limit: Int): [Execution!]!
}

type Mutation {
  createWorkflow(input: CreateWorkflowInput!): Workflow!
  executeWorkflow(id: ID!, input: ExecutionInput!): Execution!
  updateWorkflow(id: ID!, input: UpdateWorkflowInput!): Workflow!
}

type Subscription {
  executionUpdates(executionId: ID!): ExecutionUpdate!
}

8. Appendices

A. Glossary

  • API Gateway: Unified entry point for all client requests with authentication and routing
  • RLS (Row Level Security): PostgreSQL security feature isolating data by user
  • SSE (Server-Sent Events): HTTP streaming protocol for real-time updates
  • MCP (Model Context Protocol): LLM tool discovery and invocation standard
  • JWT (JSON Web Token): Stateless authentication token format
  • ECS (Elastic Container Service): AWS container orchestration platform
  • OpenTelemetry: Observability framework for distributed tracing and metrics
  • Sliding Window: Rate limiting algorithm counting requests in a moving time window
  • Trace ID: Unique identifier for tracking requests across services
  • Connection Pooling: Reusing HTTP connections for better performance

B. References

Internal Documentation:

  • /apps/backend/api-gateway/CLAUDE.md - Development guide and commands
  • /apps/backend/CLAUDE.md - Backend architecture overview
  • /docs/tech-design/new_workflow_spec.md - Workflow data models
  • /apps/backend/api-gateway/README.md - Quick start and API examples

External Resources:

API Specifications:

  • OpenAPI/Swagger: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc
  • Health Check: http://localhost:8000/health

Document Version: 2.0.0
Last Updated: January 2025
Architecture Status: Production-ready with three-layer API implementation
Next Review: April 2025