""" n8n MCP Server - Hybrid MCP Light implementation for n8n API. Provides 5 curated tools plus an API pass-through for complete API coverage. Follows the Hybrid MCP Light Blueprint pattern. """ import json import os from contextlib import asynccontextmanager from typing import Any import httpx from dotenv import load_dotenv from fastmcp import FastMCP from starlette.applications import Starlette from starlette.responses import JSONResponse from starlette.routing import Mount, Route load_dotenv() # Configuration N8N_URL = os.getenv("N8N_URL", "").rstrip("/") N8N_API_KEY = os.getenv("N8N_API_KEY", "") # Initialize MCP server mcp = FastMCP( "n8n MCP", instructions="MCP server for n8n API - provides workflow and execution management", ) class N8nClient: """HTTP client for n8n API with API key authentication.""" def __init__(self, base_url: str, api_key: str): self.base_url = base_url.rstrip("/") self.api_url = f"{self.base_url}/api/v1" self.api_key = api_key self._client: httpx.AsyncClient | None = None async def _get_client(self) -> httpx.AsyncClient: if self._client is None: self._client = httpx.AsyncClient( headers={ "X-N8N-API-KEY": self.api_key, "Accept": "application/json", "Content-Type": "application/json", }, timeout=30.0, ) return self._client async def close(self): if self._client: await self._client.aclose() self._client = None async def request( self, method: str, endpoint: str, params: dict[str, Any] | None = None, json_body: dict[str, Any] | None = None, ) -> dict[str, Any] | list[Any]: """Execute an API request to n8n.""" client = await self._get_client() url = f"{self.api_url}{endpoint}" try: response = await client.request( method=method.upper(), url=url, params=params, json=json_body, ) response.raise_for_status() if response.status_code == 204: return {"status": "success", "message": "No content"} content_type = response.headers.get("content-type", "") if "application/json" in content_type: return response.json() else: return {"content": response.text, "content_type": content_type} except httpx.HTTPStatusError as e: return { "error": True, "status_code": e.response.status_code, "message": str(e), "detail": e.response.text[:500] if e.response.text else None, } except Exception as e: return {"error": True, "message": str(e)} async def health_check(self) -> bool: """Check if n8n is accessible.""" try: client = await self._get_client() # Try the healthz endpoint first healthz_url = f"{self.base_url}/healthz" response = await client.get(healthz_url) if response.status_code == 200: return True # Fallback to listing workflows response = await client.get( f"{self.api_url}/workflows", params={"limit": 1} ) return response.status_code == 200 except Exception: return False # Global client instance client = N8nClient(N8N_URL, N8N_API_KEY) # ============================================================================= # API Reference Resource # ============================================================================= API_REFERENCE = """ # n8n API Quick Reference Base URL: {base_url}/api/v1 ## Common Endpoints for `n8n_api_call` tool ### Workflow Management - GET `/workflows` - List all workflows - Params: limit, cursor, active (bool), tags (array) - GET `/workflows/{{id}}` - Get workflow by ID - POST `/workflows` - Create workflow (body: name, nodes, connections) - PUT `/workflows/{{id}}` - Update workflow - DELETE `/workflows/{{id}}` - Delete workflow - POST `/workflows/{{id}}/activate` - Activate workflow - POST `/workflows/{{id}}/deactivate` - Deactivate workflow ### Execution Management - GET `/executions` - List executions - Params: limit, cursor, workflowId, status (success/error/waiting) - GET `/executions/{{id}}` - Get execution details - Params: includeData (bool) - DELETE `/executions/{{id}}` - Delete execution ### Credentials - GET `/credentials` - List credentials - GET `/credentials/{{id}}` - Get credential (no secrets) - POST `/credentials` - Create credential - PATCH `/credentials/{{id}}` - Update credential - DELETE `/credentials/{{id}}` - Delete credential ### Tags - GET `/tags` - List tags - POST `/tags` - Create tag (body: name) - PATCH `/tags/{{id}}` - Update tag - DELETE `/tags/{{id}}` - Delete tag ### Variables (if enabled) - GET `/variables` - List variables - POST `/variables` - Create variable (body: key, value) - PATCH `/variables/{{id}}` - Update variable - DELETE `/variables/{{id}}` - Delete variable ### Webhook Execution Webhooks are triggered at: {base_url}/webhook/{{path}} or {base_url}/webhook-test/{{path}} Use the `n8n_trigger_webhook` tool for webhook execution. ### Health Check - GET `/healthz` - Check n8n instance health ## Notes - All list endpoints support pagination via `cursor` parameter - Use `limit` to control result count (typically 1-100) - Status filters: success, error, waiting - Active filter: true/false for workflows """.format(base_url=N8N_URL) @mcp.resource("n8n://api-reference") def get_api_reference() -> str: """Returns the n8n API quick reference for using the n8n_api_call tool.""" return API_REFERENCE # ============================================================================= # MCP Tools - Curated Operations (Most Frequently Used) # ============================================================================= @mcp.tool() async def list_workflows( limit: int = 100, cursor: str | None = None, active: bool | None = None, ) -> str: """List all workflows in the n8n instance. Returns workflow metadata including id, name, active status, and tags. Use cursor for pagination when there are more results. Args: limit: Number of workflows to return (1-100, default: 100) cursor: Pagination cursor from previous response active: Filter by active status (optional) """ params: dict[str, Any] = {"limit": min(limit, 100)} if cursor: params["cursor"] = cursor if active is not None: params["active"] = active result = await client.request("GET", "/workflows", params=params) return json.dumps(result) @mcp.tool() async def get_workflow(workflow_id: str) -> str: """Get a workflow by ID. Returns the complete workflow including nodes, connections, settings, and metadata. Args: workflow_id: The workflow ID """ result = await client.request("GET", f"/workflows/{workflow_id}") return json.dumps(result) @mcp.tool() async def list_executions( limit: int = 100, cursor: str | None = None, workflow_id: str | None = None, status: str | None = None, ) -> str: """List workflow executions. Returns execution history with status, timing, and workflow info. Use cursor for pagination when there are more results. Args: limit: Number of executions to return (1-100, default: 100) cursor: Pagination cursor from previous response workflow_id: Filter by specific workflow ID (optional) status: Filter by status: 'success', 'error', or 'waiting' (optional) """ params: dict[str, Any] = {"limit": min(limit, 100)} if cursor: params["cursor"] = cursor if workflow_id: params["workflowId"] = workflow_id if status and status in ("success", "error", "waiting"): params["status"] = status result = await client.request("GET", "/executions", params=params) return json.dumps(result) @mcp.tool() async def get_execution( execution_id: str, include_data: bool = False, ) -> str: """Get execution details by ID. Returns execution status, timing, and optionally the full execution data. Warning: include_data=True can return very large responses for complex workflows. Args: execution_id: The execution ID include_data: Include full execution data (default: False to save context) """ params: dict[str, Any] = {} if include_data: params["includeData"] = True result = await client.request("GET", f"/executions/{execution_id}", params=params) return json.dumps(result) @mcp.tool() async def trigger_webhook( webhook_path: str, method: str = "POST", data: str = "{}", test_mode: bool = False, ) -> str: """Trigger a workflow via webhook. The workflow must be active (or use test_mode for inactive workflows). The webhook path is the path configured in the Webhook node. Args: webhook_path: The webhook path (e.g., 'my-webhook' or full path like '/webhook/my-webhook') method: HTTP method - GET, POST, PUT, DELETE (default: POST) data: JSON string of data to send (default: "{}") test_mode: Use webhook-test endpoint for inactive workflows (default: False) """ # Normalize webhook path path = webhook_path.strip("/") if path.startswith("webhook/") or path.startswith("webhook-test/"): path = path.split("/", 1)[1] base = "webhook-test" if test_mode else "webhook" url = f"{N8N_URL}/{base}/{path}" try: body = json.loads(data) if data else {} except json.JSONDecodeError as e: return json.dumps({"error": True, "message": f"Invalid JSON data: {e}"}) try: async with httpx.AsyncClient(timeout=120.0) as http_client: response = await http_client.request( method=method.upper(), url=url, json=body if method.upper() != "GET" else None, params=body if method.upper() == "GET" else None, ) return json.dumps( { "status_code": response.status_code, "response": response.json() if response.headers.get("content-type", "").startswith( "application/json" ) else response.text, } ) except Exception as e: return json.dumps({"error": True, "message": str(e)}) # ============================================================================= # API Pass-through Tool # ============================================================================= @mcp.tool() async def n8n_api_call( endpoint: str, method: str = "GET", params: str = "{}", body: str = "{}", ) -> str: """Execute a raw API call to n8n. Use this for any operation not covered by the other tools. Refer to the 'n8n://api-reference' resource for available endpoints. Args: endpoint: API endpoint path (e.g., '/workflows', '/credentials') method: HTTP method (GET, POST, PUT, PATCH, DELETE) params: JSON string of query parameters (optional) body: JSON string of request body for POST/PUT/PATCH (optional) Examples: - Create workflow: n8n_api_call('/workflows', 'POST', body='{"name": "My Workflow", "nodes": [...], "connections": {...}}') - Activate workflow: n8n_api_call('/workflows/123/activate', 'POST') - List credentials: n8n_api_call('/credentials') - Create tag: n8n_api_call('/tags', 'POST', body='{"name": "production"}') """ try: params_dict = json.loads(params) if params else {} body_dict = json.loads(body) if body else {} except json.JSONDecodeError as e: return json.dumps({"error": True, "message": f"Invalid JSON: {e}"}) result = await client.request( method=method, endpoint=endpoint, params=params_dict if params_dict else None, json_body=body_dict if body_dict else None, ) return json.dumps(result) # ============================================================================= # Starlette Wrapper for Health Checks # ============================================================================= async def health_check(request): """Health check endpoint for Docker/Kubernetes. Validates both server and n8n connectivity. """ n8n_healthy = await client.health_check() status = "ok" if n8n_healthy else "degraded" status_code = 200 if n8n_healthy else 503 return JSONResponse( {"status": status, "n8n_url": N8N_URL, "n8n_connected": n8n_healthy}, status_code=status_code, ) @asynccontextmanager async def lifespan(app): """Manage client lifecycle.""" yield await client.close() def create_app() -> Starlette: """Create the Starlette application with health check and MCP.""" mcp_app = mcp.http_app() # Add health check route directly to the MCP app mcp_app.add_route("/health", health_check, methods=["GET"]) return mcp_app app = create_app() if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", "8000")) uvicorn.run(app, host="0.0.0.0", port=port)