Files
komodo-mcp-custom/server.py
Ben 9b1b563a35
All checks were successful
Build and Push Komodo MCP Docker Image / build (push) Successful in 8s
fix: accept both str and dict params in komodo_api_call
Resolves issue #1 - allows the params parameter to accept either a JSON string
or a dict, fixing type validation errors when calling the function.
2025-12-26 20:17:00 +00:00

276 lines
9.2 KiB
Python

import os
import json
import logging
from typing import Any
import httpx
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
from dotenv import load_dotenv
# Load environment variables before any of the settings below are read.
load_dotenv()

# Logging: INFO-level basic configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration ---
# Every setting falls back to "" when its variable is unset. Trailing slashes
# are stripped from the base URL so endpoint paths can be joined with one "/".
KOMODO_URL = os.environ.get("KOMODO_URL", "").rstrip("/")
KOMODO_API_KEY = os.environ.get("KOMODO_API_KEY", "")
KOMODO_API_SECRET = os.environ.get("KOMODO_API_SECRET", "")
class KomodoClient:
"""HTTP client for Komodo Core API."""
def __init__(self, url: str, api_key: str, api_secret: str):
self.url = url
self.headers = {
"Content-Type": "application/json",
"X-Api-Key": api_key,
"X-Api-Secret": api_secret,
}
self._client = httpx.Client(timeout=30.0)
def _request(self, endpoint: str, request_type: str, params: dict = None) -> dict:
"""Make a request to Komodo API.
Args:
endpoint: API endpoint (read, write, execute)
request_type: Request type name (e.g., 'ListServers', 'GetDeployment')
params: Request parameters
"""
url = f"{self.url}/{endpoint}"
body = {"type": request_type, "params": params or {}}
try:
response = self._client.post(url, json=body, headers=self.headers)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
return {"error": f"HTTP {e.response.status_code}: {e.response.text}"}
except Exception as e:
return {"error": str(e)}
def read(self, request_type: str, params: dict = None) -> dict:
"""Make a read request."""
return self._request("read", request_type, params)
def write(self, request_type: str, params: dict = None) -> dict:
"""Make a write request."""
return self._request("write", request_type, params)
def execute(self, request_type: str, params: dict = None) -> dict:
"""Make an execute request."""
return self._request("execute", request_type, params)
# --- Initialize Client ---
# Lazily created client shared by every tool call and health probe. Building a
# fresh KomodoClient per call would open a new HTTP client each time and never
# close it.
_shared_client = None


def _get_client() -> tuple[KomodoClient | None, dict | None]:
    """Get Komodo client or return error dict.

    Returns:
        ``(client, None)`` when the configuration is complete, otherwise
        ``(None, {"error": "<reason>"})``. The same client instance is
        returned on every successful call.
    """
    global _shared_client
    if not KOMODO_URL:
        return None, {"error": "KOMODO_URL not configured"}
    if not KOMODO_API_KEY or not KOMODO_API_SECRET:
        return None, {"error": "KOMODO_API_KEY and KOMODO_API_SECRET required"}
    if _shared_client is None:
        # The settings are module constants, so one client stays valid for the
        # life of the process.
        # NOTE(review): tools may run concurrently; this assumes the underlying
        # httpx.Client is safe to share between threads — confirm.
        _shared_client = KomodoClient(KOMODO_URL, KOMODO_API_KEY, KOMODO_API_SECRET)
    return _shared_client, None
