API Gateway Technical Design
1. Executive Summary
The API Gateway is the primary entry point for the Workflow Agent Team system, implementing a three-layer API architecture with FastAPI. It provides unified authentication, rate limiting, and request routing for different client types: public consumers, web/mobile applications, and LLM clients.
Key Architectural Decisions
- Three-Layer API Architecture: Separation of public, app, and MCP APIs with distinct authentication patterns
- HTTP/REST Migration: Complete migration from gRPC to HTTP/REST for simplified inter-service communication
- Supabase Integration: Uses Supabase for authentication and for database access protected by Row Level Security (RLS)
- Redis Caching: Performance optimization with Redis-based rate limiting and session state caching
- FastAPI Framework: Async-first architecture with comprehensive middleware stack
Technology Stack
- Framework: FastAPI 0.115.5 with async/await support
- Authentication: Supabase Auth with JWT token validation
- Database: Supabase PostgreSQL with Row Level Security (RLS)
- Cache: Redis for rate limiting and session state
- HTTP Client: httpx for async service-to-service communication
- Observability: OpenTelemetry with custom telemetry middleware
- Deployment: Docker + AWS ECS Fargate
2. System Architecture
2.1 High-Level Architecture
Client Layer
  Web Frontend (Next.js)       -> JWT Token
  Mobile Apps (iOS/Android)    -> JWT Token
  External APIs (Third-party)  -> No Auth
  LLM Clients (Claude/GPT)     -> API Key
        |
        v
API Gateway (FastAPI - Port 8000)
  Middleware Stack
    1. CORS Middleware
    2. Tracking Middleware (trace_id generation)
    3. Metrics Middleware (OpenTelemetry)
    4. Rate Limit Middleware (Redis sliding window)
    5. Unified Auth Middleware (layer-based auth)
    6. Request Logging Middleware (process time tracking)
  API Layers
    Public API (/api/v1/public): No Auth, Rate Limited, Health/Status, Docs/Webhook
    App API (/api/v1/app): JWT Auth, RLS Enabled, Sessions, Chat/Workflows
    MCP API (/api/v1/mcp): API Key Auth, Scoped Permissions, Tool Discovery, Tool Invocation
        |  HTTP/REST
        v
Downstream Services
  Workflow Agent (Port 8001): AI Generation
  Workflow Engine (Port 8002): Execution
  Workflow Scheduler (Port 8003): Trigger Management
        |
        v
Supabase: PostgreSQL + RLS, Auth + JWT, Vector Store
Redis: Rate Limiting, Session Cache
2.2 Component Architecture
API Layer Organization
apps/backend/api-gateway/
├── app/
│   ├── main.py  # Application factory with middleware stack
│   ├── core/  # Core infrastructure
│   │   ├── config.py  # Settings with Pydantic validation
│   │   ├── database.py  # Supabase integration with RLS
│   │   ├── database_direct.py  # Direct PostgreSQL for high performance
│   │   └── events.py  # Lifespan events and health checks
│   ├── api/  # Three-layer API structure
│   │   ├── public/  # Public API endpoints
│   │   │   ├── health.py  # Service health checks
│   │   │   ├── status.py  # System status information
│   │   │   ├── docs.py  # API documentation
│   │   │   ├── auth.py  # OAuth callback handlers
│   │   │   ├── workflows.py  # Public workflow information
│   │   │   ├── webhooks.py  # Webhook receivers
│   │   │   └── validation.py  # Workflow validation service
│   │   ├── app/  # App API endpoints (JWT auth)
│   │   │   ├── sessions.py  # Session management with RLS
│   │   │   ├── chat.py  # SSE streaming chat interface
│   │   │   ├── workflows.py  # Workflow CRUD operations
│   │   │   ├── executions.py  # Execution management and logs
│   │   │   └── integrations.py  # OAuth integration management
│   │   └── mcp/  # MCP API endpoints (API key auth)
│   │       ├── tools.py  # Tool discovery and invocation
│   │       ├── slack_tools.py  # Slack MCP integration
│   │       ├── notion_tools.py  # Notion MCP integration
│   │       ├── gmail_tools.py  # Gmail MCP integration
│   │       └── github_tools.py  # GitHub MCP integration
│   ├── middleware/  # Middleware components
│   │   ├── auth.py  # Unified authentication middleware
│   │   ├── rate_limit.py  # Redis-based rate limiting
│   │   └── validation.py  # Request validation
│   ├── services/  # Business logic services
│   │   ├── auth_service.py  # Supabase authentication
│   │   ├── workflow_agent_http_client.py  # HTTP client for workflow agent
│   │   ├── workflow_engine_http_client.py  # HTTP client for workflow engine
│   │   ├── workflow_scheduler_http_client.py  # HTTP client for scheduler
│   │   ├── state_manager.py  # Session state management
│   │   ├── response_processor.py  # SSE response processing
│   │   └── cache.py  # Redis cache operations
│   ├── models/  # Pydantic data models
│   │   └── __init__.py  # Shared models
│   ├── utils/  # Utility functions
│   │   ├── logger.py  # Structured logging
│   │   ├── sse.py  # Server-Sent Events utilities
│   │   └── node_converter.py  # Node format conversion
│   ├── dependencies.py  # FastAPI dependency injection
│   └── exceptions.py  # Custom exception handlers
├── tests/  # Test suite
│   ├── test_basic.py  # Core functionality tests
│   ├── test_integration.py  # Integration tests
│   └── interactive_chat_client.py  # Manual testing client
├── pyproject.toml  # uv package configuration
├── Dockerfile  # Multi-stage Docker build
└── .env.example  # Environment template
3. Data Architecture
3.1 Data Models
Core Pydantic Models
Authentication Models (app/middleware/auth.py):
class AuthResult:
    success: bool
    user: Optional[Dict[str, Any]]  # Supabase user data
    client: Optional[Dict[str, Any]]  # MCP client data
    token: Optional[str]
    error: Optional[str]
class MCPApiKey:
    id: str
    client_name: str
    scopes: List[str]  # ["tools:read", "tools:execute", "health:check"]
    rate_limit_tier: str  # "standard", "premium", "development"
    active: bool
    created_at: datetime
    expires_at: Optional[datetime]
Session Models:
class SessionCreate:
    action: str  # "create", "edit", "copy"
    workflow_id: Optional[str]
    meta_data: Dict[str, Any]
class SessionResponse:
    session_id: str
    user_id: str
    created_at: str
    updated_at: str
Chat Models:
class ChatRequest:
    session_id: str
    user_message: str
    workflow_context: Optional[Dict[str, Any]]
class ConversationResponse:
    session_id: str
    response_type: str  # "MESSAGE", "STATUS", "ERROR", "FINAL"
    is_final: bool
    message: Optional[str]
    status: Optional[Dict[str, Any]]
    error: Optional[Dict[str, Any]]
Workflow Models:
class WorkflowCreateRequest:
    user_id: str
    name: str
    description: Optional[str]
    nodes: List[Dict[str, Any]]
    connections: Union[List[Dict[str, str]], Dict[str, Any]]  # Supports both formats
    triggers: Optional[List[str]]
    settings: Optional[Dict[str, Any]]
    tags: Optional[List[str]]
class WorkflowResponse:
    workflow_id: str
    name: str
    description: Optional[str]
    nodes: List[Dict[str, Any]]
    connections: List[Dict[str, str]]
    triggers: List[str]
    created_by: str
    created_time_ms: int
3.2 Data Flow
Session and Chat Data Flow
Workflow Execution Flow
4. Implementation Detailsโ
4.1 Core Componentsโ
FastAPI Application Factory (app/main.py)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

def create_application() -> FastAPI:
    """Factory pattern application creation"""
    app = FastAPI(
        title="Workflow Agent API Gateway",
        description="Three-layer API architecture: Public, App, MCP",
        version="2.0.0",
        lifespan=lifespan,  # Async lifespan events
        debug=settings.DEBUG,
    )
    # CORS configuration - allows all origins for development;
    # restrict allow_origins to known frontends in production
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    # Middleware stack (order matters!)
    # 1. Tracking middleware (generates trace_id)
    # 2. Metrics middleware (OpenTelemetry)
    # 3. Rate limiting middleware
    # 4. Authentication middleware
    # 5. Request logging middleware
    # Register routes
    app.include_router(public_router, prefix="/api/v1/public", tags=["Public API"])
    app.include_router(app_router, prefix="/api/v1/app", tags=["App API"])
    app.include_router(mcp_router, prefix="/api/v1/mcp", tags=["MCP API"])
    return app
Lifespan Events (app/core/events.py):
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown events"""
    # Startup: Connect to workflow services
    await workflow_agent_client.connect()
    await workflow_engine_client.connect()
    yield
    # Shutdown: Close connections
    await workflow_agent_client.close()
    await workflow_engine_client.close()
4.2 Three-Layer Authentication
Unified Authentication Middleware (app/middleware/auth.py)
async def unified_auth_middleware(request: Request, call_next):
    """Layer-based authentication routing"""
    path = request.url.path
    # Public API - No authentication, rate limited only
    if path.startswith("/api/v1/public/"):
        return await call_next(request)
    # MCP API - API Key authentication
    if path.startswith("/api/v1/mcp/"):
        if not settings.MCP_API_KEY_REQUIRED:
            return await call_next(request)
        auth_result = await authenticate_mcp_client(request)
        if not auth_result.success:
            return JSONResponse(
                status_code=401,
                content={
                    "error": "unauthorized",
                    "message": f"MCP authentication failed: {auth_result.error}",
                }
            )
        request.state.client = auth_result.client
        request.state.auth_type = "mcp_api_key"
    # App API - Supabase JWT authentication
    elif path.startswith("/api/v1/app/"):
        if not settings.SUPABASE_AUTH_ENABLED:
            return await call_next(request)
        auth_result = await authenticate_supabase_user(request)
        if not auth_result.success:
            return JSONResponse(
                # Malformed tokens are a client error (400); everything else is 401
                status_code=401 if auth_result.error != "malformed_token" else 400,
                content={
                    "error": "unauthorized",
                    "message": f"Authentication failed: {auth_result.error}",
                }
            )
        request.state.user = auth_result.user
        request.state.user_id = auth_result.user.get("sub")
        request.state.access_token = auth_result.token
        request.state.auth_type = "supabase"
    return await call_next(request)
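The prefix-based routing in the middleware can be isolated into a small lookup, which makes the layer decision easy to unit test. The following is a minimal sketch, not the gateway's actual code: `LAYER_PREFIXES` and `resolve_layer` are hypothetical names, and the auth labels are illustrative.

```python
from typing import Optional

# Prefixes come from the route registration in this document;
# the auth-scheme labels are illustrative only.
LAYER_PREFIXES = {
    "/api/v1/public/": ("public", "none"),
    "/api/v1/app/": ("app", "supabase_jwt"),
    "/api/v1/mcp/": ("mcp", "api_key"),
}


def resolve_layer(path: str) -> Optional[tuple[str, str]]:
    """Return (layer, auth_scheme) for a request path, or None if no layer matches."""
    for prefix, layer in LAYER_PREFIXES.items():
        if path.startswith(prefix):
            return layer
    return None


print(resolve_layer("/api/v1/app/sessions"))
print(resolve_layer("/health"))
```

As in the middleware, matching requires the trailing slash, so a bare `/api/v1/public` falls through to the unauthenticated default path.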
Supabase JWT Validation:
async def authenticate_supabase_user(request: Request) -> AuthResult:
    """Validate Supabase JWT token"""
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        return AuthResult(success=False, error="missing_token")
    token = auth_header.split(" ", 1)[1].strip()
    # Validate JWT format (3 segments separated by dots)
    if not _validate_jwt_format_middleware(token):
        return AuthResult(success=False, error="malformed_token")
    # Verify with Supabase
    user_data = await verify_supabase_token(token)
    if not user_data:
        return AuthResult(success=False, error="invalid_token")
    return AuthResult(success=True, user=user_data, token=token)
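The body of `_validate_jwt_format_middleware` is not shown in this document. A structural pre-check of this kind might look like the sketch below, where `looks_like_jwt` is a hypothetical stand-in. The header decoding step is an extra assumption beyond the "3 segments separated by dots" rule stated above, and the function verifies no signature.

```python
import base64
import binascii
import json


def looks_like_jwt(token: str) -> bool:
    """Structural check only: three non-empty segments and a JSON header with 'alg'."""
    parts = token.split(".")
    if len(parts) != 3 or not all(parts):
        return False
    try:
        # base64url segments in JWTs omit padding, so restore it before decoding
        padded = parts[0] + "=" * (-len(parts[0]) % 4)
        header = json.loads(base64.urlsafe_b64decode(padded))
    except (binascii.Error, ValueError):
        return False
    return isinstance(header, dict) and "alg" in header
```

Rejecting malformed tokens locally avoids a round trip to Supabase for requests that could never authenticate.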
MCP API Key Authentication:
class MCPAuthenticator:
    """API Key management and verification"""
    def verify_api_key(self, api_key: str) -> Optional[MCPApiKey]:
        """Verify API key and check activation/expiration"""
        api_key_obj = self.api_keys.get(api_key)
        if not api_key_obj or not api_key_obj.active:
            return None
        if api_key_obj.expires_at and datetime.now(timezone.utc) > api_key_obj.expires_at:
            return None
        return api_key_obj
    def has_required_scopes(self, user_scopes: List[str], required_scopes: List[str]) -> bool:
        """Check scope permissions"""
        if "admin" in user_scopes:
            return True
        return any(scope in user_scopes for scope in required_scopes)
4.3 Rate Limiting System
Redis-Based Sliding Window (app/middleware/rate_limit.py)
Rate Limit Configuration:
class RateLimitConfig:
    # Public API - Per IP address
    PUBLIC_LIMITS = {
        "global": "1000/hour",
        "/api/v1/public/health": "100/minute",
        "/api/v1/public/status": "60/minute",
    }
    # App API - Per authenticated user
    APP_LIMITS = {
        "authenticated_user": "10000/hour",
        "/api/v1/app/chat/stream": "100/hour",
        "/api/v1/app/workflows/execute": "100/hour",
    }
    # MCP API - Per API key
    MCP_LIMITS = {
        "api_client": "50000/hour",
        "/api/v1/mcp/invoke": "1000/hour",
    }
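The limit strings above use a `count/unit` form that has to be turned into a request count and a window length before the rate limiter can use them. The `_parse_limit` helper referenced later is not shown in this document, so the following `parse_limit` function is a hypothetical sketch of what such a parser could look like. The set of accepted units is an assumption.

```python
# Seconds per supported unit; the unit names are assumed from the config strings above.
_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def parse_limit(limit_str: str) -> tuple[int, int]:
    """Parse a string such as '1000/hour' into (max_requests, window_seconds)."""
    count_text, _, unit = limit_str.partition("/")
    count_text = count_text.strip()
    unit = unit.strip().lower()
    if not count_text.isdigit() or unit not in _UNIT_SECONDS:
        raise ValueError(f"Unsupported rate limit: {limit_str!r}")
    return int(count_text), _UNIT_SECONDS[unit]


print(parse_limit("100/minute"))
```

Failing loudly on an unknown unit keeps a typo in the configuration from silently disabling a limit.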
Sliding Window Algorithm:
import time
import uuid

async def check_rate_limit(self, key: str, limit_str: str, request: Request) -> tuple[bool, Dict[str, Any]]:
    """Redis sliding window rate limiting"""
    count, window_seconds = self._parse_limit(limit_str)
    current_time = int(time.time())
    window_start = current_time - window_seconds
    # Redis pipeline for atomic operations
    pipe = redis_client.pipeline()
    # Remove expired entries
    pipe.zremrangebyscore(key, 0, window_start)
    # Get current count (taken before this request is added)
    pipe.zcard(key)
    # Add current request; the random suffix keeps members unique, so requests
    # to the same URL within the same second are each counted
    pipe.zadd(key, {f"{current_time}:{uuid.uuid4().hex}": current_time})
    # Set expiration
    pipe.expire(key, window_seconds + 10)
    # Use `await pipe.execute()` if redis_client is a redis.asyncio client
    results = pipe.execute()
    current_count = results[1]
    allowed = current_count < count
    return allowed, {
        "current_count": current_count,
        "limit": count,
        "window_seconds": window_seconds,
        # Next fixed-window boundary; only an approximation for a sliding window
        "reset_time": current_time + (window_seconds - (current_time % window_seconds)),
    }
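The same sliding-window logic can be exercised without Redis, which is useful for unit tests. The sketch below is an in-memory analogue under stated assumptions: `InMemorySlidingWindow` is a hypothetical class, it keeps state per process rather than in shared storage, and unlike the Redis version it does not record rejected requests.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class InMemorySlidingWindow:
    """Per-process sliding-window counter keyed by client identifier."""

    def __init__(self) -> None:
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, key: str, limit: int, window_seconds: int, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        hits = self._hits[key]
        # Drop timestamps at or before the window start (mirrors ZREMRANGEBYSCORE)
        while hits and hits[0] <= now - window_seconds:
            hits.popleft()
        if len(hits) >= limit:
            return False
        hits.append(now)
        return True


limiter = InMemorySlidingWindow()
print([limiter.allow("ip:203.0.113.7", limit=2, window_seconds=60, now=t) for t in (0, 1, 2, 61)])
```

Passing `now` explicitly keeps the example deterministic; production code would normally rely on the clock.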
4.4 Service Orchestration
Workflow Agent HTTP Client (app/services/workflow_agent_http_client.py)
SSE Streaming Communication:
async def process_conversation_stream(
    self,
    session_id: str,
    user_message: str,
    user_id: str = "anonymous",
    workflow_context: Optional[Dict[str, Any]] = None,
    access_token: Optional[str] = None,
    trace_id: Optional[str] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Stream conversation processing via HTTP SSE"""
    request_data = {
        "session_id": session_id,
        "user_id": user_id,
        "user_message": user_message,
        "access_token": access_token or "",
    }
    if workflow_context:
        request_data["workflow_context"] = {
            "origin": workflow_context.get("origin", "create"),
            "source_workflow_id": workflow_context.get("source_workflow_id", ""),
        }
    # http2=True requires the optional httpx[http2] extra to be installed
    async with httpx.AsyncClient(timeout=self.timeout, http2=True) as client:
        async with client.stream(
            "POST",
            f"{self.base_url}/process-conversation",
            json=request_data,
            headers={
                "Accept": "text/event-stream",
                "X-Trace-ID": trace_id or "",
            },
        ) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])  # Remove "data: " prefix
                    yield data
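The line handling at the end of the streaming client can be factored into a pure function and tested without a network connection. This is a sketch under assumptions: `iter_sse_json` is a hypothetical helper, and unlike the client above it skips malformed payloads instead of raising.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_sse_json(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield the decoded JSON payload of every 'data:' line in an SSE stream."""
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip comments (': ...'), event names, and blank separators
        payload = line[len("data:"):].strip()
        if not payload:
            continue
        try:
            yield json.loads(payload)
        except json.JSONDecodeError:
            continue  # sketch only: real code would probably log the bad frame


frames = [": keep-alive", 'data: {"response_type": "MESSAGE"}', "", 'data: {"is_final": true}']
print(list(iter_sse_json(frames)))
```

Accepting `data:` with or without the following space follows the Server-Sent Events format, which treats that space as optional.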
Timeout Configuration:
self.timeout = httpx.Timeout(
    timeout=1800.0,  # 30 minutes; default for any phase not set below (here: pool). httpx has no total-request timeout
    connect=15.0,  # 15 seconds connection timeout
    read=600.0,  # 10 minutes read timeout per chunk
    write=60.0,  # 60 seconds write timeout
)
Workflow Engine HTTP Client (app/services/workflow_engine_http_client.py)
Workflow Creation:
async def create_workflow(
    self,
    name: str,
    nodes: List[Dict[str, Any]],
    connections: Union[List[Dict], Dict],
    user_id: str = "anonymous",
    trace_id: Optional[str] = None,
    **kwargs
) -> Dict[str, Any]:
    """Create workflow in Workflow Engine v2"""
    # Convert to v2 format
    v2_nodes = [
        {
            "id": node.get("id") or node.get("name"),
            "name": node.get("name"),
            "type": node.get("type"),
            "subtype": node.get("subtype"),
            "configurations": node.get("configurations") or node.get("parameters", {}),
            "attached_nodes": node.get("attached_nodes"),
        }
        for node in nodes
    ]
    v2_payload = {
        "workflow_id": str(uuid.uuid4()),
        "name": name,
        "created_by": user_id,
        "nodes": v2_nodes,
        "connections": _convert_connections(connections),
        "triggers": [n["id"] for n in v2_nodes if n["type"] == "TRIGGER"],
    }
    async with httpx.AsyncClient(timeout=self.query_timeout) as client:
        response = await client.post(
            f"{self.base_url}/v2/workflows",
            json=v2_payload,
            headers={"X-Trace-ID": trace_id} if trace_id else {}
        )
        response.raise_for_status()
        return response.json()
Workflow Execution:
async def execute_workflow(
    self,
    workflow_id: str,
    user_id: str,
    input_data: Optional[Dict[str, Any]] = None,
    async_execution: bool = False,
    access_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Execute workflow with RLS"""
    headers = {}
    if access_token:
        headers["Authorization"] = f"Bearer {access_token}"
    v2_payload = {
        "trigger_type": "manual",
        "trigger_data": input_data or {},
        "async_execution": async_execution,
        "user_id": user_id,
    }
    client = await self._get_client()
    response = await client.post(
        f"{self.base_url}/v2/workflows/{workflow_id}/execute",
        json=v2_payload,
        headers=headers,
        timeout=self.execute_timeout,  # 5 minutes for long-running workflows
    )
    response.raise_for_status()
    return response.json()
5. System Interactions
5.1 Internal Service Communication
HTTP/REST Communication Pattern:
- All services migrated from gRPC to HTTP/REST
- Service discovery via environment variables
- Connection pooling with httpx for performance
- Trace ID propagation for distributed tracing
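Environment-based service discovery and trace ID propagation could be wired together roughly as follows. This is a sketch, not the gateway's configuration code: the environment variable names and both helper functions are hypothetical, while the default ports are the ones listed in this document.

```python
import os
import uuid
from typing import Dict, Mapping, Optional

# Variable names are assumptions for illustration; ports match this document.
_DEFAULTS = {
    "WORKFLOW_AGENT_URL": "http://localhost:8001",
    "WORKFLOW_ENGINE_URL": "http://localhost:8002",
    "WORKFLOW_SCHEDULER_URL": "http://localhost:8003",
}


def load_service_urls(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Resolve downstream base URLs from the environment, falling back to local defaults."""
    env = os.environ if env is None else env
    return {name: env.get(name, default).rstrip("/") for name, default in _DEFAULTS.items()}


def trace_headers(trace_id: Optional[str] = None) -> Dict[str, str]:
    """Build the X-Trace-ID header, generating an ID when the caller has none."""
    return {"X-Trace-ID": trace_id or uuid.uuid4().hex}


print(load_service_urls({"WORKFLOW_ENGINE_URL": "http://engine.internal:8002/"}))
```

Stripping the trailing slash up front means callers can always append paths such as `/v2/workflows` without producing double slashes.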
Service Endpoints:
Workflow Agent (8001):
POST /process-conversation # SSE streaming workflow generation
GET /health # Health check
Workflow Engine (8002):
POST /v2/workflows # Create workflow
GET /v2/workflows/{id} # Get workflow
POST /v2/workflows/{id}/execute # Execute workflow
GET /v2/executions/{id} # Execution status
GET /v2/executions/{id}/logs # Execution logs
Workflow Scheduler (8003):
POST /triggers # Create trigger
GET /triggers/{id} # Get trigger status
5.2 External Integrations
Supabase Integration
Authentication Flow:
# JWT token verification
async def verify_supabase_token(token: str) -> Optional[Dict[str, Any]]:
    """Verify JWT with Supabase Auth"""
    try:
        user_response = supabase.auth.get_user(token)
        if user_response.user:
            return {
                "sub": user_response.user.id,
                "email": user_response.user.email,
                "email_confirmed": user_response.user.email_confirmed_at is not None,
            }
    except Exception as e:
        log_error(f"Token verification failed: {e}")
    return None
Row Level Security (RLS):
class SupabaseRepository:
    """RLS-enabled repository pattern"""
    def create(self, data: dict, access_token: Optional[str] = None) -> Optional[dict]:
        """Create with RLS enforcement"""
        if access_token:
            # User-scoped operation with RLS
            client = get_user_supabase_client(access_token)
            return client.table(self.table_name).insert(data).execute()
        else:
            # Admin operation bypassing RLS
            return self.client.table(self.table_name).insert(data).execute()
Performance Optimization:
# Header-based auth (fast) vs session-based auth (slow)
# ❌ SLOW - Avoid making an HTTP call on every request
client.auth.set_session(access_token, access_token)
# ✅ FAST - Use headers for authentication
client.headers["Authorization"] = f"Bearer {access_token}"
OAuth Integration Management
Supported Integrations:
- Slack: OAuth for workspace integration
- Notion: OAuth for page/database access
- Google: OAuth for Gmail and Calendar
- GitHub: OAuth for repository operations
OAuth Callback Handler (app/api/public/auth.py):
@router.get("/auth/{provider}/callback")
async def oauth_callback(
    provider: str,
    code: str,
    state: str,
    request: Request,
):
    """Handle OAuth callbacks from external providers"""
    # Exchange code for access token
    # Store in user_integrations table
    # Redirect to frontend success page
Integration Storage (Supabase user_integrations table):
CREATE TABLE user_integrations (
    id UUID PRIMARY KEY,
    user_id UUID REFERENCES auth.users NOT NULL,
    provider TEXT NOT NULL,
    access_token TEXT NOT NULL,
    refresh_token TEXT,
    expires_at TIMESTAMPTZ,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
6. Non-Functional Requirements
6.1 Performance
Targets:
- API Gateway response time: < 50ms (excluding downstream services)
- SSE streaming latency: < 100ms first byte
- Rate limit check: < 5ms (Redis)
- Supabase query: < 100ms with RLS
Optimization Strategies:
- Connection pooling for HTTP clients (max 20 connections)
- HTTP/2 multiplexing for service communication
- Redis caching for rate limiting and session state
- Async/await throughout the stack
- Header-based Supabase auth (not session-based)
Caching Strategy:
# Redis TTL configuration
CACHE_TTL_JWT_TOKEN = 1800 # 30 minutes
CACHE_TTL_USER_INFO = 900 # 15 minutes
CACHE_TTL_RATE_LIMIT = 3600 # 1 hour
CACHE_TTL_SESSION_STATE = 7200 # 2 hours
6.2 Scalability
Horizontal Scaling:
- Stateless application design
- Redis for shared state (rate limits, session cache)
- Supabase connection pooling
- AWS ECS auto-scaling based on CPU/memory
Load Distribution:
AWS Application Load Balancer
  -> ECS Task (API GW)
  -> ECS Task (API GW)
  -> ECS Task (API GW)
  -> ECS Task (API GW)
All ECS tasks -> Shared Redis & Supabase
Resource Considerations:
- CPU: 0.5 vCPU per task (ECS Fargate)
- Memory: 1GB per task
- Redis: Single instance (ElastiCache for production)
- Concurrent connections: ~100 per task with connection pooling
6.3 Security
Authentication Measures:
- JWT token validation with Supabase
- API key hashing and secure storage (future enhancement)
- Token expiration and refresh handling
- Rate limiting per user/IP/API key
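The API key hashing listed above as a future enhancement could follow a store-the-digest pattern. The sketch below is one possible approach, not the planned design: the function names and the `mcp_` prefix are hypothetical, and plain SHA-256 is chosen on the assumption that keys are long random strings rather than user-chosen passwords.

```python
import hashlib
import hmac
import secrets


def generate_api_key() -> tuple[str, str]:
    """Return (plaintext_key, sha256_hex_digest); only the digest would be persisted."""
    key = "mcp_" + secrets.token_urlsafe(32)
    return key, hashlib.sha256(key.encode()).hexdigest()


def key_matches(presented: str, stored_digest: str) -> bool:
    """Compare in constant time so response timing does not leak digest prefixes."""
    digest = hashlib.sha256(presented.encode()).hexdigest()
    return hmac.compare_digest(digest, stored_digest)


plaintext, digest = generate_api_key()
print(key_matches(plaintext, digest))
```

The plaintext key is shown to the client once at creation time; afterwards the gateway only ever needs the digest.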
Authorization Approach:
- Row Level Security (RLS) for user data isolation
- API key scopes for fine-grained MCP permissions
- Admin service role key for system operations
Data Protection:
- HTTPS/TLS for all external communication
- Secure headers (CORS, CSP)
- JWT tokens in Authorization header (not cookies)
- No sensitive data in logs or traces
Input Validation:
# Pydantic models for request validation
class ChatRequest(BaseModel):
    session_id: str = Field(..., min_length=1, max_length=255)
    user_message: str = Field(..., min_length=1, max_length=10000)
    workflow_context: Optional[Dict[str, Any]] = None
# Middleware validation
@app.middleware("http")
async def validation_middleware(request: Request, call_next):
    # JWT format validation before Supabase call
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header.split(" ", 1)[1].strip()
        if not _validate_jwt_format_middleware(token):
            return JSONResponse(status_code=400, content={"error": "malformed_token"})
    return await call_next(request)
6.4 Reliability
Error Handling:
# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    log_exception(f"Unhandled exception: {type(exc).__name__}: {str(exc)}")
    return JSONResponse(
        status_code=500,
        content={
            "error": "internal_server_error",
            "message": "Internal server error occurred",
        }
    )
# Structured error responses
{
    "error": "error_type",
    "message": "Human-readable description",
    "error_code": "SPECIFIC_ERROR_CODE",
    "details": {...},
    "trace_id": "uuid"
}
Retry Mechanisms:
- HTTP client automatic retries (3 attempts) for network errors
- Exponential backoff for transient failures
- Circuit breaker pattern for failing downstream services (future)
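The retry policy described above (three attempts with exponential backoff) can be expressed as a small wrapper around any awaitable call. This is a minimal sketch with assumed names and defaults: `retry_async`, the 0.5 second base delay, the jitter, and the choice of retryable exceptions are illustrative, not taken from the gateway code.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await call() up to `attempts` times, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            # Delay grows as base, 2*base, 4*base, ... plus up to 10% jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
    raise AssertionError("unreachable")
```

Only network-style errors are retried here; retrying on every exception would also repeat requests that failed for non-transient reasons such as validation errors.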
Health Checks:
@app.get("/health")
async def health_check():
    """Comprehensive health check"""
    checks = {
        "api_gateway": True,
        "workflow_agent": await check_workflow_agent_health(),
        "workflow_engine": await check_workflow_engine_health(),
        "supabase": await check_supabase_connection(),
        "redis": await check_redis_connection(),
    }
    status = "healthy" if all(checks.values()) else "degraded"
    return {"status": status, "checks": checks}
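Because the handler awaits each dependency in turn, one slow check delays the whole response. A possible variation is to run the checks concurrently with a per-check timeout, as in the sketch below. `run_health_checks` and the 2 second default are assumptions, not part of the gateway.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_health_checks(
    checks: Dict[str, Callable[[], Awaitable[bool]]],
    timeout: float = 2.0,
) -> Dict[str, object]:
    """Run all checks concurrently; a slow or failing check counts as unhealthy."""

    async def guarded(check: Callable[[], Awaitable[bool]]) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout))
        except Exception:
            return False

    results = await asyncio.gather(*(guarded(c) for c in checks.values()))
    status = "healthy" if all(results) else "degraded"
    return {"status": status, "checks": dict(zip(checks, results))}
```

With concurrent checks the endpoint's worst-case latency is roughly one timeout rather than the sum of all of them, which matters when the ECS health check itself times out after 5 seconds.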
AWS ECS Health Check Configuration:
Health Check Command: curl -f http://localhost:8000/health || exit 1
Start Period: 120 seconds
Interval: 30 seconds
Timeout: 5 seconds
Retries: 3
6.5 Testing & Observability
Testing Strategy
Unit Testing:
# Run comprehensive test suite
uv run pytest tests/ -v
# Current test coverage (12 tests)
tests/test_basic.py:
- Root endpoint response validation
- Health check availability
- Version endpoint verification
- Authentication requirement for App API
- Authentication requirement for MCP API
- CORS headers validation
- Request ID generation
- Process time tracking
- Application factory pattern
- Configuration loading
Integration Testing:
# Test authentication flow
async def test_supabase_auth_flow():
    # Obtain JWT token from Supabase
    token = await get_test_jwt_token()
    # Create authenticated session
    response = await client.post(
        "/api/v1/app/sessions",
        headers={"Authorization": f"Bearer {token}"},
        json={"action": "create"}
    )
    assert response.status_code == 200
Manual Testing:
# Interactive chat client
python tests/interactive_chat_client.py
# Health check
curl http://localhost:8000/health
# Authenticated workflow creation
curl -X POST http://localhost:8000/api/v1/app/workflows \
-H "Authorization: Bearer ${JWT_TOKEN}" \
-H "Content-Type: application/json" \
-d @workflow.json
Observability
OpenTelemetry Integration:
# Automatic trace generation
setup_telemetry(
    app,
    service_name="api-gateway",
    service_version="2.0.0",
    otlp_endpoint="http://otel-collector:4317"
)
# Custom span attributes
request.state.trace_id = generate_trace_id()
request.state.user_id = user.get("sub")
Key Metrics:
- Request rate per API layer (public/app/mcp)
- Response time percentiles (p50, p95, p99)
- Error rate by error type
- Rate limit rejections per user/IP
- Concurrent SSE streams
- Downstream service latency
Logging Strategy:
# Structured logging with emoji indicators
logger.info(f"📨 {method} {path} [Trace:{trace_id}]")  # Request start
logger.info(f"✅ Supabase auth successful: {user_email}")  # Auth success
logger.warning(f"🚫 Rate limit exceeded: {path}")  # Rate limit
logger.error(f"❌ HTTP error: {status_code}")  # Errors
logger.info(f"📤 {method} {path} -> {status} [{time}ms]")  # Response
Log Levels:
- DEBUG: Development with detailed request/response bodies
- INFO: Production with request flow and key events
- WARNING: Rate limits, auth failures, degraded service
- ERROR: Exceptions, service failures, data errors
Monitoring & Alerting
Dashboard Metrics:
API Gateway Dashboard:
- Request rate by API layer
- Authentication success/failure rate
- Rate limit hit rate
- Response time histogram
- Error rate by status code
- Active SSE streams
- Downstream service health
Alert Thresholds:
- Error rate > 5% for 5 minutes → Page on-call
- Response time p95 > 500ms for 10 minutes → Warning
- Rate limit rejection > 20% for 5 minutes → Warning
- Health check failure > 3 consecutive → Page on-call
- Redis connection failure → Immediate page
SLIs/SLOs:
Service Level Indicators:
- Availability: 99.9% uptime
- Latency: 95% of requests < 200ms
- Error rate: < 1% of requests
- Authentication success: > 99%
Service Level Objectives:
- Monthly uptime: 99.9% (43 minutes downtime/month)
- P95 latency: < 200ms for App API
- P99 latency: < 500ms for App API
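The 43-minute figure follows directly from the uptime target: a 99.9% SLO over a 30-day month leaves 0.1% of 43,200 minutes as the error budget. A short sketch of the arithmetic, where `downtime_budget_minutes` is a hypothetical helper and the 30-day month is an assumption:

```python
def downtime_budget_minutes(slo: float, days: int = 30) -> float:
    """Minutes of allowed downtime for an availability SLO over `days` days."""
    return (1.0 - slo) * days * 24 * 60


print(round(downtime_budget_minutes(0.999), 1))  # 43.2
```

The same function shows how quickly the budget shrinks: a 99.99% target over the same period leaves only about 4.3 minutes.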
7. Technical Debt and Future Considerations
7.1 Known Limitations
Authentication & Security:
- API keys stored in configuration (not encrypted)
- No API key rotation mechanism
- Basic scope validation (no complex RBAC)
- Rate limiting per-layer only (no per-endpoint customization in code)
Performance:
- Single Redis instance (no cluster/replication)
- No circuit breaker pattern for downstream services
- Limited connection pooling configuration
- No request queuing for overload scenarios
Observability:
- Basic OpenTelemetry setup (no custom spans)
- Limited distributed tracing across services
- No performance profiling in production
- Alert configuration not in code
7.2 Areas for Improvement
Short-term (1-3 months):
- Implement API key encryption and secure storage
- Add circuit breaker pattern for service resilience
- Enhanced OpenTelemetry with custom spans and metrics
- Automated integration test suite with real services
- Pre-commit hooks enforcement (already configured)
Medium-term (3-6 months):
- Redis Cluster for high availability
- Advanced rate limiting with burst handling
- Role-Based Access Control (RBAC) system
- Request/response caching layer
- GraphQL API layer for flexible queries
Long-term (6-12 months):
- Multi-region deployment with global load balancing
- Service mesh integration (Istio/Linkerd)
- Advanced security (mTLS, certificate rotation)
- ML-based anomaly detection for security
- Self-healing infrastructure automation
7.3 Migration Paths
gRPC to HTTP/REST Migration (Completed):
- All services migrated to HTTP/REST
- Connection pooling implemented
- Trace ID propagation working
- Performance comparable to gRPC
Supabase RLS Optimization:
- Current: Header-based authentication (fast)
- Previous: Session-based authentication (3+ seconds)
- Result: 20% performance improvement
Future: GraphQL Layer:
type Query {
    workflows(filter: WorkflowFilter): [Workflow!]!
    workflow(id: ID!): Workflow
    executions(workflowId: ID!, limit: Int): [Execution!]!
}
type Mutation {
    createWorkflow(input: CreateWorkflowInput!): Workflow!
    executeWorkflow(id: ID!, input: ExecutionInput!): Execution!
    updateWorkflow(id: ID!, input: UpdateWorkflowInput!): Workflow!
}
type Subscription {
    executionUpdates(executionId: ID!): ExecutionUpdate!
}
8. Appendices
A. Glossary
- API Gateway: Unified entry point for all client requests with authentication and routing
- RLS (Row Level Security): PostgreSQL security feature isolating data by user
- SSE (Server-Sent Events): HTTP streaming protocol for real-time updates
- MCP (Model Context Protocol): LLM tool discovery and invocation standard
- JWT (JSON Web Token): Stateless authentication token format
- ECS (Elastic Container Service): AWS container orchestration platform
- OpenTelemetry: Observability framework for distributed tracing and metrics
- Sliding Window: Rate limiting algorithm counting requests in a moving time window
- Trace ID: Unique identifier for tracking requests across services
- Connection Pooling: Reusing HTTP connections for better performance
B. References
Internal Documentation:
- /apps/backend/api-gateway/CLAUDE.md - Development guide and commands
- /apps/backend/CLAUDE.md - Backend architecture overview
- /docs/tech-design/new_workflow_spec.md - Workflow data models
- /apps/backend/api-gateway/README.md - Quick start and API examples
External Resources:
- FastAPI Documentation
- Supabase Documentation
- OpenTelemetry Python
- httpx Documentation
- Redis Documentation
API Specifications:
- OpenAPI/Swagger: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
- Health Check: http://localhost:8000/health
Document Version: 2.0.0
Last Updated: January 2025
Architecture Status: Production-ready with three-layer API implementation
Next Review: April 2025