Some checks failed
Build and Push Proxmox MCP Docker Image / build (push) Failing after 13s
160 lines
6.1 KiB
Python
160 lines
6.1 KiB
Python
from fastapi import FastAPI, Request
|
|
from fastapi.responses import StreamingResponse
|
|
from uvicorn import Config, Server
|
|
from mcp.server.fastapi import FastAPIAppBuilder
|
|
from mcp.server.models import CallToolRequest, CallToolResponse, Tool
|
|
from mcp.server.transports.sse import SSEServerTransport
|
|
from mcp.server.server import MCPServer
|
|
from mcp.exceptions import MCPException
|
|
import os
|
|
import json
|
|
import logging
|
|
from proxmoxer import ProxmoxAPI, ProxmoxAPIException
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables for local development
|
|
load_dotenv()
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Proxmox API Configuration ---
|
|
PROXMOX_URL = os.getenv("PROXMOX_URL")
|
|
PROXMOX_USER = os.getenv("PROXMOX_USER")
|
|
PROXMOX_PASSWORD = os.getenv("PROXMOX_PASSWORD") # Or token secret
|
|
PROXMOX_TOKEN_ID = os.getenv("PROXMOX_TOKEN_ID")
|
|
PROXMOX_VERIFY_SSL = os.getenv("PROXMOX_VERIFY_SSL", "false").lower() == "true"
|
|
|
|
if not PROXMOX_URL or not (PROXMOX_PASSWORD and PROXMOX_TOKEN_ID):
|
|
logger.fatal("Proxmox API credentials (URL, User, Password/Token) not fully set.")
|
|
exit(1)
|
|
|
|
# Initialize ProxmoxAPI client
|
|
try:
|
|
if PROXMOX_TOKEN_ID and PROXMOX_PASSWORD: # Using API Token
|
|
proxmox = ProxmoxAPI(PROXMOX_URL, user=f"{PROXMOX_USER}@{PROXMOX_TOKEN_ID}", token_name=PROXMOX_TOKEN_ID, token_value=PROXMOX_PASSWORD, verify_ssl=PROXMOX_VERIFY_SSL)
|
|
else: # Using Username/Password
|
|
proxmox = ProxmoxAPI(PROXMOX_URL, user=PROXMOX_USER, password=PROXMOX_PASSWORD, verify_ssl=PROXMOX_VERIFY_SSL)
|
|
# Test connection
|
|
proxmox.version.get()
|
|
logger.info("Successfully connected to Proxmox API.")
|
|
except ProxmoxAPIException as e:
|
|
logger.fatal(f"Failed to connect to Proxmox API: {e}")
|
|
exit(1)
|
|
except Exception as e:
|
|
logger.fatal(f"An unexpected error occurred during Proxmox API connection: {e}")
|
|
exit(1)
|
|
|
|
# --- MCP Server Setup ---
|
|
mcp_server = MCPServer(
|
|
name="Proxmox MCP Server",
|
|
version="1.0",
|
|
description="MCP Server for Proxmox VE via proxmoxer",
|
|
capabilities={"tools": True}
|
|
)
|
|
|
|
# --- Core Tools ---
|
|
@mcp_server.tool(
|
|
name="list_nodes",
|
|
description="Returns a list of all Proxmox nodes in the cluster with their status.",
|
|
parameters={} # No parameters needed
|
|
)
|
|
def list_nodes():
|
|
"""Lists all Proxmox nodes."""
|
|
try:
|
|
nodes = proxmox.nodes.get()
|
|
return {"nodes": nodes}
|
|
except ProxmoxAPIException as e:
|
|
raise MCPException(f"Proxmox API Error: {e}")
|
|
except Exception as e:
|
|
raise MCPException(f"Error listing nodes: {e}")
|
|
|
|
@mcp_server.tool(
|
|
name="get_cluster_resources",
|
|
description="Returns a summary of all resources (VMs, LXC containers, storage, etc.) across the cluster.",
|
|
parameters={} # No parameters needed
|
|
)
|
|
def get_cluster_resources():
|
|
"""Gets a summary of all cluster resources."""
|
|
try:
|
|
resources = proxmox.cluster.resources.get()
|
|
return {"resources": resources}
|
|
except ProxmoxAPIException as e:
|
|
raise MCPException(f"Proxmox API Error: {e}")
|
|
except Exception as e:
|
|
raise MCPException(f"Error getting cluster resources: {e}")
|
|
|
|
# --- Raw API Access Tool ---
|
|
@mcp_server.tool(
|
|
name="proxmox_api_call",
|
|
description="Executes a raw API call to Proxmox VE. Use carefully!",
|
|
parameters={
|
|
"path": {"type": "string", "description": "The API path (e.g., 'nodes/pve1/qemu/100/status/start')."},
|
|
"method": {"type": "string", "enum": ["GET", "POST", "PUT", "DELETE"], "description": "HTTP Method."},
|
|
"data": {"type": "object", "description": "Optional JSON payload for POST/PUT methods.", "default": {}},
|
|
"node": {"type": "string", "description": "Optional node name if path is relative to a node.", "default": None},
|
|
"vmid": {"type": "integer", "description": "Optional VM ID if path is relative to a VM.", "default": None},
|
|
"lxcid": {"type": "integer", "description": "Optional LXC ID if path is relative to an LXC container.", "default": None},
|
|
},
|
|
)
|
|
def proxmox_api_call(path: str, method: str, data: dict = {}, node: str = None, vmid: int = None, lxcid: int = None):
|
|
"""Executes a raw Proxmox API call."""
|
|
try:
|
|
# Dynamically build the ProxmoxAPI path
|
|
api_path = proxmox
|
|
path_segments = path.strip('/').split('/')
|
|
|
|
for segment in path_segments:
|
|
if segment: # Skip empty segments
|
|
if segment == "nodes" and node:
|
|
api_path = api_path.nodes(node)
|
|
elif segment == "qemu" and vmid:
|
|
api_path = api_path.qemu(vmid)
|
|
elif segment == "lxc" and lxcid:
|
|
api_path = api_path.lxc(lxcid)
|
|
else:
|
|
api_path = api_path(segment)
|
|
|
|
method_func = getattr(api_path, method.lower(), None)
|
|
if not method_func:
|
|
raise MCPException(f"Unsupported method '{method}' for path segment '{path}'")
|
|
|
|
if method.upper() in ["POST", "PUT"]:
|
|
result = method_func(**data)
|
|
else:
|
|
result = method_func()
|
|
|
|
return {"result": result}
|
|
except ProxmoxAPIException as e:
|
|
raise MCPException(f"Proxmox API Error: {e}")
|
|
except Exception as e:
|
|
raise MCPException(f"Error executing raw API call: {e}")
|
|
|
|
|
|
# --- FastAPI Integration ---
|
|
app_builder = FastAPIAppBuilder(mcp_server)
|
|
app = app_builder.build_app()
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return {"message": "Proxmox MCP Server is running. Access /sse for events."}
|
|
|
|
@app.get("/sse")
|
|
async def sse(request: Request):
|
|
"""Server-Sent Events endpoint for MCP communication."""
|
|
logger.info("New SSE connection from client.")
|
|
return StreamingResponse(
|
|
SSEServerTransport(mcp_server).get_response_generator(request),
|
|
media_type="text/event-stream"
|
|
)
|
|
|
|
@app.post("/messages")
|
|
async def messages(request: Request):
|
|
"""Endpoint for MCP client messages."""
|
|
logger.info("New POST /messages from client.")
|
|
return await SSEServerTransport(mcp_server).handle_request(request)
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|