# --- MCP Server ---
# Note: Host validation is handled at the reverse proxy/ingress level
# The /health endpoint bypasses this for Docker health checks
# Single FastMCP instance for this module: the resource and tools defined
# below register on it through its decorators, and create_app() serves it.
mcp = FastMCP("Komodo MCP")
# --- API Documentation Resource ---
# Markdown reference text returned verbatim by get_api_reference(). It lists
# request types for the three endpoints (/read, /execute, /write) with example
# params. This is runtime text, not a comment: edits change what clients read.
API_DOCS = """# Komodo Core API Reference
## Endpoints
All requests use POST method with JSON body: `{"type": "RequestType", "params": {...}}`
### /read - Get Data
| Request Type | Description | Params |
|-------------|-------------|--------|
| `ListServers` | List all servers | `{}` |
| `GetServer` | Get server details | `{"server": "name_or_id"}` |
| `ListDeployments` | List deployments | `{}` |
| `GetDeployment` | Get deployment details | `{"deployment": "name_or_id"}` |
| `GetDeploymentContainer` | Get container info | `{"deployment": "name_or_id"}` |
| `GetDeploymentLog` | Get container logs | `{"deployment": "name_or_id", "tail": 100}` |
| `ListStacks` | List stacks | `{}` |
| `GetStack` | Get stack details | `{"stack": "name_or_id"}` |
| `GetDockerContainersSummary` | Container summary for server | `{"server": "name_or_id"}` |
| `ListBuilds` | List builds | `{}` |
| `ListRepos` | List repositories | `{}` |
| `ListProcedures` | List procedures | `{}` |
| `GetCoreInfo` | Get Komodo Core info | `{}` |
### /execute - Run Actions
| Request Type | Description | Params |
|-------------|-------------|--------|
| `Deploy` | Deploy a deployment | `{"deployment": "name_or_id"}` |
| `StartContainer` | Start container | `{"deployment": "name_or_id"}` |
| `StopContainer` | Stop container | `{"deployment": "name_or_id"}` |
| `PauseContainer` | Pause container | `{"deployment": "name_or_id"}` |
| `RestartContainer` | Restart container | `{"deployment": "name_or_id"}` |
| `DeployStack` | Deploy stack | `{"stack": "name_or_id"}` |
| `StartStack` | Start stack | `{"stack": "name_or_id"}` |
| `StopStack` | Stop stack | `{"stack": "name_or_id"}` |
| `RestartStack` | Restart stack | `{"stack": "name_or_id"}` |
| `RunBuild` | Run a build | `{"build": "name_or_id"}` |
| `RunProcedure` | Run a procedure | `{"procedure": "name_or_id"}` |
### /write - Modify Resources
| Request Type | Description | Params |
|-------------|-------------|--------|
| `CreateDeployment` | Create deployment | `{"name": "...", "config": {...}}` |
| `UpdateDeployment` | Update deployment | `{"id": "...", "config": {...}}` |
| `DeleteDeployment` | Delete deployment | `{"id": "..."}` |
| `CreateStack` | Create stack | `{"name": "...", "config": {...}}` |
| `UpdateStack` | Update stack | `{"id": "...", "config": {...}}` |
| `DeleteStack` | Delete stack | `{"id": "..."}` |
## Full Documentation
https://docs.rs/komodo_client/latest/komodo_client/api/index.html
"""
@mcp.resource("komodo://api-reference")
def get_api_reference() -> str:
    """Komodo API reference documentation."""
    # NOTE(review): the docstring above is presumably surfaced by FastMCP as
    # the resource description — treat it as user-facing text; confirm.
    reference_markdown = API_DOCS
    return reference_markdown
# --- MCP Tools ---
# Note: Return str (JSON) instead of dict to work around FastMCP/Gemini CLI
# schema compatibility issue with additionalProperties
@mcp.tool()
def list_servers() -> str:
    """Lists all servers (periphery nodes) connected to Komodo.
    Returns server names, IDs, status, and system information as JSON.
    """
    # Serialise either the configuration error or the API response.
    komodo, config_error = _get_client()
    if config_error:
        payload = config_error
    else:
        payload = komodo.read("ListServers")
    return json.dumps(payload)
@mcp.tool()
def list_deployments() -> str:
    """Lists all deployments (single container applications).
    Deployments are Docker containers managed by Komodo. Returns JSON.
    """
    # Serialise either the configuration error or the API response.
    komodo, config_error = _get_client()
    if config_error:
        payload = config_error
    else:
        payload = komodo.read("ListDeployments")
    return json.dumps(payload)
@mcp.tool()
def list_stacks() -> str:
    """Lists all stacks (docker-compose applications).
    Stacks are multi-container applications defined by docker-compose files. Returns JSON.
    """
    # Serialise either the configuration error or the API response.
    komodo, config_error = _get_client()
    if config_error:
        payload = config_error
    else:
        payload = komodo.read("ListStacks")
    return json.dumps(payload)
