Initial implementation of n8n MCP server
All checks were successful
Build and Push n8n MCP Docker Image / build (push) Successful in 7s

Implements Hybrid MCP Light pattern with:
- 5 curated tools: list_workflows, get_workflow, list_executions, get_execution, trigger_webhook
- API pass-through tool: n8n_api_call for complete coverage
- API reference resource: n8n://api-reference
- Dockerfile and CI workflow for Gitea container registry
This commit is contained in:
Ben
2025-12-26 18:49:11 +00:00
commit 946f8bd97e
8 changed files with 715 additions and 0 deletions

11
.env.example Normal file
View File

@@ -0,0 +1,11 @@
# n8n MCP Server Configuration
# Copy this file to .env and fill in your values
# n8n Instance URL (without trailing slash)
N8N_URL=https://n8n.example.com
# n8n API Key (generate from n8n Settings > API)
N8N_API_KEY=your-api-key-here
# Optional: Server port (default: 8000)
# PORT=8000

View File

@@ -0,0 +1,30 @@
name: Build and Push n8n MCP Docker Image
on:
push:
branches:
- main
- master
jobs:
build:
runs-on: docker
container:
image: catthehacker/ubuntu:act-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Login to Gitea Container Registry
uses: docker/login-action@v2
with:
registry: gitea.ext.ben.io
username: ${{ gitea.actor }}
password: ${{ secrets.CR_PAT }}
- name: Build and Push
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: gitea.ext.ben.io/${{ gitea.repository }}:latest

45
.gitignore vendored Normal file
View File

@@ -0,0 +1,45 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.venv/
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Environment
.env
.env.local
# Testing
.pytest_cache/
.coverage
htmlcov/
# OS
.DS_Store
Thumbs.db

25
Dockerfile Normal file
View File

