From 946f8bd97ed72aa6b7a6b71a7cc8f77b837cc2a9 Mon Sep 17 00:00:00 2001 From: Ben Date: Fri, 26 Dec 2025 18:49:11 +0000 Subject: [PATCH] Initial implementation of n8n MCP server Implements Hybrid MCP Light pattern with: - 5 curated tools: list_workflows, get_workflow, list_executions, get_execution, trigger_webhook - API pass-through tool: n8n_api_call for complete coverage - API reference resource: n8n://api-reference - Dockerfile and CI workflow for Gitea container registry --- .env.example | 11 + .gitea/workflows/build.yaml | 30 +++ .gitignore | 45 ++++ Dockerfile | 25 +++ README.md | 138 ++++++++++++ docker-compose.yml | 16 ++ pyproject.toml | 27 +++ server.py | 423 ++++++++++++++++++++++++++++++++++++ 8 files changed, 715 insertions(+) create mode 100644 .env.example create mode 100644 .gitea/workflows/build.yaml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 docker-compose.yml create mode 100644 pyproject.toml create mode 100644 server.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..54144e8 --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# n8n MCP Server Configuration +# Copy this file to .env and fill in your values + +# n8n Instance URL (without trailing slash) +N8N_URL=https://n8n.example.com + +# n8n API Key (generate from n8n Settings > API) +N8N_API_KEY=your-api-key-here + +# Optional: Server port (default: 8000) +# PORT=8000 diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml new file mode 100644 index 0000000..bdcd88e --- /dev/null +++ b/.gitea/workflows/build.yaml @@ -0,0 +1,30 @@ +name: Build and Push n8n MCP Docker Image + +on: + push: + branches: + - main + - master + +jobs: + build: + runs-on: docker + container: + image: catthehacker/ubuntu:act-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Login to Gitea Container Registry + uses: docker/login-action@v2 + with: + registry: gitea.ext.ben.io + username: ${{ gitea.actor }} + password: ${{ secrets.CR_PAT }} + + - name: Build and Push + uses: docker/build-push-action@v4 + with: + context: . + push: true + tags: gitea.ext.ben.io/${{ gitea.repository }}:latest diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2a52eb3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Environment +.env +.env.local + +# Testing +.pytest_cache/ +.coverage +htmlcov/ + +# OS +.DS_Store +Thumbs.db diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..60941ab --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install uv for fast dependency management +COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv + +# Copy project files +COPY pyproject.toml . +COPY server.py . +COPY README.md . + +# Install dependencies +RUN uv pip install --system --no-cache . + +# Install curl for health checks +RUN apt-get update && apt-get install -y --no-install-recommends curl && \ + rm -rf /var/lib/apt/lists/* + +EXPOSE 8000 + +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +CMD ["python", "server.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..6b560fa --- /dev/null +++ b/README.md @@ -0,0 +1,138 @@ +# n8n MCP Server (Custom) + +A lightweight MCP server for n8n API following the Hybrid MCP Light Blueprint pattern. + +## Design Philosophy + +This server implements the "Hybrid MCP Light" pattern: + +1. **5 Curated Tools** - Most frequently used operations as dedicated tools +2. **API Pass-through** - Raw API access for complete coverage +3. **Embedded Documentation** - API reference as an MCP resource + +This approach provides a clean tool surface for AI agents while ensuring 100% API capability coverage. + +## Curated Tools + +| Tool | Description | +|------|-------------| +| `list_workflows` | List all workflows with pagination | +| `get_workflow` | Get complete workflow by ID | +| `list_executions` | List execution history with filters | +| `get_execution` | Get execution details with optional data | +| `trigger_webhook` | Trigger workflow via webhook | +| `n8n_api_call` | Raw API pass-through for any operation | + +## Configuration + +| Variable | Required | Description | +|----------|----------|-------------| +| `N8N_URL` | Yes | n8n instance URL (without trailing slash) | +| `N8N_API_KEY` | Yes | API key from n8n Settings > API | +| `PORT` | No | Server port (default: 8000) | + +## Quick Start + +### Docker (Recommended) + +```bash +docker run -d \ + -e N8N_URL=https://n8n.example.com \ + -e N8N_API_KEY=your-api-key \ + -p 8000:8000 \ + gitea.ext.ben.io/b3nw/n8n-mcp-custom:latest +``` + +### Docker Compose + +```bash +cp .env.example .env +# Edit .env with your n8n credentials +docker-compose up -d +``` + +### Local Development + +```bash +# Create virtual environment +python -m venv .venv +source .venv/bin/activate + +# Install dependencies +pip install -e . + +# Copy and configure environment +cp .env.example .env +# Edit .env with your credentials + +# Run server +python server.py +``` + +## MCP Client Configuration + +Configure your MCP client to connect to: + +``` +http://localhost:8000/mcp +``` + +For streamable HTTP transport (recommended): +```json +{ + "mcpServers": { + "n8n": { + "url": "http://localhost:8000/mcp" + } + } +} +``` + +## API Reference + +The server exposes an API reference resource at `n8n://api-reference` which documents all available n8n API endpoints for use with the `n8n_api_call` tool. + +### Common Operations via API Pass-through + +```python +# Create a workflow +n8n_api_call('/workflows', 'POST', body='{"name": "My Workflow", "nodes": [...], "connections": {...}}') + +# Activate a workflow +n8n_api_call('/workflows/123/activate', 'POST') + +# List credentials +n8n_api_call('/credentials') + +# Create a tag +n8n_api_call('/tags', 'POST', body='{"name": "production"}') + +# Delete an execution +n8n_api_call('/executions/456', 'DELETE') +``` + +## Health Check + +The server provides a health check endpoint at `/health` that verifies: +- Server is running +- n8n instance is accessible + +```bash +curl http://localhost:8000/health +``` + +## Architecture + +``` +n8n-mcp-custom/ +├── server.py # Main MCP server with tools and API client +├── pyproject.toml # Python dependencies +├── Dockerfile # Container build +├── docker-compose.yml # Local orchestration +├── .env.example # Configuration template +└── README.md # This file +``` + +## License + +MIT diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..02893b7 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,16 @@ +services: + n8n-mcp: + image: gitea.ext.ben.io/b3nw/n8n-mcp-custom:latest + container_name: n8n-mcp + restart: unless-stopped + ports: + - "${PORT:-8000}:8000" + environment: + - N8N_URL=${N8N_URL} + - N8N_API_KEY=${N8N_API_KEY} + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 5s diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..14db3c0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,27 @@ +[project] +name = "n8n-mcp" +version = "0.1.0" +description = "Hybrid MCP Light server for n8n API - provides curated tools plus API pass-through" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "fastmcp>=2.0", + "httpx>=0.27", + "starlette>=0.40", + "uvicorn>=0.30", + "python-dotenv>=1.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.24", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["."] +include = ["server.py"] diff --git a/server.py b/server.py new file mode 100644 index 0000000..464fd1f --- /dev/null +++ b/server.py @@ -0,0 +1,423 @@ +""" +n8n MCP Server - Hybrid MCP Light implementation for n8n API. + +Provides 5 curated tools plus an API pass-through for complete API coverage. +Follows the Hybrid MCP Light Blueprint pattern. +""" + +import json +import os +from contextlib import asynccontextmanager +from typing import Any + +import httpx +from dotenv import load_dotenv +from fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.responses import JSONResponse +from starlette.routing import Mount, Route + +load_dotenv() + +# Configuration +N8N_URL = os.getenv("N8N_URL", "").rstrip("/") +N8N_API_KEY = os.getenv("N8N_API_KEY", "") + +# Initialize MCP server +mcp = FastMCP( + "n8n MCP", + instructions="MCP server for n8n API - provides workflow and execution management", +) + + +class N8nClient: + """HTTP client for n8n API with API key authentication.""" + + def __init__(self, base_url: str, api_key: str): + self.base_url = base_url.rstrip("/") + self.api_url = f"{self.base_url}/api/v1" + self.api_key = api_key + self._client: httpx.AsyncClient | None = None + + async def _get_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient( + headers={ + "X-N8N-API-KEY": self.api_key, + "Accept": "application/json", + "Content-Type": "application/json", + }, + timeout=30.0, + ) + return self._client + + async def close(self): + if self._client: + await self._client.aclose() + self._client = None + + async def request( + self, + method: str, + endpoint: str, + params: dict[str, Any] | None = None, + json_body: dict[str, Any] | None = None, + ) -> dict[str, Any] | list[Any]: + """Execute an API request to n8n.""" + client = await self._get_client() + url = f"{self.api_url}{endpoint}" + + try: + response = await client.request( + method=method.upper(), + url=url, + params=params, + json=json_body, + ) + response.raise_for_status() + + if response.status_code == 204: + return {"status": "success", "message": "No content"} + + content_type = response.headers.get("content-type", "") + if "application/json" in content_type: + return response.json() + else: + return {"content": response.text, "content_type": content_type} + except httpx.HTTPStatusError as e: + return { + "error": True, + "status_code": e.response.status_code, + "message": str(e), + "detail": e.response.text[:500] if e.response.text else None, + } + except Exception as e: + return {"error": True, "message": str(e)} + + async def health_check(self) -> bool: + """Check if n8n is accessible.""" + try: + client = await self._get_client() + # Try the healthz endpoint first + healthz_url = f"{self.base_url}/healthz" + response = await client.get(healthz_url) + if response.status_code == 200: + return True + # Fallback to listing workflows + response = await client.get( + f"{self.api_url}/workflows", params={"limit": 1} + ) + return response.status_code == 200 + except Exception: + return False + + +# Global client instance +client = N8nClient(N8N_URL, N8N_API_KEY) + + +# ============================================================================= +# API Reference Resource +# ============================================================================= + +API_REFERENCE = """ +# n8n API Quick Reference + +Base URL: {base_url}/api/v1 + +## Common Endpoints for `n8n_api_call` tool + +### Workflow Management +- GET `/workflows` - List all workflows + - Params: limit, cursor, active (bool), tags (array) +- GET `/workflows/{{id}}` - Get workflow by ID +- POST `/workflows` - Create workflow (body: name, nodes, connections) +- PUT `/workflows/{{id}}` - Update workflow +- DELETE `/workflows/{{id}}` - Delete workflow +- POST `/workflows/{{id}}/activate` - Activate workflow +- POST `/workflows/{{id}}/deactivate` - Deactivate workflow + +### Execution Management +- GET `/executions` - List executions + - Params: limit, cursor, workflowId, status (success/error/waiting) +- GET `/executions/{{id}}` - Get execution details + - Params: includeData (bool) +- DELETE `/executions/{{id}}` - Delete execution + +### Credentials +- GET `/credentials` - List credentials +- GET `/credentials/{{id}}` - Get credential (no secrets) +- POST `/credentials` - Create credential +- PATCH `/credentials/{{id}}` - Update credential +- DELETE `/credentials/{{id}}` - Delete credential + +### Tags +- GET `/tags` - List tags +- POST `/tags` - Create tag (body: name) +- PATCH `/tags/{{id}}` - Update tag +- DELETE `/tags/{{id}}` - Delete tag + +### Variables (if enabled) +- GET `/variables` - List variables +- POST `/variables` - Create variable (body: key, value) +- PATCH `/variables/{{id}}` - Update variable +- DELETE `/variables/{{id}}` - Delete variable + +### Webhook Execution +Webhooks are triggered at: {base_url}/webhook/{{path}} or {base_url}/webhook-test/{{path}} +Use the `n8n_trigger_webhook` tool for webhook execution. + +### Health Check +- GET `/healthz` - Check n8n instance health + +## Notes +- All list endpoints support pagination via `cursor` parameter +- Use `limit` to control result count (typically 1-100) +- Status filters: success, error, waiting +- Active filter: true/false for workflows +""".format(base_url=N8N_URL) + + +@mcp.resource("n8n://api-reference") +def get_api_reference() -> str: + """Returns the n8n API quick reference for using the n8n_api_call tool.""" + return API_REFERENCE + + +# ============================================================================= +# MCP Tools - Curated Operations (Most Frequently Used) +# ============================================================================= + + +@mcp.tool() +async def list_workflows( + limit: int = 100, + cursor: str | None = None, + active: bool | None = None, +) -> str: + """List all workflows in the n8n instance. + + Returns workflow metadata including id, name, active status, and tags. + Use cursor for pagination when there are more results. + + Args: + limit: Number of workflows to return (1-100, default: 100) + cursor: Pagination cursor from previous response + active: Filter by active status (optional) + """ + params: dict[str, Any] = {"limit": min(limit, 100)} + if cursor: + params["cursor"] = cursor + if active is not None: + params["active"] = active + + result = await client.request("GET", "/workflows", params=params) + return json.dumps(result) + + +@mcp.tool() +async def get_workflow(workflow_id: str) -> str: + """Get a workflow by ID. + + Returns the complete workflow including nodes, connections, settings, and metadata. + + Args: + workflow_id: The workflow ID + """ + result = await client.request("GET", f"/workflows/{workflow_id}") + return json.dumps(result) + + +@mcp.tool() +async def list_executions( + limit: int = 100, + cursor: str | None = None, + workflow_id: str | None = None, + status: str | None = None, +) -> str: + """List workflow executions. + + Returns execution history with status, timing, and workflow info. + Use cursor for pagination when there are more results. + + Args: + limit: Number of executions to return (1-100, default: 100) + cursor: Pagination cursor from previous response + workflow_id: Filter by specific workflow ID (optional) + status: Filter by status: 'success', 'error', or 'waiting' (optional) + """ + params: dict[str, Any] = {"limit": min(limit, 100)} + if cursor: + params["cursor"] = cursor + if workflow_id: + params["workflowId"] = workflow_id + if status and status in ("success", "error", "waiting"): + params["status"] = status + + result = await client.request("GET", "/executions", params=params) + return json.dumps(result) + + +@mcp.tool() +async def get_execution( + execution_id: str, + include_data: bool = False, +) -> str: + """Get execution details by ID. + + Returns execution status, timing, and optionally the full execution data. + Warning: include_data=True can return very large responses for complex workflows. + + Args: + execution_id: The execution ID + include_data: Include full execution data (default: False to save context) + """ + params: dict[str, Any] = {} + if include_data: + params["includeData"] = True + + result = await client.request("GET", f"/executions/{execution_id}", params=params) + return json.dumps(result) + + +@mcp.tool() +async def trigger_webhook( + webhook_path: str, + method: str = "POST", + data: str = "{}", + test_mode: bool = False, +) -> str: + """Trigger a workflow via webhook. + + The workflow must be active (or use test_mode for inactive workflows). + The webhook path is the path configured in the Webhook node. + + Args: + webhook_path: The webhook path (e.g., 'my-webhook' or full path like '/webhook/my-webhook') + method: HTTP method - GET, POST, PUT, DELETE (default: POST) + data: JSON string of data to send (default: "{}") + test_mode: Use webhook-test endpoint for inactive workflows (default: False) + """ + # Normalize webhook path + path = webhook_path.strip("/") + if path.startswith("webhook/") or path.startswith("webhook-test/"): + path = path.split("/", 1)[1] + + base = "webhook-test" if test_mode else "webhook" + url = f"{N8N_URL}/{base}/{path}" + + try: + body = json.loads(data) if data else {} + except json.JSONDecodeError as e: + return json.dumps({"error": True, "message": f"Invalid JSON data: {e}"}) + + try: + async with httpx.AsyncClient(timeout=120.0) as http_client: + response = await http_client.request( + method=method.upper(), + url=url, + json=body if method.upper() != "GET" else None, + params=body if method.upper() == "GET" else None, + ) + return json.dumps( + { + "status_code": response.status_code, + "response": response.json() + if response.headers.get("content-type", "").startswith( + "application/json" + ) + else response.text, + } + ) + except Exception as e: + return json.dumps({"error": True, "message": str(e)}) + + +# ============================================================================= +# API Pass-through Tool +# ============================================================================= + + +@mcp.tool() +async def n8n_api_call( + endpoint: str, + method: str = "GET", + params: str = "{}", + body: str = "{}", +) -> str: + """Execute a raw API call to n8n. + + Use this for any operation not covered by the other tools. + Refer to the 'n8n://api-reference' resource for available endpoints. + + Args: + endpoint: API endpoint path (e.g., '/workflows', '/credentials') + method: HTTP method (GET, POST, PUT, PATCH, DELETE) + params: JSON string of query parameters (optional) + body: JSON string of request body for POST/PUT/PATCH (optional) + + Examples: + - Create workflow: n8n_api_call('/workflows', 'POST', body='{"name": "My Workflow", "nodes": [...], "connections": {...}}') + - Activate workflow: n8n_api_call('/workflows/123/activate', 'POST') + - List credentials: n8n_api_call('/credentials') + - Create tag: n8n_api_call('/tags', 'POST', body='{"name": "production"}') + """ + try: + params_dict = json.loads(params) if params else {} + body_dict = json.loads(body) if body else {} + except json.JSONDecodeError as e: + return json.dumps({"error": True, "message": f"Invalid JSON: {e}"}) + + result = await client.request( + method=method, + endpoint=endpoint, + params=params_dict if params_dict else None, + json_body=body_dict if body_dict else None, + ) + return json.dumps(result) + + +# ============================================================================= +# Starlette Wrapper for Health Checks +# ============================================================================= + + +async def health_check(request): + """Health check endpoint for Docker/Kubernetes. + + Validates both server and n8n connectivity. + """ + n8n_healthy = await client.health_check() + status = "ok" if n8n_healthy else "degraded" + status_code = 200 if n8n_healthy else 503 + + return JSONResponse( + {"status": status, "n8n_url": N8N_URL, "n8n_connected": n8n_healthy}, + status_code=status_code, + ) + + +@asynccontextmanager +async def lifespan(app): + """Manage client lifecycle.""" + yield + await client.close() + + +def create_app() -> Starlette: + """Create the Starlette application with health check and MCP.""" + mcp_app = mcp.http_app() + + # Add health check route directly to the MCP app + mcp_app.add_route("/health", health_check, methods=["GET"]) + + return mcp_app + + +app = create_app() + +if __name__ == "__main__": + import uvicorn + + port = int(os.getenv("PORT", "8000")) + uvicorn.run(app, host="0.0.0.0", port=port)