@mcp.tool()
def get_container_status(deployment: str) -> str:
    """Gets the status of a specific container.
    Args:
        deployment: Deployment name or ID
    Returns container state, image, ports, and resource usage as JSON.
    """
    komodo, config_error = _get_client()
    if config_error:
        return json.dumps(config_error)
    # The deployment identifier is passed through to Komodo unchanged.
    query = {"deployment": deployment}
    container_info = komodo.read("GetDeploymentContainer", query)
    return json.dumps(container_info)
@mcp.tool()
def komodo_api_call(
    endpoint: str, request_type: str, params: str | dict[str, Any] = "{}"
) -> str:
    """Execute a raw Komodo API call.
    Use the komodo://api-reference resource for available request types.
    Args:
        endpoint: API endpoint - 'read', 'write', or 'execute'
        request_type: Request type name (e.g., 'GetServer', 'Deploy', 'ListBuilds')
        params: Request parameters as a JSON string or dict (default: "{}")
    Examples:
        - Get server: endpoint='read', request_type='GetServer', params='{"server": "my-server"}'
        - Deploy: endpoint='execute', request_type='Deploy', params='{"deployment": "my-app"}'
        - List builds: endpoint='read', request_type='ListBuilds', params='{}'
    """
    # Always returns a JSON string: the API response, or {"error": ...} for a
    # configuration problem, an unknown endpoint, or unusable params.
    client, error = _get_client()
    if error:
        return json.dumps(error)
    if endpoint not in ("read", "write", "execute"):
        return json.dumps(
            {
                "error": f"Invalid endpoint '{endpoint}'. Use 'read', 'write', or 'execute'."
            }
        )
    if isinstance(params, str):
        # An empty or whitespace-only string means "no parameters".
        try:
            parsed_params = json.loads(params) if params.strip() else {}
        except json.JSONDecodeError as e:
            return json.dumps({"error": f"Invalid JSON in params: {e}"})
    else:
        parsed_params = params
    if parsed_params is None:
        # Covers both a missing value and the JSON literal "null".
        parsed_params = {}
    if not isinstance(parsed_params, dict):
        # The request body is {"type": ..., "params": {...}}, so a list, number
        # or string can never be valid params. Reject it here instead of
        # forwarding it (or silently swapping a falsy value for {}).
        return json.dumps(
            {
                "error": (
                    "params must be a JSON object, "
                    f"got {type(parsed_params).__name__}"
                )
            }
        )
    # endpoint was validated above, so this resolves to read/write/execute.
    method = getattr(client, endpoint)
    return json.dumps(method(request_type, parsed_params))
# --- Health Check Endpoint ---
async def health(request):
    """Report service health for Docker.

    Responds 200 ``{"status": "ok"}`` when the Komodo configuration is
    complete, otherwise 503 with ``"status": "degraded"`` and the
    configuration error message. The incoming request is not inspected.
    """
    _, config_error = _get_client()
    if not config_error:
        return JSONResponse({"status": "ok"})
    degraded = {"status": "degraded", "error": config_error["error"]}
    return JSONResponse(degraded, status_code=503)
# --- ASGI Application ---
def create_app():
    """Build the ASGI application: a standalone /health route plus the MCP app.

    Returns:
        A Starlette application that answers GET /health itself and forwards
        everything else to the MCP HTTP app mounted at the root.
    """
    mcp_asgi = mcp.http_app()
    # /health is listed first so it is served here; the root mount takes the rest.
    health_route = Route("/health", health, methods=["GET"])
    mcp_mount = Mount("/", app=mcp_asgi)  # MCP handles /mcp endpoint
    # IMPORTANT: the MCP app's lifespan must be passed for task group initialization
    return Starlette(
        routes=[health_route, mcp_mount],
        lifespan=mcp_asgi.lifespan,
    )
# Module-level ASGI application object, created at import time for uvicorn.
app = create_app()

if __name__ == "__main__":
    # Run as a script: serve the app on all interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)