import os import json import logging from pathlib import Path from fastmcp import FastMCP from starlette.applications import Starlette from starlette.responses import JSONResponse from starlette.routing import Route, Mount from proxmoxer import ProxmoxAPI from dotenv import load_dotenv # Load environment variables load_dotenv() # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --- Cluster Configuration --- CLUSTERS_CONFIG_PATH = os.getenv("CLUSTERS_CONFIG_PATH", "/app/clusters.json") class ClusterManager: """Manages multiple Proxmox cluster connections.""" def __init__(self, config_path: str): self.clusters: dict[str, ProxmoxAPI] = {} self.config: dict = {} self._load_config(config_path) def _load_config(self, config_path: str) -> None: """Load cluster configuration from JSON file.""" path = Path(config_path) if not path.exists(): logger.warning(f"Clusters config not found at {config_path}") return try: with open(path) as f: self.config = json.load(f) clusters_config = self.config.get("clusters", {}) for name, cfg in clusters_config.items(): self._connect_cluster(name, cfg) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in clusters config: {e}") except Exception as e: logger.error(f"Failed to load clusters config: {e}") def _connect_cluster(self, name: str, cfg: dict) -> None: """Connect to a single Proxmox cluster.""" try: url = cfg.get("url") user = cfg.get("user") token_id = cfg.get("token_id") token_secret = cfg.get("token_secret") verify_ssl = cfg.get("verify_ssl", False) if not all([url, user, token_id, token_secret]): logger.warning(f"Cluster '{name}' missing required fields, skipping") return client = ProxmoxAPI( url, user=user, token_name=token_id, token_value=token_secret, verify_ssl=verify_ssl ) self.clusters[name] = client logger.info(f"Connected to cluster '{name}' ({url})") except Exception as e: logger.error(f"Failed to connect to cluster '{name}': {e}") def get(self, name: str = None) -> ProxmoxAPI | None: """Get a cluster client by name. If name is None and only one cluster, return it.""" if not self.clusters: return None if name: return self.clusters.get(name) # If only one cluster, return it as default if len(self.clusters) == 1: return next(iter(self.clusters.values())) return None def list_names(self) -> list[str]: """Return list of configured cluster names.""" return list(self.clusters.keys()) def require_cluster_param(self) -> bool: """Returns True if cluster parameter is required (multiple clusters configured).""" return len(self.clusters) > 1 # Initialize cluster manager cluster_manager = ClusterManager(CLUSTERS_CONFIG_PATH) # --- FastMCP Server --- mcp = FastMCP("Proxmox MCP") def _get_cluster_or_error(cluster: str = None) -> tuple[ProxmoxAPI | None, dict | None]: """Helper to get cluster client or return error dict.""" if not cluster_manager.clusters: return None, {"error": "No clusters configured. Mount clusters.json to /app/clusters.json"} if cluster_manager.require_cluster_param() and not cluster: return None, { "error": "Multiple clusters configured. Specify 'cluster' parameter.", "available_clusters": cluster_manager.list_names() } client = cluster_manager.get(cluster) if not client: return None, { "error": f"Cluster '{cluster}' not found", "available_clusters": cluster_manager.list_names() } return client, None @mcp.tool() def list_clusters() -> dict: """Lists all configured Proxmox clusters.""" names = cluster_manager.list_names() if not names: return {"error": "No clusters configured", "clusters": []} return {"clusters": names} @mcp.tool() def list_nodes(cluster: str = None) -> dict: """Lists all nodes in a Proxmox cluster. Args: cluster: Cluster name (optional if only one cluster configured) """ client, error = _get_cluster_or_error(cluster) if error: return error try: nodes = client.nodes.get() return {"cluster": cluster or cluster_manager.list_names()[0], "nodes": nodes} except Exception as e: return {"error": str(e)} @mcp.tool() def get_cluster_resources(cluster: str = None) -> dict: """Gets a summary of all resources in a Proxmox cluster. Args: cluster: Cluster name (optional if only one cluster configured) """ client, error = _get_cluster_or_error(cluster) if error: return error try: resources = client.cluster.resources.get() return {"cluster": cluster or cluster_manager.list_names()[0], "resources": resources} except Exception as e: return {"error": str(e)} @mcp.tool() def proxmox_api_call( path: str, method: str = "GET", data: dict = None, cluster: str = None, node: str = None, vmid: int = None, lxcid: int = None ) -> dict: """Executes a raw Proxmox API call. Args: path: API path (e.g., 'nodes/pve1/qemu/100/status/start') method: HTTP method (GET, POST, PUT, DELETE) data: Optional JSON payload for POST/PUT cluster: Cluster name (optional if only one cluster configured) node: Node name (for path substitution) vmid: VM ID (for path substitution) lxcid: LXC container ID (for path substitution) """ client, error = _get_cluster_or_error(cluster) if error: return error if data is None: data = {} try: # Build path api_path = client for segment in path.strip('/').split('/'): if segment == "nodes" and node: api_path = api_path.nodes(node) elif segment == "qemu" and vmid: api_path = api_path.qemu(vmid) elif segment == "lxc" and lxcid: api_path = api_path.lxc(lxcid) elif segment: api_path = api_path(segment) # Execute method_func = getattr(api_path, method.lower()) if method.upper() in ["POST", "PUT"]: result = method_func(**data) else: result = method_func() return {"cluster": cluster or cluster_manager.list_names()[0], "result": result} except Exception as e: return {"error": str(e)} # --- Health Check Endpoint --- async def health(request): """Health check endpoint - bypasses transport security.""" # Optionally verify cluster connectivity if not cluster_manager.clusters: return JSONResponse({"status": "degraded", "error": "No clusters configured"}, status_code=503) return JSONResponse({"status": "ok"}) # --- ASGI Application --- def create_app(): """Create the ASGI application with health check and MCP routes.""" mcp_app = mcp.http_app() # Wrapper app: /health is standalone, everything else goes to MCP # IMPORTANT: Must pass mcp_app.lifespan for task group initialization routes = [ Route("/health", health, methods=["GET"]), Mount("/", app=mcp_app), # MCP handles /mcp endpoint ] return Starlette(routes=routes, lifespan=mcp_app.lifespan) # Create the app instance for uvicorn app = create_app() if __name__ == "__main__": import uvicorn # Run the wrapper app (includes /health and /mcp endpoints) uvicorn.run(app, host="0.0.0.0", port=8000)