All checks were successful
Build and Push n8n MCP Docker Image / build (push) Successful in 19s
Handle the case where MCP clients may pass JSON as either a string or dict object. Adds _normalize_json_param helper to handle both formats gracefully, preventing Pydantic validation errors.
443 lines
14 KiB
Python
443 lines
14 KiB
Python
"""
|
|
n8n MCP Server - Hybrid MCP Light implementation for n8n API.
|
|
|
|
Provides 5 curated tools plus an API pass-through for complete API coverage.
|
|
Follows the Hybrid MCP Light Blueprint pattern.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from contextlib import asynccontextmanager
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
from fastmcp import FastMCP
|
|
from starlette.applications import Starlette
|
|
from starlette.responses import JSONResponse
|
|
from starlette.routing import Mount, Route
|
|
|
|
load_dotenv()
|
|
|
|
# Configuration
|
|
N8N_URL = os.getenv("N8N_URL", "").rstrip("/")
|
|
N8N_API_KEY = os.getenv("N8N_API_KEY", "")
|
|
|
|
# Initialize MCP server
|
|
mcp = FastMCP(
|
|
"n8n MCP",
|
|
instructions="MCP server for n8n API - provides workflow and execution management",
|
|
)
|
|
|
|
|
|
class N8nClient:
|
|
"""HTTP client for n8n API with API key authentication."""
|
|
|
|
def __init__(self, base_url: str, api_key: str):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.api_url = f"{self.base_url}/api/v1"
|
|
self.api_key = api_key
|
|
self._client: httpx.AsyncClient | None = None
|
|
|
|
async def _get_client(self) -> httpx.AsyncClient:
|
|
if self._client is None:
|
|
self._client = httpx.AsyncClient(
|
|
headers={
|
|
"X-N8N-API-KEY": self.api_key,
|
|
"Accept": "application/json",
|
|
"Content-Type": "application/json",
|
|
},
|
|
timeout=30.0,
|
|
)
|
|
return self._client
|
|
|
|
async def close(self):
|
|
if self._client:
|
|
await self._client.aclose()
|
|
self._client = None
|
|
|
|
async def request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
params: dict[str, Any] | None = None,
|
|
json_body: dict[str, Any] | None = None,
|
|
) -> dict[str, Any] | list[Any]:
|
|
"""Execute an API request to n8n."""
|
|
client = await self._get_client()
|
|
url = f"{self.api_url}{endpoint}"
|
|
|
|
try:
|
|
response = await client.request(
|
|
method=method.upper(),
|
|
url=url,
|
|
params=params,
|
|
json=json_body,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
if response.status_code == 204:
|
|
return {"status": "success", "message": "No content"}
|
|
|
|
content_type = response.headers.get("content-type", "")
|
|
if "application/json" in content_type:
|
|
return response.json()
|
|
else:
|
|
return {"content": response.text, "content_type": content_type}
|
|
except httpx.HTTPStatusError as e:
|
|
return {
|
|
"error": True,
|
|
"status_code": e.response.status_code,
|
|
"message": str(e),
|
|
"detail": e.response.text[:500] if e.response.text else None,
|
|
}
|
|
except Exception as e:
|
|
return {"error": True, "message": str(e)}
|
|
|
|
async def health_check(self) -> bool:
|
|
"""Check if n8n is accessible."""
|
|
try:
|
|
client = await self._get_client()
|
|
# Try the healthz endpoint first
|
|
healthz_url = f"{self.base_url}/healthz"
|
|
response = await client.get(healthz_url)
|
|
if response.status_code == 200:
|
|
return True
|
|
# Fallback to listing workflows
|
|
response = await client.get(
|
|
f"{self.api_url}/workflows", params={"limit": 1}
|
|
)
|
|
return response.status_code == 200
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
# Global client instance
|
|
client = N8nClient(N8N_URL, N8N_API_KEY)
|
|
|
|
|
|
# =============================================================================
|
|
# API Reference Resource
|
|
# =============================================================================
|
|
|
|
API_REFERENCE = """
|
|
# n8n API Quick Reference
|
|
|
|
Base URL: {base_url}/api/v1
|
|
|
|
## Common Endpoints for `n8n_api_call` tool
|
|
|
|
### Workflow Management
|
|
- GET `/workflows` - List all workflows
|
|
- Params: limit, cursor, active (bool), tags (array)
|
|
- GET `/workflows/{{id}}` - Get workflow by ID
|
|
- POST `/workflows` - Create workflow (body: name, nodes, connections)
|
|
- PUT `/workflows/{{id}}` - Update workflow
|
|
- DELETE `/workflows/{{id}}` - Delete workflow
|
|
- POST `/workflows/{{id}}/activate` - Activate workflow
|
|
- POST `/workflows/{{id}}/deactivate` - Deactivate workflow
|
|
|
|
### Execution Management
|
|
- GET `/executions` - List executions
|
|
- Params: limit, cursor, workflowId, status (success/error/waiting)
|
|
- GET `/executions/{{id}}` - Get execution details
|
|
- Params: includeData (bool)
|
|
- DELETE `/executions/{{id}}` - Delete execution
|
|
|
|
### Credentials
|
|
- GET `/credentials` - List credentials
|
|
- GET `/credentials/{{id}}` - Get credential (no secrets)
|
|
- POST `/credentials` - Create credential
|
|
- PATCH `/credentials/{{id}}` - Update credential
|
|
- DELETE `/credentials/{{id}}` - Delete credential
|
|
|
|
### Tags
|
|
- GET `/tags` - List tags
|
|
- POST `/tags` - Create tag (body: name)
|
|
- PATCH `/tags/{{id}}` - Update tag
|
|
- DELETE `/tags/{{id}}` - Delete tag
|
|
|
|
### Variables (if enabled)
|
|
- GET `/variables` - List variables
|
|
- POST `/variables` - Create variable (body: key, value)
|
|
- PATCH `/variables/{{id}}` - Update variable
|
|
- DELETE `/variables/{{id}}` - Delete variable
|
|
|
|
### Webhook Execution
|
|
Webhooks are triggered at: {base_url}/webhook/{{path}} or {base_url}/webhook-test/{{path}}
|
|
Use the `n8n_trigger_webhook` tool for webhook execution.
|
|
|
|
### Health Check
|
|
- GET `/healthz` - Check n8n instance health
|
|
|
|
## Notes
|
|
- All list endpoints support pagination via `cursor` parameter
|
|
- Use `limit` to control result count (typically 1-100)
|
|
- Status filters: success, error, waiting
|
|
- Active filter: true/false for workflows
|
|
""".format(base_url=N8N_URL)
|
|
|
|
|
|
@mcp.resource("n8n://api-reference")
|
|
def get_api_reference() -> str:
|
|
"""Returns the n8n API quick reference for using the n8n_api_call tool."""
|
|
return API_REFERENCE
|
|
|
|
|
|
# =============================================================================
|
|
# MCP Tools - Curated Operations (Most Frequently Used)
|
|
# =============================================================================
|
|
|
|
|
|
@mcp.tool()
|
|
async def list_workflows(
|
|
limit: int = 100,
|
|
cursor: str | None = None,
|
|
active: bool | None = None,
|
|
) -> str:
|
|
"""List all workflows in the n8n instance.
|
|
|
|
Returns workflow metadata including id, name, active status, and tags.
|
|
Use cursor for pagination when there are more results.
|
|
|
|
Args:
|
|
limit: Number of workflows to return (1-100, default: 100)
|
|
cursor: Pagination cursor from previous response
|
|
active: Filter by active status (optional)
|
|
"""
|
|
params: dict[str, Any] = {"limit": min(limit, 100)}
|
|
if cursor:
|
|
params["cursor"] = cursor
|
|
if active is not None:
|
|
params["active"] = active
|
|
|
|
result = await client.request("GET", "/workflows", params=params)
|
|
return json.dumps(result)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_workflow(workflow_id: str) -> str:
|
|
"""Get a workflow by ID.
|
|
|
|
Returns the complete workflow including nodes, connections, settings, and metadata.
|
|
|
|
Args:
|
|
workflow_id: The workflow ID
|
|
"""
|
|
result = await client.request("GET", f"/workflows/{workflow_id}")
|
|
return json.dumps(result)
|
|
|
|
|
|
@mcp.tool()
|
|
async def list_executions(
|
|
limit: int = 100,
|
|
cursor: str | None = None,
|
|
workflow_id: str | None = None,
|
|
status: str | None = None,
|
|
) -> str:
|
|
"""List workflow executions.
|
|
|
|
Returns execution history with status, timing, and workflow info.
|
|
Use cursor for pagination when there are more results.
|
|
|
|
Args:
|
|
limit: Number of executions to return (1-100, default: 100)
|
|
cursor: Pagination cursor from previous response
|
|
workflow_id: Filter by specific workflow ID (optional)
|
|
status: Filter by status: 'success', 'error', or 'waiting' (optional)
|
|
"""
|
|
params: dict[str, Any] = {"limit": min(limit, 100)}
|
|
if cursor:
|
|
params["cursor"] = cursor
|
|
if workflow_id:
|
|
params["workflowId"] = workflow_id
|
|
if status and status in ("success", "error", "waiting"):
|
|
params["status"] = status
|
|
|
|
result = await client.request("GET", "/executions", params=params)
|
|
return json.dumps(result)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_execution(
|
|
execution_id: str,
|
|
include_data: bool = False,
|
|
) -> str:
|
|
"""Get execution details by ID.
|
|
|
|
Returns execution status, timing, and optionally the full execution data.
|
|
Warning: include_data=True can return very large responses for complex workflows.
|
|
|
|
Args:
|
|
execution_id: The execution ID
|
|
include_data: Include full execution data (default: False to save context)
|
|
"""
|
|
params: dict[str, Any] = {}
|
|
if include_data:
|
|
params["includeData"] = True
|
|
|
|
result = await client.request("GET", f"/executions/{execution_id}", params=params)
|
|
return json.dumps(result)
|
|
|
|
|
|
@mcp.tool()
|
|
async def trigger_webhook(
|
|
webhook_path: str,
|
|
method: str = "POST",
|
|
data: str = "{}",
|
|
test_mode: bool = False,
|
|
) -> str:
|
|
"""Trigger a workflow via webhook.
|
|
|
|
The workflow must be active (or use test_mode for inactive workflows).
|
|
The webhook path is the path configured in the Webhook node.
|
|
|
|
Args:
|
|
webhook_path: The webhook path (e.g., 'my-webhook' or full path like '/webhook/my-webhook')
|
|
method: HTTP method - GET, POST, PUT, DELETE (default: POST)
|
|
data: JSON string of data to send (default: "{}")
|
|
test_mode: Use webhook-test endpoint for inactive workflows (default: False)
|
|
"""
|
|
# Normalize webhook path
|
|
path = webhook_path.strip("/")
|
|
if path.startswith("webhook/") or path.startswith("webhook-test/"):
|
|
path = path.split("/", 1)[1]
|
|
|
|
base = "webhook-test" if test_mode else "webhook"
|
|
url = f"{N8N_URL}/{base}/{path}"
|
|
|
|
try:
|
|
body = json.loads(data) if data else {}
|
|
except json.JSONDecodeError as e:
|
|
return json.dumps({"error": True, "message": f"Invalid JSON data: {e}"})
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=120.0) as http_client:
|
|
response = await http_client.request(
|
|
method=method.upper(),
|
|
url=url,
|
|
json=body if method.upper() != "GET" else None,
|
|
params=body if method.upper() == "GET" else None,
|
|
)
|
|
return json.dumps(
|
|
{
|
|
"status_code": response.status_code,
|
|
"response": response.json()
|
|
if response.headers.get("content-type", "").startswith(
|
|
"application/json"
|
|
)
|
|
else response.text,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return json.dumps({"error": True, "message": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# API Pass-through Tool
|
|
# =============================================================================
|
|
|
|
|
|
def _normalize_json_param(value: str | dict[str, Any] | None) -> dict[str, Any]:
|
|
"""Normalize a parameter that can be either a JSON string or a dict.
|
|
|
|
This handles the case where MCP clients may pass either:
|
|
- A JSON string: '{"key": "value"}'
|
|
- A dict object: {"key": "value"}
|
|
"""
|
|
if value is None:
|
|
return {}
|
|
if isinstance(value, dict):
|
|
return value
|
|
if isinstance(value, str):
|
|
if not value or value == "{}":
|
|
return {}
|
|
try:
|
|
parsed = json.loads(value)
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return {}
|
|
|
|
|
|
@mcp.tool()
|
|
async def n8n_api_call(
|
|
endpoint: str,
|
|
method: str = "GET",
|
|
params: str | dict[str, Any] | None = None,
|
|
body: str | dict[str, Any] | None = None,
|
|
) -> str:
|
|
"""Execute a raw API call to n8n.
|
|
|
|
Use this for any operation not covered by the other tools.
|
|
Refer to the 'n8n://api-reference' resource for available endpoints.
|
|
|
|
Args:
|
|
endpoint: API endpoint path (e.g., '/workflows', '/credentials')
|
|
method: HTTP method (GET, POST, PUT, PATCH, DELETE)
|
|
params: Query parameters as JSON string or dict (optional)
|
|
body: Request body as JSON string or dict for POST/PUT/PATCH (optional)
|
|
|
|
Examples:
|
|
- Create workflow: n8n_api_call('/workflows', 'POST', body='{"name": "My Workflow", "nodes": [...], "connections": {...}}')
|
|
- Activate workflow: n8n_api_call('/workflows/123/activate', 'POST')
|
|
- List credentials: n8n_api_call('/credentials')
|
|
- Create tag: n8n_api_call('/tags', 'POST', body='{"name": "production"}')
|
|
"""
|
|
params_dict = _normalize_json_param(params)
|
|
body_dict = _normalize_json_param(body)
|
|
|
|
result = await client.request(
|
|
method=method,
|
|
endpoint=endpoint,
|
|
params=params_dict if params_dict else None,
|
|
json_body=body_dict if body_dict else None,
|
|
)
|
|
return json.dumps(result)
|
|
|
|
|
|
# =============================================================================
|
|
# Starlette Wrapper for Health Checks
|
|
# =============================================================================
|
|
|
|
|
|
async def health_check(request):
|
|
"""Health check endpoint for Docker/Kubernetes.
|
|
|
|
Validates both server and n8n connectivity.
|
|
"""
|
|
n8n_healthy = await client.health_check()
|
|
status = "ok" if n8n_healthy else "degraded"
|
|
status_code = 200 if n8n_healthy else 503
|
|
|
|
return JSONResponse(
|
|
{"status": status, "n8n_url": N8N_URL, "n8n_connected": n8n_healthy},
|
|
status_code=status_code,
|
|
)
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app):
|
|
"""Manage client lifecycle."""
|
|
yield
|
|
await client.close()
|
|
|
|
|
|
def create_app() -> Starlette:
|
|
"""Create the Starlette application with health check and MCP."""
|
|
mcp_app = mcp.http_app()
|
|
|
|
# Add health check route directly to the MCP app
|
|
mcp_app.add_route("/health", health_check, methods=["GET"])
|
|
|
|
return mcp_app
|
|
|
|
|
|
app = create_app()
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
port = int(os.getenv("PORT", "8000"))
|
|
uvicorn.run(app, host="0.0.0.0", port=port)
|