Initial implementation of n8n MCP server
All checks were successful
Build and Push n8n MCP Docker Image / build (push) Successful in 8s

Implements Hybrid MCP Light pattern with:
- 5 curated tools: list_workflows, get_workflow, list_executions, get_execution, trigger_webhook
- API pass-through tool: n8n_api_call for complete coverage
- API reference resource: n8n://api-reference
- Dockerfile and CI workflow for Gitea container registry
This commit is contained in:
Ben
2025-12-26 18:49:11 +00:00
commit d717d12c57
8 changed files with 715 additions and 0 deletions

423
server.py Normal file
View File

@@ -0,0 +1,423 @@
"""
n8n MCP Server - Hybrid MCP Light implementation for n8n API.
Provides 5 curated tools plus an API pass-through for complete API coverage.
Follows the Hybrid MCP Light Blueprint pattern.
"""
import json
import os
from contextlib import asynccontextmanager
from typing import Any
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
load_dotenv()
# Configuration
N8N_URL = os.getenv("N8N_URL", "").rstrip("/")
N8N_API_KEY = os.getenv("N8N_API_KEY", "")
# Initialize MCP server
mcp = FastMCP(
"n8n MCP",
instructions="MCP server for n8n API - provides workflow and execution management",
)
class N8nClient:
"""HTTP client for n8n API with API key authentication."""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url.rstrip("/")
self.api_url = f"{self.base_url}/api/v1"
self.api_key = api_key
self._client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(
headers={
"X-N8N-API-KEY": self.api_key,
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=30.0,
)
return self._client
async def close(self):
if self._client:
await self._client.aclose()
self._client = None
async def request(
self,
method: str,
endpoint: str,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> dict[str, Any] | list[Any]:
"""Execute an API request to n8n."""
client = await self._get_client()
url = f"{self.api_url}{endpoint}"
try:
response = await client.request(
method=method.upper(),
url=url,
params=params,
json=json_body,
)
response.raise_for_status()
if response.status_code == 204:
return {"status": "success", "message": "No content"}
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
return response.json()
else:
return {"content": response.text, "content_type": content_type}
except httpx.HTTPStatusError as e:
return {
"error": True,
"status_code": e.response.status_code,
"message": str(e),
"detail": e.response.text[:500] if e.response.text else None,
}
except Exception as e:
return {"error": True, "message": str(e)}
async def health_check(self) -> bool:
"""Check if n8n is accessible."""
try:
client = await self._get_client()
# Try the healthz endpoint first
healthz_url = f"{self.base_url}/healthz"
response = await client.get(healthz_url)
if response.status_code == 200:
return True
# Fallback to listing workflows
response = await client.get(
f"{self.api_url}/workflows", params={"limit": 1}
)
return response.status_code == 200
except Exception:
return False
# Global client instance
client = N8nClient(N8N_URL, N8N_API_KEY)
# =============================================================================
# API Reference Resource
# =============================================================================
API_REFERENCE = """
# n8n API Quick Reference
Base URL: {base_url}/api/v1
## Common Endpoints for `n8n_api_call` tool
### Workflow Management
- GET `/workflows` - List all workflows
- Params: limit, cursor, active (bool), tags (array)
- GET `/workflows/{{id}}` - Get workflow by ID
- POST `/workflows` - Create workflow (body: name, nodes, connections)
- PUT `/workflows/{{id}}` - Update workflow
- DELETE `/workflows/{{id}}` - Delete workflow
- POST `/workflows/{{id}}/activate` - Activate workflow
- POST `/workflows/{{id}}/deactivate` - Deactivate workflow
### Execution Management
- GET `/executions` - List executions
- Params: limit, cursor, workflowId, status (success/error/waiting)
- GET `/executions/{{id}}` - Get execution details
- Params: includeData (bool)
- DELETE `/executions/{{id}}` - Delete execution
### Credentials
- GET `/credentials` - List credentials
- GET `/credentials/{{id}}` - Get credential (no secrets)
- POST `/credentials` - Create credential
- PATCH `/credentials/{{id}}` - Update credential
- DELETE `/credentials/{{id}}` - Delete credential
### Tags
- GET `/tags` - List tags
- POST `/tags` - Create tag (body: name)
- PATCH `/tags/{{id}}` - Update tag
- DELETE `/tags/{{id}}` - Delete tag
### Variables (if enabled)
- GET `/variables` - List variables
- POST `/variables` - Create variable (body: key, value)
- PATCH `/variables/{{id}}` - Update variable
- DELETE `/variables/{{id}}` - Delete variable
### Webhook Execution
Webhooks are triggered at: {base_url}/webhook/{{path}} or {base_url}/webhook-test/{{path}}
Use the `n8n_trigger_webhook` tool for webhook execution.
### Health Check
- GET `/healthz` - Check n8n instance health
## Notes
- All list endpoints support pagination via `cursor` parameter
- Use `limit` to control result count (typically 1-100)
- Status filters: success, error, waiting
- Active filter: true/false for workflows
""".format(base_url=N8N_URL)
@mcp.resource("n8n://api-reference")
def get_api_reference() -> str:
"""Returns the n8n API quick reference for using the n8n_api_call tool."""
return API_REFERENCE
# =============================================================================
# MCP Tools - Curated Operations (Most Frequently Used)
# =============================================================================
@mcp.tool()
async def list_workflows(
limit: int = 100,
cursor: str | None = None,
active: bool | None = None,
) -> str:
"""List all workflows in the n8n instance.
Returns workflow metadata including id, name, active status, and tags.
Use cursor for pagination when there are more results.
Args:
limit: Number of workflows to return (1-100, default: 100)
cursor: Pagination cursor from previous response
active: Filter by active status (optional)
"""
params: dict[str, Any] = {"limit": min(limit, 100)}
if cursor:
params["cursor"] = cursor
if active is not None:
params["active"] = active
result = await client.request("GET", "/workflows", params=params)
return json.dumps(result)
@mcp.tool()
async def get_workflow(workflow_id: str) -> str:
"""Get a workflow by ID.
Returns the complete workflow including nodes, connections, settings, and metadata.
Args:
workflow_id: The workflow ID
"""
result = await client.request("GET", f"/workflows/{workflow_id}")
return json.dumps(result)
@mcp.tool()
async def list_executions(
limit: int = 100,
cursor: str | None = None,
workflow_id: str | None = None,
status: str | None = None,
) -> str:
"""List workflow executions.
Returns execution history with status, timing, and workflow info.
Use cursor for pagination when there are more results.
Args:
limit: Number of executions to return (1-100, default: 100)
cursor: Pagination cursor from previous response
workflow_id: Filter by specific workflow ID (optional)
status: Filter by status: 'success', 'error', or 'waiting' (optional)
"""
params: dict[str, Any] = {"limit": min(limit, 100)}
if cursor:
params["cursor"] = cursor
if workflow_id:
params["workflowId"] = workflow_id
if status and status in ("success", "error", "waiting"):
params["status"] = status
result = await client.request("GET", "/executions", params=params)
return json.dumps(result)
@mcp.tool()
async def get_execution(
execution_id: str,
include_data: bool = False,
) -> str:
"""Get execution details by ID.
Returns execution status, timing, and optionally the full execution data.
Warning: include_data=True can return very large responses for complex workflows.
Args:
execution_id: The execution ID
include_data: Include full execution data (default: False to save context)
"""
params: dict[str, Any] = {}
if include_data:
params["includeData"] = True
result = await client.request("GET", f"/executions/{execution_id}", params=params)
return json.dumps(result)
@mcp.tool()
async def trigger_webhook(
webhook_path: str,
method: str = "POST",
data: str = "{}",
test_mode: bool = False,
) -> str:
"""Trigger a workflow via webhook.
The workflow must be active (or use test_mode for inactive workflows).
The webhook path is the path configured in the Webhook node.
Args:
webhook_path: The webhook path (e.g., 'my-webhook' or full path like '/webhook/my-webhook')
method: HTTP method - GET, POST, PUT, DELETE (default: POST)
data: JSON string of data to send (default: "{}")
test_mode: Use webhook-test endpoint for inactive workflows (default: False)
"""
# Normalize webhook path
path = webhook_path.strip("/")
if path.startswith("webhook/") or path.startswith("webhook-test/"):
path = path.split("/", 1)[1]
base = "webhook-test" if test_mode else "webhook"
url = f"{N8N_URL}/{base}/{path}"
try:
body = json.loads(data) if data else {}
except json.JSONDecodeError as e:
return json.dumps({"error": True, "message": f"Invalid JSON data: {e}"})
try:
async with httpx.AsyncClient(timeout=120.0) as http_client:
response = await http_client.request(
method=method.upper(),
url=url,
json=body if method.upper() != "GET" else None,
params=body if method.upper() == "GET" else None,
)
return json.dumps(
{
"status_code": response.status_code,
"response": response.json()
if response.headers.get("content-type", "").startswith(
"application/json"
)
else response.text,
}
)
except Exception as e:
return json.dumps({"error": True, "message": str(e)})
# =============================================================================
# API Pass-through Tool
# =============================================================================
@mcp.tool()
async def n8n_api_call(
endpoint: str,
method: str = "GET",
params: str = "{}",
body: str = "{}",
) -> str:
"""Execute a raw API call to n8n.
Use this for any operation not covered by the other tools.
Refer to the 'n8n://api-reference' resource for available endpoints.
Args:
endpoint: API endpoint path (e.g., '/workflows', '/credentials')
method: HTTP method (GET, POST, PUT, PATCH, DELETE)
params: JSON string of query parameters (optional)
body: JSON string of request body for POST/PUT/PATCH (optional)
Examples:
- Create workflow: n8n_api_call('/workflows', 'POST', body='{"name": "My Workflow", "nodes": [...], "connections": {...}}')
- Activate workflow: n8n_api_call('/workflows/123/activate', 'POST')
- List credentials: n8n_api_call('/credentials')
- Create tag: n8n_api_call('/tags', 'POST', body='{"name": "production"}')
"""
try:
params_dict = json.loads(params) if params else {}
body_dict = json.loads(body) if body else {}
except json.JSONDecodeError as e:
return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})
result = await client.request(
method=method,
endpoint=endpoint,
params=params_dict if params_dict else None,
json_body=body_dict if body_dict else None,
)
return json.dumps(result)
# =============================================================================
# Starlette Wrapper for Health Checks
# =============================================================================
async def health_check(request):
"""Health check endpoint for Docker/Kubernetes.
Validates both server and n8n connectivity.
"""
n8n_healthy = await client.health_check()
status = "ok" if n8n_healthy else "degraded"
status_code = 200 if n8n_healthy else 503
return JSONResponse(
{"status": status, "n8n_url": N8N_URL, "n8n_connected": n8n_healthy},
status_code=status_code,
)
@asynccontextmanager
async def lifespan(app):
"""Manage client lifecycle."""
yield
await client.close()
def create_app() -> Starlette:
"""Create the Starlette application with health check and MCP."""
mcp_app = mcp.http_app()
# Add health check route directly to the MCP app
mcp_app.add_route("/health", health_check, methods=["GET"])
return mcp_app
app = create_app()
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "8000"))
uvicorn.run(app, host="0.0.0.0", port=port)