import os import json import logging from pathlib import Path from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from proxmoxer import ProxmoxAPI from dotenv import load_dotenv # Load environment variables load_dotenv() # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --- MCP Transport Security --- MCP_ALLOWED_HOSTS = os.getenv("MCP_ALLOWED_HOSTS", "localhost:*,127.0.0.1:*") # --- Cluster Configuration --- CLUSTERS_CONFIG_PATH = os.getenv("CLUSTERS_CONFIG_PATH", "/app/clusters.json") class ClusterManager: """Manages multiple Proxmox cluster connections.""" def __init__(self, config_path: str): self.clusters: dict[str, ProxmoxAPI] = {} self.config: dict = {} self._load_config(config_path) def _load_config(self, config_path: str) -> None: """Load cluster configuration from JSON file.""" path = Path(config_path) if not path.exists(): logger.warning(f"Clusters config not found at {config_path}") return try: with open(path) as f: self.config = json.load(f) clusters_config = self.config.get("clusters", {}) for name, cfg in clusters_config.items(): self._connect_cluster(name, cfg) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in clusters config: {e}") except Exception as e: logger.error(f"Failed to load clusters config: {e}") def _connect_cluster(self, name: str, cfg: dict) -> None: """Connect to a single Proxmox cluster.""" try: url = cfg.get("url") user = cfg.get("user") token_id = cfg.get("token_id") token_secret = cfg.get("token_secret") verify_ssl = cfg.get("verify_ssl", False) if not all([url, user, token_id, token_secret]): logger.warning(f"Cluster '{name}' missing required fields, skipping") return client = ProxmoxAPI( url, user=user, token_name=token_id, token_value=token_secret, verify_ssl=verify_ssl ) self.clusters[name] = client logger.info(f"Connected to cluster '{name}' ({url})") except Exception as e: logger.error(f"Failed to connect to cluster '{name}': {e}") def get(self, name: str = None) -> ProxmoxAPI | None: """Get a cluster client by name. If name is None and only one cluster, return it.""" if not self.clusters: return None if name: return self.clusters.get(name) # If only one cluster, return it as default if len(self.clusters) == 1: return next(iter(self.clusters.values())) return None def list_names(self) -> list[str]: """Return list of configured cluster names.""" return list(self.clusters.keys()) def require_cluster_param(self) -> bool: """Returns True if cluster parameter is required (multiple clusters configured).""" return len(self.clusters) > 1 # Initialize cluster manager cluster_manager = ClusterManager(CLUSTERS_CONFIG_PATH) # --- FastMCP Server --- transport_security = TransportSecuritySettings( enable_dns_rebinding_protection=True, allowed_hosts=[h.strip() for h in MCP_ALLOWED_HOSTS.split(",")], allowed_origins=[], ) mcp = FastMCP("Proxmox MCP", transport_security=transport_security) def _get_cluster_or_error(cluster: str = None) -> tuple[ProxmoxAPI | None, dict | None]: """Helper to get cluster client or return error dict.""" if not cluster_manager.clusters: return None, {"error": "No clusters configured. Mount clusters.json to /app/clusters.json"} if cluster_manager.require_cluster_param() and not cluster: return None, { "error": "Multiple clusters configured. Specify 'cluster' parameter.", "available_clusters": cluster_manager.list_names() } client = cluster_manager.get(cluster) if not client: return None, { "error": f"Cluster '{cluster}' not found", "available_clusters": cluster_manager.list_names() } return client, None @mcp.tool() def list_clusters() -> dict: """Lists all configured Proxmox clusters.""" names = cluster_manager.list_names() if not names: return {"error": "No clusters configured", "clusters": []} return {"clusters": names} @mcp.tool() def list_nodes(cluster: str = None) -> dict: """Lists all nodes in a Proxmox cluster. Args: cluster: Cluster name (optional if only one cluster configured) """ client, error = _get_cluster_or_error(cluster) if error: return error try: nodes = client.nodes.get() return {"cluster": cluster or cluster_manager.list_names()[0], "nodes": nodes} except Exception as e: return {"error": str(e)} @mcp.tool() def get_cluster_resources(cluster: str = None) -> dict: """Gets a summary of all resources in a Proxmox cluster. Args: cluster: Cluster name (optional if only one cluster configured) """ client, error = _get_cluster_or_error(cluster) if error: return error try: resources = client.cluster.resources.get() return {"cluster": cluster or cluster_manager.list_names()[0], "resources": resources} except Exception as e: return {"error": str(e)} @mcp.tool() def proxmox_api_call( path: str, method: str = "GET", data: dict = None, cluster: str = None, node: str = None, vmid: int = None, lxcid: int = None ) -> dict: """Executes a raw Proxmox API call. Args: path: API path (e.g., 'nodes/pve1/qemu/100/status/start') method: HTTP method (GET, POST, PUT, DELETE) data: Optional JSON payload for POST/PUT cluster: Cluster name (optional if only one cluster configured) node: Node name (for path substitution) vmid: VM ID (for path substitution) lxcid: LXC container ID (for path substitution) """ client, error = _get_cluster_or_error(cluster) if error: return error if data is None: data = {} try: # Build path api_path = client for segment in path.strip('/').split('/'): if segment == "nodes" and node: api_path = api_path.nodes(node) elif segment == "qemu" and vmid: api_path = api_path.qemu(vmid) elif segment == "lxc" and lxcid: api_path = api_path.lxc(lxcid) elif segment: api_path = api_path(segment) # Execute method_func = getattr(api_path, method.lower()) if method.upper() in ["POST", "PUT"]: result = method_func(**data) else: result = method_func() return {"cluster": cluster or cluster_manager.list_names()[0], "result": result} except Exception as e: return {"error": str(e)} if __name__ == "__main__": import uvicorn uvicorn.run(mcp.sse_app, host="0.0.0.0", port=8000)