mirror of
https://github.com/b3nw/nginx-proxy-manager-mcp.git
synced 2026-06-15 09:09:41 -05:00
feat: Initial NPM MCP server implementation
- NpmClient with JWT auth and auto-refresh - FastMCP server with 5 tools (list_proxy_hosts, get_proxy_host_details, get_system_health, search_audit_logs, list_certificates) - Docker support with multi-stage build using uv - Supports stdio and streamable HTTP transports - GitHub Actions for tests and Docker builds
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
"""NPM MCP Server package."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
@@ -0,0 +1,261 @@
|
||||
"""Async HTTP client for Nginx Proxy Manager API."""
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import httpx
|
||||
|
||||
from .config import settings
|
||||
from .exceptions import (
|
||||
NpmApiError,
|
||||
NpmAuthenticationError,
|
||||
NpmConnectionError,
|
||||
NpmNotFoundError,
|
||||
)
|
||||
from .models import (
|
||||
AuditLogEntry,
|
||||
Certificate,
|
||||
HealthStatus,
|
||||
ProxyHost,
|
||||
Setting,
|
||||
TokenResponse,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NpmClient:
|
||||
"""Async client for interacting with the NPM API."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str | None = None,
|
||||
identity: str | None = None,
|
||||
secret: str | None = None,
|
||||
timeout: float = 30.0,
|
||||
):
|
||||
"""Initialize the NPM client.
|
||||
|
||||
Args:
|
||||
base_url: NPM API base URL (defaults to settings)
|
||||
identity: NPM user email (defaults to settings)
|
||||
secret: NPM user password (defaults to settings)
|
||||
timeout: Request timeout in seconds
|
||||
"""
|
||||
self.base_url = (base_url or settings.api_url).rstrip("/")
|
||||
self._identity = identity or settings.identity
|
||||
self._secret = secret or settings.secret
|
||||
self._token: str | None = None
|
||||
self._token_expires: datetime | None = None
|
||||
self._client = httpx.AsyncClient(timeout=timeout)
|
||||
|
||||
async def __aenter__(self) -> "NpmClient":
|
||||
"""Async context manager entry."""
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *args) -> None:
|
||||
"""Async context manager exit."""
|
||||
await self.close()
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the HTTP client."""
|
||||
await self._client.aclose()
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Authentication
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def login(self) -> TokenResponse:
|
||||
"""Authenticate with NPM and obtain a JWT token.
|
||||
|
||||
Returns:
|
||||
TokenResponse with token and expiration
|
||||
|
||||
Raises:
|
||||
NpmAuthenticationError: If credentials are invalid
|
||||
NpmConnectionError: If NPM is unreachable
|
||||
"""
|
||||
if not self._identity or not self._secret:
|
||||
raise NpmAuthenticationError("NPM_IDENTITY and NPM_SECRET must be configured")
|
||||
|
||||
try:
|
||||
response = await self._client.post(
|
||||
f"{self.base_url}/tokens",
|
||||
json={"identity": self._identity, "secret": self._secret},
|
||||
)
|
||||
except httpx.ConnectError as e:
|
||||
raise NpmConnectionError(f"Failed to connect to NPM at {self.base_url}: {e}") from e
|
||||
except httpx.TimeoutException as e:
|
||||
raise NpmConnectionError(f"Connection to NPM timed out: {e}") from e
|
||||
|
||||
if response.status_code == 401:
|
||||
raise NpmAuthenticationError("Invalid credentials")
|
||||
|
||||
if response.status_code != 200:
|
||||
raise NpmApiError(
|
||||
f"Login failed: {response.text}", status_code=response.status_code
|
||||
)
|
||||
|
||||
data = response.json()
|
||||
token_response = TokenResponse(**data)
|
||||
|
||||
self._token = token_response.token
|
||||
self._token_expires = token_response.expires
|
||||
|
||||
logger.info("Successfully authenticated with NPM")
|
||||
return token_response
|
||||
|
||||
def _is_token_valid(self) -> bool:
|
||||
"""Check if the current token is still valid (with 1 min buffer)."""
|
||||
if not self._token or not self._token_expires:
|
||||
return False
|
||||
buffer = timedelta(minutes=1)
|
||||
return datetime.now(UTC) < (self._token_expires - buffer)
|
||||
|
||||
async def _ensure_authenticated(self) -> None:
|
||||
"""Ensure we have a valid token, refreshing if needed."""
|
||||
if not self._is_token_valid():
|
||||
await self.login()
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Base Request Handler
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def _request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
**kwargs,
|
||||
) -> httpx.Response:
|
||||
"""Make an authenticated request to the NPM API.
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, PUT, DELETE)
|
||||
endpoint: API endpoint path (e.g., "/proxy-hosts")
|
||||
**kwargs: Additional arguments passed to httpx
|
||||
|
||||
Returns:
|
||||
httpx.Response object
|
||||
|
||||
Raises:
|
||||
NpmAuthenticationError: If authentication fails
|
||||
NpmConnectionError: If NPM is unreachable
|
||||
NpmNotFoundError: If resource not found
|
||||
NpmApiError: For other API errors
|
||||
"""
|
||||
await self._ensure_authenticated()
|
||||
|
||||
headers = kwargs.pop("headers", {})
|
||||
headers["Authorization"] = f"Bearer {self._token}"
|
||||
|
||||
url = f"{self.base_url}{endpoint}"
|
||||
|
||||
try:
|
||||
response = await self._client.request(method, url, headers=headers, **kwargs)
|
||||
except httpx.ConnectError as e:
|
||||
raise NpmConnectionError(f"Failed to connect to NPM: {e}") from e
|
||||
except httpx.TimeoutException as e:
|
||||
raise NpmConnectionError(f"Request to NPM timed out: {e}") from e
|
||||
|
||||
# Handle 401 - try to re-authenticate once
|
||||
if response.status_code == 401:
|
||||
logger.info("Token expired, re-authenticating...")
|
||||
await self.login()
|
||||
headers["Authorization"] = f"Bearer {self._token}"
|
||||
response = await self._client.request(method, url, headers=headers, **kwargs)
|
||||
|
||||
if response.status_code == 401:
|
||||
raise NpmAuthenticationError("Re-authentication failed")
|
||||
|
||||
# Handle other error responses
|
||||
if response.status_code == 404:
|
||||
raise NpmNotFoundError(f"Resource not found: {endpoint}")
|
||||
|
||||
if response.status_code >= 400:
|
||||
raise NpmApiError(
|
||||
f"API error: {response.text}", status_code=response.status_code
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# API Endpoints
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def get_status(self) -> HealthStatus:
|
||||
"""Get NPM health/status information."""
|
||||
# Status endpoint doesn't require auth
|
||||
try:
|
||||
response = await self._client.get(f"{self.base_url.replace('/api', '')}/")
|
||||
return HealthStatus(status="online", version=response.json().get("version"))
|
||||
except Exception:
|
||||
# Fallback - try authenticated endpoint
|
||||
await self._ensure_authenticated()
|
||||
return HealthStatus(status="online")
|
||||
|
||||
async def get_proxy_hosts(self, expand: str = "owner,certificate") -> list[ProxyHost]:
|
||||
"""Get all proxy hosts.
|
||||
|
||||
Args:
|
||||
expand: Comma-separated list of relations to expand
|
||||
|
||||
Returns:
|
||||
List of ProxyHost objects
|
||||
"""
|
||||
response = await self._request("GET", "/nginx/proxy-hosts", params={"expand": expand})
|
||||
data = response.json()
|
||||
return [ProxyHost(**host) for host in data]
|
||||
|
||||
async def get_proxy_host(self, host_id: int, expand: str = "owner,certificate") -> ProxyHost:
|
||||
"""Get a specific proxy host by ID.
|
||||
|
||||
Args:
|
||||
host_id: The proxy host ID
|
||||
expand: Comma-separated list of relations to expand
|
||||
|
||||
Returns:
|
||||
ProxyHost object
|
||||
"""
|
||||
response = await self._request(
|
||||
"GET", f"/nginx/proxy-hosts/{host_id}", params={"expand": expand}
|
||||
)
|
||||
return ProxyHost(**response.json())
|
||||
|
||||
async def get_certificates(self) -> list[Certificate]:
|
||||
"""Get all SSL certificates."""
|
||||
response = await self._request("GET", "/nginx/certificates")
|
||||
data = response.json()
|
||||
return [Certificate(**cert) for cert in data]
|
||||
|
||||
async def get_certificate(self, cert_id: int) -> Certificate:
|
||||
"""Get a specific certificate by ID."""
|
||||
response = await self._request("GET", f"/nginx/certificates/{cert_id}")
|
||||
return Certificate(**response.json())
|
||||
|
||||
async def get_settings(self) -> list[Setting]:
|
||||
"""Get all NPM settings."""
|
||||
response = await self._request("GET", "/settings")
|
||||
data = response.json()
|
||||
return [Setting(**s) for s in data]
|
||||
|
||||
async def get_audit_log(
|
||||
self,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
) -> list[AuditLogEntry]:
|
||||
"""Get audit log entries.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of entries to return
|
||||
offset: Offset for pagination
|
||||
|
||||
Returns:
|
||||
List of AuditLogEntry objects
|
||||
"""
|
||||
response = await self._request(
|
||||
"GET",
|
||||
"/audit-log",
|
||||
params={"limit": limit, "offset": offset},
|
||||
)
|
||||
data = response.json()
|
||||
return [AuditLogEntry(**entry) for entry in data]
|
||||
@@ -0,0 +1,27 @@
|
||||
"""Configuration management using pydantic-settings."""
|
||||
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
"""Application settings loaded from environment variables."""
|
||||
|
||||
model_config = SettingsConfigDict(
|
||||
env_prefix="NPM_",
|
||||
env_file=".env",
|
||||
env_file_encoding="utf-8",
|
||||
extra="ignore",
|
||||
)
|
||||
|
||||
# NPM API Configuration
|
||||
api_url: str = "http://localhost:81/api"
|
||||
identity: str = ""
|
||||
secret: str = ""
|
||||
|
||||
# MCP Server Configuration
|
||||
mcp_host: str = "0.0.0.0"
|
||||
mcp_port: int = 8000
|
||||
mcp_transport: str = "stdio" # "stdio" or "http"
|
||||
|
||||
|
||||
settings = Settings()
|
||||
@@ -0,0 +1,33 @@
|
||||
"""Custom exceptions for NPM API client."""
|
||||
|
||||
|
||||
class NpmClientError(Exception):
|
||||
"""Base exception for NPM client errors."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class NpmAuthenticationError(NpmClientError):
|
||||
"""Raised when authentication fails."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class NpmConnectionError(NpmClientError):
|
||||
"""Raised when connection to NPM fails."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class NpmNotFoundError(NpmClientError):
|
||||
"""Raised when a resource is not found."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class NpmApiError(NpmClientError):
|
||||
"""Raised for general API errors."""
|
||||
|
||||
def __init__(self, message: str, status_code: int | None = None):
|
||||
super().__init__(message)
|
||||
self.status_code = status_code
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Entry point for npm-mcp server."""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from .config import settings
|
||||
|
||||
|
||||
def setup_logging() -> None:
|
||||
"""Configure logging for the server."""
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
stream=sys.stderr,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main entry point for the MCP server."""
|
||||
parser = argparse.ArgumentParser(description="NPM MCP Server")
|
||||
parser.add_argument(
|
||||
"--transport",
|
||||
choices=["stdio", "http"],
|
||||
default=settings.mcp_transport,
|
||||
help="Transport mode (default: from NPM_MCP_TRANSPORT env or 'stdio')",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host",
|
||||
default=settings.mcp_host,
|
||||
help="Host for HTTP transport (default: 0.0.0.0)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--port",
|
||||
type=int,
|
||||
default=settings.mcp_port,
|
||||
help="Port for HTTP transport (default: 8000)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
setup_logging()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Import server here to avoid circular imports
|
||||
from .server import mcp
|
||||
|
||||
if args.transport == "stdio":
|
||||
logger.info("Starting MCP server in stdio mode")
|
||||
mcp.run(transport="stdio")
|
||||
else:
|
||||
logger.info(f"Starting MCP server in HTTP mode on {settings.mcp_host}:{settings.mcp_port}")
|
||||
# Use FastMCP's native streamable-http transport (host/port configured in server.py)
|
||||
mcp.run(transport="streamable-http")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
"""Pydantic models for NPM API responses."""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class TokenResponse(BaseModel):
|
||||
"""Response from the login/token endpoint."""
|
||||
|
||||
token: str
|
||||
expires: datetime
|
||||
|
||||
|
||||
class UserMeta(BaseModel):
|
||||
"""User metadata in responses."""
|
||||
|
||||
nickname: str | None = None
|
||||
avatar: str | None = None
|
||||
|
||||
|
||||
class Owner(BaseModel):
|
||||
"""Proxy host owner information."""
|
||||
|
||||
id: int
|
||||
created_on: datetime
|
||||
modified_on: datetime
|
||||
is_disabled: bool
|
||||
email: str
|
||||
name: str
|
||||
nickname: str
|
||||
avatar: str
|
||||
roles: list[str]
|
||||
|
||||
|
||||
class Certificate(BaseModel):
|
||||
"""SSL Certificate information."""
|
||||
|
||||
id: int
|
||||
created_on: datetime
|
||||
modified_on: datetime
|
||||
owner_user_id: int
|
||||
provider: str
|
||||
nice_name: str
|
||||
domain_names: list[str]
|
||||
expires_on: datetime | None = None
|
||||
meta: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class ProxyHostLocation(BaseModel):
|
||||
"""Custom location configuration for a proxy host."""
|
||||
|
||||
path: str
|
||||
forward_host: str | None = None
|
||||
forward_port: int | None = None
|
||||
forward_scheme: str | None = None
|
||||
advanced_config: str = ""
|
||||
|
||||
|
||||
class ProxyHost(BaseModel):
|
||||
"""Proxy host configuration."""
|
||||
|
||||
id: int
|
||||
created_on: datetime
|
||||
modified_on: datetime
|
||||
owner_user_id: int
|
||||
domain_names: list[str]
|
||||
forward_host: str
|
||||
forward_port: int
|
||||
forward_scheme: str = "http"
|
||||
certificate_id: int | None = None
|
||||
ssl_forced: bool = False
|
||||
hsts_enabled: bool = False
|
||||
hsts_subdomains: bool = False
|
||||
http2_support: bool = False
|
||||
block_exploits: bool = False
|
||||
caching_enabled: bool = False
|
||||
allow_websocket_upgrade: bool = False
|
||||
access_list_id: int = 0
|
||||
advanced_config: str = ""
|
||||
enabled: bool = True
|
||||
meta: dict[str, Any] = Field(default_factory=dict)
|
||||
locations: list[ProxyHostLocation] = Field(default_factory=list)
|
||||
# Optional expanded relations
|
||||
owner: Owner | None = None
|
||||
certificate: Certificate | None = None
|
||||
|
||||
|
||||
class ProxyHostSummary(BaseModel):
|
||||
"""Summarized proxy host for list responses."""
|
||||
|
||||
id: int
|
||||
domain_names: list[str]
|
||||
forward_host: str
|
||||
forward_port: int
|
||||
forward_scheme: str
|
||||
enabled: bool
|
||||
ssl_forced: bool
|
||||
certificate_id: int | None = None
|
||||
|
||||
|
||||
class HealthStatus(BaseModel):
|
||||
"""System health/status response."""
|
||||
|
||||
status: str
|
||||
version: dict[str, str] | None = None
|
||||
|
||||
|
||||
class Setting(BaseModel):
|
||||
"""NPM setting entry."""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
description: str
|
||||
value: Any
|
||||
meta: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class AuditLogEntry(BaseModel):
|
||||
"""Audit log entry."""
|
||||
|
||||
id: int
|
||||
created_on: datetime
|
||||
modified_on: datetime
|
||||
user_id: int
|
||||
object_type: str
|
||||
object_id: int
|
||||
action: str
|
||||
meta: dict[str, Any] = Field(default_factory=dict)
|
||||
@@ -0,0 +1,263 @@
|
||||
"""MCP Server implementation for Nginx Proxy Manager."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
from .client import NpmClient
|
||||
from .config import settings
|
||||
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Create global client instance (lazy initialization)
|
||||
_client: NpmClient | None = None
|
||||
|
||||
|
||||
def get_client() -> NpmClient:
|
||||
"""Get or create the NPM client instance."""
|
||||
global _client
|
||||
if _client is None:
|
||||
_client = NpmClient()
|
||||
return _client
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(server: FastMCP):
|
||||
"""Manage client lifecycle."""
|
||||
global _client
|
||||
_client = NpmClient()
|
||||
logger.info(f"NPM MCP Server starting, connecting to {settings.api_url}")
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if _client:
|
||||
await _client.close()
|
||||
_client = None
|
||||
logger.info("NPM MCP Server stopped")
|
||||
|
||||
|
||||
# Initialize FastMCP server
|
||||
mcp = FastMCP(
|
||||
"npm-mcp",
|
||||
instructions="MCP server for Nginx Proxy Manager - manage reverse proxy configurations",
|
||||
lifespan=lifespan,
|
||||
host=settings.mcp_host,
|
||||
port=settings.mcp_port,
|
||||
)
|
||||
|
||||
|
||||
def _format_error(e: Exception) -> str:
|
||||
"""Format exception for tool response."""
|
||||
if isinstance(e, NpmAuthenticationError):
|
||||
return f"Authentication failed: {e}"
|
||||
elif isinstance(e, NpmConnectionError):
|
||||
return f"Connection error: {e}"
|
||||
elif isinstance(e, NpmApiError):
|
||||
return f"API error: {e}"
|
||||
return f"Error: {e}"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tools
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_proxy_hosts() -> str:
|
||||
"""List all proxy hosts configured in Nginx Proxy Manager.
|
||||
|
||||
Returns a summary of all proxy hosts including their domains,
|
||||
forward destinations, and SSL status.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
hosts = await client.get_proxy_hosts()
|
||||
|
||||
if not hosts:
|
||||
return "No proxy hosts configured."
|
||||
|
||||
result = []
|
||||
for host in hosts:
|
||||
domains = ", ".join(host.domain_names)
|
||||
ssl_status = "🔒 SSL" if host.ssl_forced else "🔓 HTTP"
|
||||
enabled_status = "✅" if host.enabled else "❌"
|
||||
|
||||
result.append(
|
||||
f"{enabled_status} [{host.id}] {domains}\n"
|
||||
f" → {host.forward_scheme}://{host.forward_host}:{host.forward_port} {ssl_status}"
|
||||
)
|
||||
|
||||
return f"Found {len(hosts)} proxy host(s):\n\n" + "\n\n".join(result)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_proxy_host_details(host_id: int) -> str:
|
||||
"""Get detailed configuration for a specific proxy host.
|
||||
|
||||
Args:
|
||||
host_id: The ID of the proxy host to retrieve
|
||||
|
||||
Returns full configuration including SSL settings, locations,
|
||||
and advanced configuration.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
host = await client.get_proxy_host(host_id)
|
||||
|
||||
details: dict[str, Any] = {
|
||||
"id": host.id,
|
||||
"domains": host.domain_names,
|
||||
"forward": {
|
||||
"scheme": host.forward_scheme,
|
||||
"host": host.forward_host,
|
||||
"port": host.forward_port,
|
||||
},
|
||||
"enabled": host.enabled,
|
||||
"ssl": {
|
||||
"forced": host.ssl_forced,
|
||||
"certificate_id": host.certificate_id,
|
||||
"hsts_enabled": host.hsts_enabled,
|
||||
"http2_support": host.http2_support,
|
||||
},
|
||||
"security": {
|
||||
"block_exploits": host.block_exploits,
|
||||
"access_list_id": host.access_list_id,
|
||||
},
|
||||
"performance": {
|
||||
"caching_enabled": host.caching_enabled,
|
||||
"allow_websocket_upgrade": host.allow_websocket_upgrade,
|
||||
},
|
||||
"created_on": host.created_on.isoformat(),
|
||||
"modified_on": host.modified_on.isoformat(),
|
||||
}
|
||||
|
||||
if host.advanced_config:
|
||||
details["advanced_config"] = host.advanced_config
|
||||
|
||||
if host.locations:
|
||||
details["locations"] = [
|
||||
{
|
||||
"path": loc.path,
|
||||
"forward_host": loc.forward_host,
|
||||
"forward_port": loc.forward_port,
|
||||
}
|
||||
for loc in host.locations
|
||||
]
|
||||
|
||||
if host.owner:
|
||||
details["owner"] = host.owner.name
|
||||
|
||||
return json.dumps(details, indent=2)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_system_health() -> str:
|
||||
"""Check the health and status of the Nginx Proxy Manager instance.
|
||||
|
||||
Returns system status, version information, and connectivity status.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
status = await client.get_status()
|
||||
|
||||
result = [f"Status: {status.status}"]
|
||||
|
||||
if status.version:
|
||||
result.append(f"Version: {status.version}")
|
||||
|
||||
# Test authentication by getting proxy hosts (lower permission requirement)
|
||||
try:
|
||||
await client._ensure_authenticated()
|
||||
result.append("Authenticated: ✅")
|
||||
|
||||
# Try to get settings (admin only)
|
||||
try:
|
||||
settings_list = await client.get_settings()
|
||||
result.append(f"Admin access: ✅ ({len(settings_list)} settings)")
|
||||
except NpmApiError:
|
||||
result.append("Admin access: ❌ (limited permissions)")
|
||||
except NpmAuthenticationError:
|
||||
result.append("Authenticated: ❌ (check credentials)")
|
||||
|
||||
return "\n".join(result)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def search_audit_logs(limit: int = 50, offset: int = 0) -> str:
|
||||
"""Search the audit log for recent actions in Nginx Proxy Manager.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of entries to return (default: 50, max: 100)
|
||||
offset: Number of entries to skip for pagination (default: 0)
|
||||
|
||||
Returns recent audit log entries showing user actions and changes.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
limit = min(limit, 100) # Cap at 100
|
||||
entries = await client.get_audit_log(limit=limit, offset=offset)
|
||||
|
||||
if not entries:
|
||||
return "No audit log entries found."
|
||||
|
||||
result = []
|
||||
for entry in entries:
|
||||
timestamp = entry.created_on.strftime("%Y-%m-%d %H:%M:%S")
|
||||
result.append(
|
||||
f"[{timestamp}] User {entry.user_id}: "
|
||||
f"{entry.action} {entry.object_type} #{entry.object_id}"
|
||||
)
|
||||
|
||||
header = f"Audit log entries ({len(entries)} of {limit} requested, offset {offset}):\n"
|
||||
return header + "\n".join(result)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_certificates() -> str:
|
||||
"""List all SSL certificates managed by Nginx Proxy Manager.
|
||||
|
||||
Returns a summary of all certificates including their domains,
|
||||
provider, and expiration dates.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
certs = await client.get_certificates()
|
||||
|
||||
if not certs:
|
||||
return "No certificates configured."
|
||||
|
||||
result = []
|
||||
for cert in certs:
|
||||
domains = ", ".join(cert.domain_names[:3])
|
||||
if len(cert.domain_names) > 3:
|
||||
domains += f" (+{len(cert.domain_names) - 3} more)"
|
||||
|
||||
expiry = ""
|
||||
if cert.expires_on:
|
||||
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
|
||||
|
||||
result.append(
|
||||
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n"
|
||||
f" Domains: {domains}{expiry}"
|
||||
)
|
||||
|
||||
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
Reference in New Issue
Block a user