Initial commit: Custom Proxmox MCP with SSE wrapper
All checks were successful
Build and Push Proxmox MCP Docker Image / build (push) Successful in 16s
All checks were successful
Build and Push Proxmox MCP Docker Image / build (push) Successful in 16s
This commit is contained in:
159
server.py
Normal file
159
server.py
Normal file
@@ -0,0 +1,159 @@
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
from uvicorn import Config, Server
|
||||
from mcp.server.fastapi import FastAPIAppBuilder
|
||||
from mcp.server.models import CallToolRequest, CallToolResponse, Tool
|
||||
from mcp.server.transports.sse import SSEServerTransport
|
||||
from mcp.server.server import MCPServer
|
||||
from mcp.exceptions import MCPException
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from proxmoxer import ProxmoxAPI, ProxmoxAPIException
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables for local development
|
||||
load_dotenv()
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# --- Proxmox API Configuration ---
|
||||
PROXMOX_URL = os.getenv("PROXMOX_URL")
|
||||
PROXMOX_USER = os.getenv("PROXMOX_USER")
|
||||
PROXMOX_PASSWORD = os.getenv("PROXMOX_PASSWORD") # Or token secret
|
||||
PROXMOX_TOKEN_ID = os.getenv("PROXMOX_TOKEN_ID")
|
||||
PROXMOX_VERIFY_SSL = os.getenv("PROXMOX_VERIFY_SSL", "false").lower() == "true"
|
||||
|
||||
if not PROXMOX_URL or not (PROXMOX_PASSWORD and PROXMOX_TOKEN_ID):
|
||||
logger.fatal("Proxmox API credentials (URL, User, Password/Token) not fully set.")
|
||||
exit(1)
|
||||
|
||||
# Initialize ProxmoxAPI client
|
||||
try:
|
||||
if PROXMOX_TOKEN_ID and PROXMOX_PASSWORD: # Using API Token
|
||||
proxmox = ProxmoxAPI(PROXMOX_URL, user=f"{PROXMOX_USER}@{PROXMOX_TOKEN_ID}", token_name=PROXMOX_TOKEN_ID, token_value=PROXMOX_PASSWORD, verify_ssl=PROXMOX_VERIFY_SSL)
|
||||
else: # Using Username/Password
|
||||
proxmox = ProxmoxAPI(PROXMOX_URL, user=PROXMOX_USER, password=PROXMOX_PASSWORD, verify_ssl=PROXMOX_VERIFY_SSL)
|
||||
# Test connection
|
||||
proxmox.version.get()
|
||||
logger.info("Successfully connected to Proxmox API.")
|
||||
except ProxmoxAPIException as e:
|
||||
logger.fatal(f"Failed to connect to Proxmox API: {e}")
|
||||
exit(1)
|
||||
except Exception as e:
|
||||
logger.fatal(f"An unexpected error occurred during Proxmox API connection: {e}")
|
||||
exit(1)
|
||||
|
||||
# --- MCP Server Setup ---
|
||||
mcp_server = MCPServer(
|
||||
name="Proxmox MCP Server",
|
||||
version="1.0",
|
||||
description="MCP Server for Proxmox VE via proxmoxer",
|
||||
capabilities={"tools": True}
|
||||
)
|
||||
|
||||
# --- Core Tools ---
|
||||
@mcp_server.tool(
|
||||
name="list_nodes",
|
||||
description="Returns a list of all Proxmox nodes in the cluster with their status.",
|
||||
parameters={} # No parameters needed
|
||||
)
|
||||
def list_nodes():
|
||||
"""Lists all Proxmox nodes."""
|
||||
try:
|
||||
nodes = proxmox.nodes.get()
|
||||
return {"nodes": nodes}
|
||||
except ProxmoxAPIException as e:
|
||||
raise MCPException(f"Proxmox API Error: {e}")
|
||||
except Exception as e:
|
||||
raise MCPException(f"Error listing nodes: {e}")
|
||||
|
||||
@mcp_server.tool(
|
||||
name="get_cluster_resources",
|
||||
description="Returns a summary of all resources (VMs, LXC containers, storage, etc.) across the cluster.",
|
||||
parameters={} # No parameters needed
|
||||
)
|
||||
def get_cluster_resources():
|
||||
"""Gets a summary of all cluster resources."""
|
||||
try:
|
||||
resources = proxmox.cluster.resources.get()
|
||||
return {"resources": resources}
|
||||
except ProxmoxAPIException as e:
|
||||
raise MCPException(f"Proxmox API Error: {e}")
|
||||
except Exception as e:
|
||||
raise MCPException(f"Error getting cluster resources: {e}")
|
||||
|
||||
# --- Raw API Access Tool ---
|
||||
@mcp_server.tool(
|
||||
name="proxmox_api_call",
|
||||
description="Executes a raw API call to Proxmox VE. Use carefully!",
|
||||
parameters={
|
||||
"path": {"type": "string", "description": "The API path (e.g., 'nodes/pve1/qemu/100/status/start')."},
|
||||
"method": {"type": "string", "enum": ["GET", "POST", "PUT", "DELETE"], "description": "HTTP Method."},
|
||||
"data": {"type": "object", "description": "Optional JSON payload for POST/PUT methods.", "default": {}},
|
||||
"node": {"type": "string", "description": "Optional node name if path is relative to a node.", "default": None},
|
||||
"vmid": {"type": "integer", "description": "Optional VM ID if path is relative to a VM.", "default": None},
|
||||
"lxcid": {"type": "integer", "description": "Optional LXC ID if path is relative to an LXC container.", "default": None},
|
||||
},
|
||||
)
|
||||
def proxmox_api_call(path: str, method: str, data: dict = {}, node: str = None, vmid: int = None, lxcid: int = None):
|
||||
"""Executes a raw Proxmox API call."""
|
||||
try:
|
||||
# Dynamically build the ProxmoxAPI path
|
||||
api_path = proxmox
|
||||
path_segments = path.strip('/').split('/')
|
||||
|
||||
for segment in path_segments:
|
||||
if segment: # Skip empty segments
|
||||
if segment == "nodes" and node:
|
||||
api_path = api_path.nodes(node)
|
||||
elif segment == "qemu" and vmid:
|
||||
api_path = api_path.qemu(vmid)
|
||||
elif segment == "lxc" and lxcid:
|
||||
api_path = api_path.lxc(lxcid)
|
||||
else:
|
||||
api_path = api_path(segment)
|
||||
|
||||
method_func = getattr(api_path, method.lower(), None)
|
||||
if not method_func:
|
||||
raise MCPException(f"Unsupported method '{method}' for path segment '{path}'")
|
||||
|
||||
if method.upper() in ["POST", "PUT"]:
|
||||
result = method_func(**data)
|
||||
else:
|
||||
result = method_func()
|
||||
|
||||
return {"result": result}
|
||||
except ProxmoxAPIException as e:
|
||||
raise MCPException(f"Proxmox API Error: {e}")
|
||||
except Exception as e:
|
||||
raise MCPException(f"Error executing raw API call: {e}")
|
||||
|
||||
|
||||
# --- FastAPI Integration ---
|
||||
app_builder = FastAPIAppBuilder(mcp_server)
|
||||
app = app_builder.build_app()
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
return {"message": "Proxmox MCP Server is running. Access /sse for events."}
|
||||
|
||||
@app.get("/sse")
|
||||
async def sse(request: Request):
|
||||
"""Server-Sent Events endpoint for MCP communication."""
|
||||
logger.info("New SSE connection from client.")
|
||||
return StreamingResponse(
|
||||
SSEServerTransport(mcp_server).get_response_generator(request),
|
||||
media_type="text/event-stream"
|
||||
)
|
||||
|
||||
@app.post("/messages")
|
||||
async def messages(request: Request):
|
||||
"""Endpoint for MCP client messages."""
|
||||
logger.info("New POST /messages from client.")
|
||||
return await SSEServerTransport(mcp_server).handle_request(request)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user