@@ -0,0 +1,25 @@
FROM python:3.12-slim
WORKDIR /app
# Install uv for fast dependency management
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
# Copy project files
COPY pyproject.toml .
COPY server.py .
COPY README.md .
# Install dependencies
RUN uv pip install --system --no-cache .
# Install curl for health checks
RUN apt-get update && apt-get install -y --no-install-recommends curl && \
rm -rf /var/lib/apt/lists/*
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
CMD ["python", "server.py"]

138
README.md Normal file
View File

@@ -0,0 +1,138 @@
# n8n MCP Server (Custom)
A lightweight MCP server for n8n API following the Hybrid MCP Light Blueprint pattern.
## Design Philosophy
This server implements the "Hybrid MCP Light" pattern:
1. **5 Curated Tools** - Most frequently used operations as dedicated tools
2. **API Pass-through** - Raw API access for complete coverage
3. **Embedded Documentation** - API reference as an MCP resource
This approach provides a clean tool surface for AI agents while ensuring 100% API capability coverage.
## Curated Tools
| Tool | Description |
|------|-------------|
| `list_workflows` | List all workflows with pagination |
| `get_workflow` | Get complete workflow by ID |
| `list_executions` | List execution history with filters |
| `get_execution` | Get execution details with optional data |
| `trigger_webhook` | Trigger workflow via webhook |
| `n8n_api_call` | Raw API pass-through for any operation |
## Configuration
| Variable | Required | Description |
|----------|----------|-------------|
| `N8N_URL` | Yes | n8n instance URL (without trailing slash) |
| `N8N_API_KEY` | Yes | API key from n8n Settings > API |
| `PORT` | No | Server port (default: 8000) |
## Quick Start
### Docker (Recommended)
```bash
docker run -d \
-e N8N_URL=https://n8n.example.com \
-e N8N_API_KEY=your-api-key \
-p 8000:8000 \
gitea.ext.ben.io/b3nw/n8n-mcp-custom:latest
```
### Docker Compose
```bash
cp .env.example .env
# Edit .env with your n8n credentials
docker-compose up -d
```
### Local Development
```bash
# Create virtual environment
python -m venv .venv
source .venv/bin/activate
# Install dependencies
pip install -e .
# Copy and configure environment
cp .env.example .env
# Edit .env with your credentials
# Run server
python server.py
```
## MCP Client Configuration
Configure your MCP client to connect to:
```
http://localhost:8000/mcp
```
For streamable HTTP transport (recommended):
```json
{
"mcpServers": {
"n8n": {
"url": "http://localhost:8000/mcp"
}
}
}
```
## API Reference
The server exposes an API reference resource at `n8n://api-reference` which documents all available n8n API endpoints for use with the `n8n_api_call` tool.
### Common Operations via API Pass-through
```python
# Create a workflow
n8n_api_call('/workflows', 'POST', body='{"name": "My Workflow", "nodes": [...], "connections": {...}}')
# Activate a workflow
n8n_api_call('/workflows/123/activate', 'POST')
# List credentials
n8n_api_call('/credentials')
# Create a tag
n8n_api_call('/tags', 'POST', body='{"name": "production"}')
# Delete an execution
n8n_api_call('/executions/456', 'DELETE')
```
## Health Check
The server provides a health check endpoint at `/health` that verifies:
- Server is running
- n8n instance is accessible
```bash
curl http://localhost:8000/health
```
## Architecture
```
n8n-mcp-custom/
├── server.py # Main MCP server with tools and API client
├── pyproject.toml # Python dependencies
├── Dockerfile # Container build
├── docker-compose.yml # Local orchestration
├── .env.example # Configuration template
└── README.md # This file
```
## License
MIT

16
docker-compose.yml Normal file
View File

@@ -0,0 +1,16 @@
services:
n8n-mcp:
image: gitea.ext.ben.io/b3nw/n8n-mcp-custom:latest
container_name: n8n-mcp
restart: unless-stopped
ports:
- "${PORT:-8000}:8000"
environment:
- N8N_URL=${N8N_URL}
- N8N_API_KEY=${N8N_API_KEY}
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 5s

27
pyproject.toml Normal file
View File

@@ -0,0 +1,27 @@
[project]
name = "n8n-mcp"
version = "0.1.0"
description = "Hybrid MCP Light server for n8n API - provides curated tools plus API pass-through"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"fastmcp>=2.0",
"httpx>=0.27",
"starlette>=0.40",
"uvicorn>=0.30",
"python-dotenv>=1.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.24",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["."]
include = ["server.py"]

423
server.py Normal file
View File

@@ -0,0 +1,423 @@
"""
n8n MCP Server - Hybrid MCP Light implementation for n8n API.
Provides 5 curated tools plus an API pass-through for complete API coverage.
Follows the Hybrid MCP Light Blueprint pattern.
"""
import json
import os
from contextlib import asynccontextmanager
from typing import Any
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
load_dotenv()
# Configuration
N8N_URL = os.getenv("N8N_URL", "").rstrip("/")
N8N_API_KEY = os.getenv("N8N_API_KEY", "")
# Initialize MCP server
mcp = FastMCP(
"n8n MCP",
instructions="MCP server for n8n API - provides workflow and execution management",
)
class N8nClient:
"""HTTP client for n8n API with API key authentication."""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url.rstrip("/")
self.api_url = f"{self.base_url}/api/v1"
self.api_key = api_key
self._client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(
headers={
"X-N8N-API-KEY": self.api_key,
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=30.0,
)
return self._client
async def close(self):
if self._client:
await self._client.aclose()
self._client = None
async def request(
self,
method: str,
endpoint: str,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> dict[str, Any] | list[Any]:
"""Execute an API request to n8n."""
client = await self._get_client()
url = f"{self.api_url}{endpoint}"
try:
response = await client.request(
method=method.upper(),
url=url,
params=params,
json=json_body,
)
response.raise_for_status()
if response.status_code == 204:
return {"status": "success", "message": "No content"}
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
return response.json()
else:
return {"content": response.text, "content_type": content_type}
except httpx.HTTPStatusError as e:
return {
"error": True,
"status_code": e.response.status_code,
"message": str(e),
"detail": e.response.text[:500] if e.response.text else None,
}
except Exception as e:
return {"error": True, "message": str(e)}
async def health_check(self) -> bool:
"""Check if n8n is accessible."""
try:
client = await self._get_client()
# Try the healthz endpoint first
healthz_url = f"{self.base_url}/healthz"
response = await client.get(healthz_url)
if response.status_code == 200:
return True
# Fallback to listing workflows
response = await client.get(
f"{self.api_url}/workflows", params={"limit": 1}
)
return response.status_code == 200
except Exception:
return False
# Global client instance
client = N8nClient(N8N_URL, N8N_API_KEY)
# =============================================================================
# API Reference Resource
# =============================================================================
API_REFERENCE = """
# n8n API Quick Reference
Base URL: {base_url}/api/v1
## Common Endpoints for `n8n_api_call` tool
### Workflow Management
- GET `/workflows` - List all workflows
- Params: limit, cursor, active (bool), tags (array)
- GET `/workflows/{{id}}` - Get workflow by ID
- POST `/workflows` - Create workflow (body: name, nodes, connections)
- PUT `/workflows/{{id}}` - Update workflow
- DELETE `/workflows/{{id}}` - Delete workflow
- POST `/workflows/{{id}}/activate` - Activate workflow
- POST `/workflows/{{id}}/deactivate` - Deactivate workflow
### Execution Management
- GET `/executions` - List executions
- Params: limit, cursor, workflowId, status (success/error/waiting)
- GET `/executions/{{id}}` - Get execution details
- Params: includeData (bool)
- DELETE `/executions/{{id}}` - Delete execution
### Credentials
- GET `/credentials` - List credentials
- GET `/credentials/{{id}}` - Get credential (no secrets)
- POST `/credentials` - Create credential
- PATCH `/credentials/{{id}}` - Update credential
- DELETE `/credentials/{{id}}` - Delete credential
### Tags
- GET `/tags` - List tags
- POST `/tags` - Create tag (body: name)
- PATCH `/tags/{{id}}` - Update tag
- DELETE `/tags/{{id}}` - Delete tag
### Variables (if enabled)
- GET `/variables` - List variables
- POST `/variables` - Create variable (body: key, value)
- PATCH `/variables/{{id}}` - Update variable
- DELETE `/variables/{{id}}` - Delete variable
### Webhook Execution
Webhooks are triggered at: {base_url}/webhook/{{path}} or {base_url}/webhook-test/{{path}}
Use the `n8n_trigger_webhook` tool for webhook execution.
### Health Check
- GET `/healthz` - Check n8n instance health
## Notes
- All list endpoints support pagination via `cursor` parameter
- Use `limit` to control result count (typically 1-100)
- Status filters: success, error, waiting
- Active filter: true/false for workflows
""".format(base_url=N8N_URL)
@mcp.resource("n8n://api-reference")
def get_api_reference() -> str:
"""Returns the n8n API quick reference for using the n8n_api_call tool."""
return API_REFERENCE
# =============================================================================
# MCP Tools - Curated Operations (Most Frequently Used)
# =============================================================================
@mcp.tool()
async def list_workflows(
limit: int = 100,
cursor: str | None = None,
active: bool | None = None,
) -> str:
"""List all workflows in the n8n instance.
Returns workflow metadata including id, name, active status, and tags.
Use cursor for pagination when there are more results.
Args:
limit: Number of workflows to return (1-100, default: 100)
cursor: Pagination cursor from previous response
active: Filter by active status (optional)
"""
params: dict[str, Any] = {"limit": min(limit, 100)}
if cursor:
params["cursor"] = cursor
if active is not None:
params["active"] = active
result = await client.request("GET", "/workflows", params=params)
return json.dumps(result)
@mcp.tool()
async def get_workflow(workflow_id: str) -> str:
"""Get a workflow by ID.
Returns the complete workflow including nodes, connections, settings, and metadata.
Args:
workflow_id: The workflow ID
"""
result = await client.request("GET", f"/workflows/{workflow_id}")
return json.dumps(result)
@mcp.tool()
async def list_executions(
limit: int = 100,
cursor: str | None = None,
workflow_id: str | None = None,
status: str | None = None,
) -> str:
"""List workflow executions.
Returns execution history with status, timing, and workflow info.
Use cursor for pagination when there are more results.
Args:
limit: Number of executions to return (1-100, default: 100)
cursor: Pagination cursor from previous response
workflow_id: Filter by specific workflow ID (optional)
status: Filter by status: 'success', 'error', or 'waiting' (optional)
"""
params: dict[str, Any] = {"limit": min(limit, 100)}
if cursor:
params["cursor"] = cursor
if workflow_id:
params["workflowId"] = workflow_id
if status and status in ("success", "error", "waiting"):
params["status"] = status
result = await client.request("GET", "/executions", params=params)
return json.dumps(result)
@mcp.tool()
async def get_execution(
execution_id: str,
include_data: bool = False,
) -> str:
"""Get execution details by ID.
Returns execution status, timing, and optionally the full execution data.
Warning: include_data=True can return very large responses for complex workflows.
Args:
execution_id: The execution ID
include_data: Include full execution data (default: False to save context)
"""
params: dict[str, Any] = {}
if include_data:
params["includeData"] = True
result = await client.request("GET", f"/executions/{execution_id}", params=params)
return json.dumps(result)
@mcp.tool()
async def trigger_webhook(
webhook_path: str,
method: str = "POST",
data: str = "{}",
test_mode: bool = False,
) -> str:
"""Trigger a workflow via webhook.
The workflow must be active (or use test_mode for inactive workflows).
The webhook path is the path configured in the Webhook node.
Args:
webhook_path: The webhook path (e.g., 'my-webhook' or full path like '/webhook/my-webhook')
method: HTTP method - GET, POST, PUT, DELETE (default: POST)
data: JSON string of data to send (default: "{}")
test_mode: Use webhook-test endpoint for inactive workflows (default: False)
"""
# Normalize webhook path
path = webhook_path.strip("/")
if path.startswith("webhook/") or path.startswith("webhook-test/"):
path = path.split("/", 1)[1]
base = "webhook-test" if test_mode else "webhook"
url = f"{N8N_URL}/{base}/{path}"
try:
body = json.loads(data) if data else {}
except json.JSONDecodeError as e:
return json.dumps({"error": True, "message": f"Invalid JSON data: {e}"})
try:
async with httpx.AsyncClient(timeout=120.0) as http_client:
response = await http_client.request(
method=method.upper(),
url=url,
json=body if method.upper() != "GET" else None,
params=body if method.upper() == "GET" else None,
)
return json.dumps(
{
"status_code": response.status_code,
"response": response.json()
if response.headers.get("content-type", "").startswith(
"application/json"
)
else response.text,
}
)
except Exception as e:
return json.dumps({"error": True, "message": str(e)})
# =============================================================================
# API Pass-through Tool
# =============================================================================
@mcp.tool()
async def n8n_api_call(
endpoint: str,
method: str = "GET",
params: str = "{}",
body: str = "{}",
) -> str:
"""Execute a raw API call to n8n.
Use this for any operation not covered by the other tools.
Refer to the 'n8n://api-reference' resource for available endpoints.
Args:
endpoint: API endpoint path (e.g., '/workflows', '/credentials')
method: HTTP method (GET, POST, PUT, PATCH, DELETE)
params: JSON string of query parameters (optional)
body: JSON string of request body for POST/PUT/PATCH (optional)
Examples:
- Create workflow: n8n_api_call('/workflows', 'POST', body='{"name": "My Workflow", "nodes": [...], "connections": {...}}')
- Activate workflow: n8n_api_call('/workflows/123/activate', 'POST')
- List credentials: n8n_api_call('/credentials')
- Create tag: n8n_api_call('/tags', 'POST', body='{"name": "production"}')
"""
try:
params_dict = json.loads(params) if params else {}
body_dict = json.loads(body) if body else {}
except json.JSONDecodeError as e:
return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})
result = await client.request(
method=method,
endpoint=endpoint,
params=params_dict if params_dict else None,
json_body=body_dict if body_dict else None,
)
return json.dumps(result)
# =============================================================================
# Starlette Wrapper for Health Checks
# =============================================================================
async def health_check(request):
"""Health check endpoint for Docker/Kubernetes.
Validates both server and n8n connectivity.
"""
n8n_healthy = await client.health_check()
status = "ok" if n8n_healthy else "degraded"
status_code = 200 if n8n_healthy else 503
return JSONResponse(
{"status": status, "n8n_url": N8N_URL, "n8n_connected": n8n_healthy},
status_code=status_code,
)
@asynccontextmanager
async def lifespan(app):
"""Manage client lifecycle."""
yield
await client.close()
def create_app() -> Starlette:
"""Create the Starlette application with health check and MCP."""
mcp_app = mcp.http_app()
# Add health check route directly to the MCP app
mcp_app.add_route("/health", health_check, methods=["GET"])
return mcp_app
app = create_app()
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "8000"))
uvicorn.run(app, host="0.0.0.0", port=port)