"""MCP server exposing the Komodo Core API as FastMCP tools.

The module builds an ASGI app that serves a standalone ``/health`` route and
mounts the FastMCP HTTP app for everything else. Configuration is read from
the environment (optionally via a ``.env`` file) at import time.
"""

import functools
import json
import logging
import os
from typing import Any

import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route

# Load environment variables
load_dotenv()

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration ---
KOMODO_URL = os.getenv("KOMODO_URL", "").rstrip("/")
KOMODO_API_KEY = os.getenv("KOMODO_API_KEY", "")
KOMODO_API_SECRET = os.getenv("KOMODO_API_SECRET", "")

# Endpoints accepted by komodo_api_call; each maps to a KomodoClient method.
_ENDPOINTS = ("read", "write", "execute")


class KomodoClient:
    """HTTP client for Komodo Core API."""

    def __init__(self, url: str, api_key: str, api_secret: str):
        """Store the base URL and auth headers and open an HTTP client.

        Args:
            url: Base URL of the Komodo API (endpoint names are appended).
            api_key: Value sent in the ``X-Api-Key`` header.
            api_secret: Value sent in the ``X-Api-Secret`` header.
        """
        self.url = url
        self.headers = {
            "Content-Type": "application/json",
            "X-Api-Key": api_key,
            "X-Api-Secret": api_secret,
        }
        self._client = httpx.Client(timeout=30.0)

    def _request(
        self, endpoint: str, request_type: str, params: dict | None = None
    ) -> Any:
        """Make a request to Komodo API.

        Failures are never raised to the caller: they are logged and
        returned as ``{"error": "..."}`` so tools can serialize them.

        Args:
            endpoint: API endpoint (read, write, execute)
            request_type: Request type name (e.g., 'ListServers', 'GetDeployment')
            params: Request parameters; ``None`` or empty is sent as ``{}``.

        Returns:
            The decoded JSON response body, or an ``{"error": ...}`` dict.
        """
        url = f"{self.url}/{endpoint}"
        body = {"type": request_type, "params": params or {}}
        try:
            response = self._client.post(url, json=body, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.warning(
                "Komodo %s/%s returned HTTP %s",
                endpoint,
                request_type,
                e.response.status_code,
            )
            return {"error": f"HTTP {e.response.status_code}: {e.response.text}"}
        except Exception as e:
            # Boundary handler: transport errors, invalid JSON bodies, etc.
            # are reported to the MCP caller instead of crashing the tool.
            logger.exception("Komodo %s/%s request failed", endpoint, request_type)
            return {"error": str(e)}

    def read(self, request_type: str, params: dict | None = None) -> Any:
        """Make a read request."""
        return self._request("read", request_type, params)

    def write(self, request_type: str, params: dict | None = None) -> Any:
        """Make a write request."""
        return self._request("write", request_type, params)

    def execute(self, request_type: str, params: dict | None = None) -> Any:
        """Make an execute request."""
        return self._request("execute", request_type, params)


# --- Initialize Client ---
def _config_error() -> dict | None:
    """Return an error dict describing missing configuration, else None."""
    if not KOMODO_URL:
        return {"error": "KOMODO_URL not configured"}
    if not KOMODO_API_KEY or not KOMODO_API_SECRET:
        return {"error": "KOMODO_API_KEY and KOMODO_API_SECRET required"}
    return None


@functools.lru_cache(maxsize=1)
def _cached_client(url: str, api_key: str, api_secret: str) -> KomodoClient:
    """Build the client once per configuration so its connections are reused.

    Keyed on the configuration values so a changed setting yields a new
    client. The cached client lives for the rest of the process.
    """
    return KomodoClient(url, api_key, api_secret)


def _get_client() -> tuple[KomodoClient | None, dict | None]:
    """Get Komodo client or return error dict.

    Returns:
        ``(client, None)`` when configured, otherwise ``(None, error_dict)``.
        The client is shared between calls rather than rebuilt each time.
    """
    error = _config_error()
    if error is not None:
        return None, error
    return _cached_client(KOMODO_URL, KOMODO_API_KEY, KOMODO_API_SECRET), None


def _read_as_json(request_type: str, params: dict | None = None) -> str:
    """Run a ``read`` request and return the result (or error) as JSON text."""
    client, error = _get_client()
    if error:
        return json.dumps(error)
    return json.dumps(client.read(request_type, params))


# --- MCP Server ---
# Note: Host validation is handled at the reverse proxy/ingress level
# The /health endpoint bypasses this for Docker health checks
mcp = FastMCP("Komodo MCP")

# --- API Documentation Resource ---
API_DOCS = """# Komodo Core API Reference

## Endpoints

All requests use POST method with JSON body: `{"type": "RequestType", "params": {...}}`

### /read - Get Data
| Request Type | Description | Params |
|-------------|-------------|--------|
| `ListServers` | List all servers | `{}` |
| `GetServer` | Get server details | `{"server": "name_or_id"}` |
| `ListDeployments` | List deployments | `{}` |
| `GetDeployment` | Get deployment details | `{"deployment": "name_or_id"}` |
| `GetDeploymentContainer` | Get container info | `{"deployment": "name_or_id"}` |
| `GetDeploymentLog` | Get container logs | `{"deployment": "name_or_id", "tail": 100}` |
| `ListStacks` | List stacks | `{}` |
| `GetStack` | Get stack details | `{"stack": "name_or_id"}` |
| `GetDockerContainersSummary` | Container summary for server | `{"server": "name_or_id"}` |
| `ListBuilds` | List builds | `{}` |
| `ListRepos` | List repositories | `{}` |
| `ListProcedures` | List procedures | `{}` |
| `GetCoreInfo` | Get Komodo Core info | `{}` |

### /execute - Run Actions
| Request Type | Description | Params |
|-------------|-------------|--------|
| `Deploy` | Deploy a deployment | `{"deployment": "name_or_id"}` |
| `StartContainer` | Start container | `{"deployment": "name_or_id"}` |
| `StopContainer` | Stop container | `{"deployment": "name_or_id"}` |
| `PauseContainer` | Pause container | `{"deployment": "name_or_id"}` |
| `RestartContainer` | Restart container | `{"deployment": "name_or_id"}` |
| `DeployStack` | Deploy stack | `{"stack": "name_or_id"}` |
| `StartStack` | Start stack | `{"stack": "name_or_id"}` |
| `StopStack` | Stop stack | `{"stack": "name_or_id"}` |
| `RestartStack` | Restart stack | `{"stack": "name_or_id"}` |
| `RunBuild` | Run a build | `{"build": "name_or_id"}` |
| `RunProcedure` | Run a procedure | `{"procedure": "name_or_id"}` |

### /write - Modify Resources
| Request Type | Description | Params |
|-------------|-------------|--------|
| `CreateDeployment` | Create deployment | `{"name": "...", "config": {...}}` |
| `UpdateDeployment` | Update deployment | `{"id": "...", "config": {...}}` |
| `DeleteDeployment` | Delete deployment | `{"id": "..."}` |
| `CreateStack` | Create stack | `{"name": "...", "config": {...}}` |
| `UpdateStack` | Update stack | `{"id": "...", "config": {...}}` |
| `DeleteStack` | Delete stack | `{"id": "..."}` |

## Full Documentation
https://docs.rs/komodo_client/latest/komodo_client/api/index.html
"""


@mcp.resource("komodo://api-reference")
def get_api_reference() -> str:
    """Komodo API reference documentation."""
    return API_DOCS


# --- MCP Tools ---
# Note: Return str (JSON) instead of dict to work around FastMCP/Gemini CLI
# schema compatibility issue with additionalProperties
@mcp.tool()
def list_servers() -> str:
    """Lists all servers (periphery nodes) connected to Komodo.

    Returns server names, IDs, status, and system information as JSON.
    """
    return _read_as_json("ListServers")


@mcp.tool()
def list_deployments() -> str:
    """Lists all deployments (single container applications).

    Deployments are Docker containers managed by Komodo. Returns JSON.
    """
    return _read_as_json("ListDeployments")


@mcp.tool()
def list_stacks() -> str:
    """Lists all stacks (docker-compose applications).

    Stacks are multi-container applications defined by docker-compose files.
    Returns JSON.
    """
    return _read_as_json("ListStacks")


@mcp.tool()
def get_container_status(deployment: str) -> str:
    """Gets the status of a specific container.

    Args:
        deployment: Deployment name or ID

    Returns container state, image, ports, and resource usage as JSON.
    """
    return _read_as_json("GetDeploymentContainer", {"deployment": deployment})


@mcp.tool()
def komodo_api_call(
    endpoint: str,
    request_type: str,
    params: str = "{}"
) -> str:
    """Execute a raw Komodo API call.

    Use the komodo://api-reference resource for available request types.

    Args:
        endpoint: API endpoint - 'read', 'write', or 'execute'
        request_type: Request type name (e.g., 'GetServer', 'Deploy', 'ListBuilds')
        params: Request parameters as a JSON string (default: "{}")

    Examples:
        - Get server: endpoint='read', request_type='GetServer', params='{"server": "my-server"}'
        - Deploy: endpoint='execute', request_type='Deploy', params='{"deployment": "my-app"}'
        - List builds: endpoint='read', request_type='ListBuilds', params='{}'
    """
    client, error = _get_client()
    if error:
        return json.dumps(error)

    if endpoint not in _ENDPOINTS:
        return json.dumps({"error": f"Invalid endpoint '{endpoint}'. Use 'read', 'write', or 'execute'."})

    try:
        parsed_params = json.loads(params) if params else {}
    except json.JSONDecodeError as e:
        return json.dumps({"error": f"Invalid JSON in params: {e}"})

    # JSON null keeps its previous meaning of "no parameters".
    if parsed_params is None:
        parsed_params = {}
    # Arrays and scalars are valid JSON but cannot be request parameters;
    # reject them here instead of forwarding a malformed body.
    if not isinstance(parsed_params, dict):
        return json.dumps({"error": "Invalid params: expected a JSON object"})

    # Safe dynamic dispatch: endpoint was validated against _ENDPOINTS above.
    method = getattr(client, endpoint)
    return json.dumps(method(request_type, parsed_params))


# --- Health Check Endpoint ---
async def health(request) -> JSONResponse:
    """Health check endpoint for Docker.

    Reports 503 "degraded" when required configuration is missing and
    200 "ok" otherwise. Only configuration is inspected; no client is
    constructed and Komodo is not contacted.
    """
    error = _config_error()
    if error:
        return JSONResponse({"status": "degraded", "error": error["error"]}, status_code=503)
    return JSONResponse({"status": "ok"})


# --- ASGI Application ---
def create_app() -> Starlette:
    """Create the ASGI application with health check and MCP routes."""
    mcp_app = mcp.http_app()

    # Wrapper app: /health is standalone, everything else goes to MCP
    # IMPORTANT: Must pass mcp_app.lifespan for task group initialization
    routes = [
        Route("/health", health, methods=["GET"]),
        Mount("/", app=mcp_app),  # MCP handles /mcp endpoint
    ]
    return Starlette(routes=routes, lifespan=mcp_app.lifespan)


# Create the app instance for uvicorn
app = create_app()

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)