feat: Add proxy host creation and access list tools (#1)

Add MCP tools for creating proxy hosts and listing access lists, with configurable defaults via environment variable.

New tools:
- create_proxy_host: Create new proxy hosts with SSL, access control, and websocket support
- list_access_lists: List available access lists for use in proxy host creation

New features:
- NPM_PROXY_DEFAULTS env var for configurable default values (certificate_id, access_list_id, ssl settings, etc.)
This commit is contained in:
b3nw
2025-12-24 15:20:45 -06:00
committed by GitHub
parent 37ad76f012
commit 52eb484432
9 changed files with 491 additions and 12 deletions
+70 -6
View File
@@ -13,6 +13,7 @@ from .exceptions import (
NpmNotFoundError,
)
from .models import (
AccessList,
AuditLogEntry,
Certificate,
HealthStatus,
@@ -92,9 +93,7 @@ class NpmClient:
raise NpmAuthenticationError("Invalid credentials")
if response.status_code != 200:
raise NpmApiError(
f"Login failed: {response.text}", status_code=response.status_code
)
raise NpmApiError(f"Login failed: {response.text}", status_code=response.status_code)
data = response.json()
token_response = TokenResponse(**data)
@@ -172,9 +171,7 @@ class NpmClient:
raise NpmNotFoundError(f"Resource not found: {endpoint}")
if response.status_code >= 400:
raise NpmApiError(
f"API error: {response.text}", status_code=response.status_code
)
raise NpmApiError(f"API error: {response.text}", status_code=response.status_code)
return response
@@ -259,3 +256,70 @@ class NpmClient:
)
data = response.json()
return [AuditLogEntry(**entry) for entry in data]
async def get_access_lists(self) -> list[AccessList]:
"""Get all access lists."""
response = await self._request("GET", "/nginx/access-lists")
data = response.json()
return [AccessList(**item) for item in data]
async def create_proxy_host(
self,
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str = "http",
certificate_id: int | None = None,
ssl_forced: bool = True,
hsts_enabled: bool = True,
hsts_subdomains: bool = False,
http2_support: bool = True,
block_exploits: bool = True,
caching_enabled: bool = False,
allow_websocket_upgrade: bool = True,
access_list_id: int = 0,
advanced_config: str = "",
meta: dict | None = None,
) -> ProxyHost:
"""Create a new proxy host.
Args:
domain_names: List of domain names for this host
forward_host: Backend host to forward to
forward_port: Backend port to forward to
forward_scheme: http or https
certificate_id: SSL certificate ID (0 for none, use list_certificates to find)
ssl_forced: Force SSL/HTTPS
hsts_enabled: Enable HSTS
hsts_subdomains: Include subdomains in HSTS
http2_support: Enable HTTP/2
block_exploits: Enable exploit blocking
caching_enabled: Enable caching
allow_websocket_upgrade: Allow WebSocket upgrades
access_list_id: Access list ID (0 for none, use list_access_lists to find)
advanced_config: Custom nginx configuration
meta: Additional metadata
Returns:
Created ProxyHost object
"""
payload = {
"domain_names": domain_names,
"forward_host": forward_host,
"forward_port": forward_port,
"forward_scheme": forward_scheme,
"certificate_id": certificate_id or 0,
"ssl_forced": ssl_forced,
"hsts_enabled": hsts_enabled,
"hsts_subdomains": hsts_subdomains,
"http2_support": http2_support,
"block_exploits": block_exploits,
"caching_enabled": caching_enabled,
"allow_websocket_upgrade": allow_websocket_upgrade,
"access_list_id": access_list_id,
"advanced_config": advanced_config,
"meta": meta or {},
}
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
return ProxyHost(**response.json())
+44
View File
@@ -1,7 +1,26 @@
"""Configuration management using pydantic-settings."""
from typing import Any
from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# Default values for proxy host creation
DEFAULT_PROXY_SETTINGS: dict[str, Any] = {
"forward_scheme": "http",
"certificate_id": 0,
"ssl_forced": True,
"hsts_enabled": True,
"hsts_subdomains": False,
"http2_support": True,
"caching_enabled": False,
"block_exploits": True,
"allow_websocket_upgrade": True,
"access_list_id": 0,
"advanced_config": "",
"meta": {},
}
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
@@ -23,5 +42,30 @@ class Settings(BaseSettings):
mcp_port: int = 8000
mcp_transport: str = "stdio" # "stdio" or "http"
# Proxy host creation defaults (JSON string)
# Example: '{"certificate_id": 24, "ssl_forced": true}'
proxy_defaults: dict[str, Any] = {}
@field_validator("proxy_defaults", mode="before")
@classmethod
def parse_proxy_defaults(cls, v: Any) -> dict[str, Any]:
"""Parse JSON string to dict, or pass through if already dict."""
if isinstance(v, dict):
return v
if isinstance(v, str) and v.strip():
import json
try:
return json.loads(v)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in NPM_PROXY_DEFAULTS: {e}") from e
return {}
def get_proxy_defaults(self) -> dict[str, Any]:
"""Get merged proxy defaults (base defaults + user overrides)."""
merged = DEFAULT_PROXY_SETTINGS.copy()
merged.update(self.proxy_defaults)
return merged
settings = Settings()
+13 -1
View File
@@ -34,6 +34,18 @@ class Owner(BaseModel):
roles: list[str]
class AccessList(BaseModel):
"""Access list for authentication/IP restrictions."""
id: int
created_on: datetime
modified_on: datetime
owner_user_id: int = 0
name: str
satisfy_any: bool = False
pass_auth: bool = False
class Certificate(BaseModel):
"""SSL Certificate information."""
@@ -81,7 +93,7 @@ class ProxyHost(BaseModel):
advanced_config: str = ""
enabled: bool = True
meta: dict[str, Any] = Field(default_factory=dict)
locations: list[ProxyHostLocation] = Field(default_factory=list)
locations: list[ProxyHostLocation] | None = None
# Optional expanded relations
owner: Owner | None = None
certificate: Certificate | None = None
+118 -3
View File
@@ -179,7 +179,7 @@ async def get_system_health() -> str:
try:
await client._ensure_authenticated()
result.append("Authenticated: ✅")
# Try to get settings (admin only)
try:
settings_list = await client.get_settings()
@@ -253,11 +253,126 @@ async def list_certificates() -> str:
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
result.append(
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n"
f" Domains: {domains}{expiry}"
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n Domains: {domains}{expiry}"
)
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def list_access_lists() -> str:
"""List all access lists configured in Nginx Proxy Manager.
Returns a summary of all access lists including their IDs and names.
Use these IDs when creating proxy hosts that require access control.
"""
try:
client = get_client()
access_lists = await client.get_access_lists()
if not access_lists:
return "No access lists configured."
result = []
for al in access_lists:
result.append(f"[{al.id}] {al.name}")
return f"Found {len(access_lists)} access list(s):\n\n" + "\n".join(result)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_proxy_host(
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
) -> str:
"""Create a new proxy host in Nginx Proxy Manager.
Args:
domain_names: List of domain names (e.g., ["app.ext.ben.io"])
forward_host: Backend host/IP to forward to (e.g., "192.168.1.100" or "container-name")
forward_port: Backend port to forward to (e.g., 8080)
forward_scheme: Backend protocol - "http" or "https" (default from config)
certificate_id: SSL certificate ID. Use list_certificates to find available certs.
Use 0 for no SSL, or the ID of a wildcard cert. (default from config)
ssl_forced: Force HTTPS redirect (default from config)
block_exploits: Enable common exploit blocking (default from config)
allow_websocket_upgrade: Allow WebSocket connections (default from config)
access_list_id: Access list ID for authentication. Use list_access_lists to find.
Use 0 for no access restrictions. (default from config)
advanced_config: Custom nginx configuration block (default from config)
Returns:
Details of the created proxy host including the new host ID.
Note:
Default values can be configured via NPM_PROXY_DEFAULTS environment variable.
Example: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
Example:
create_proxy_host(
domain_names=["myapp.ext.ben.io"],
forward_host="10.0.0.50",
forward_port=3000,
certificate_id=24, # *.ext.ben.io wildcard
)
"""
try:
# Get defaults from config, then override with provided values
defaults = settings.get_proxy_defaults()
client = get_client()
host = await client.create_proxy_host(
domain_names=domain_names,
forward_host=forward_host,
forward_port=forward_port,
forward_scheme=forward_scheme
if forward_scheme is not None
else defaults["forward_scheme"],
certificate_id=certificate_id
if certificate_id is not None
else defaults["certificate_id"],
ssl_forced=ssl_forced if ssl_forced is not None else defaults["ssl_forced"],
hsts_enabled=defaults.get("hsts_enabled", True),
hsts_subdomains=defaults.get("hsts_subdomains", False),
http2_support=defaults.get("http2_support", True),
block_exploits=block_exploits
if block_exploits is not None
else defaults["block_exploits"],
caching_enabled=defaults.get("caching_enabled", False),
allow_websocket_upgrade=allow_websocket_upgrade
if allow_websocket_upgrade is not None
else defaults["allow_websocket_upgrade"],
access_list_id=access_list_id
if access_list_id is not None
else defaults["access_list_id"],
advanced_config=advanced_config
if advanced_config is not None
else defaults["advanced_config"],
meta=defaults.get("meta", {}),
)
domains = ", ".join(host.domain_names)
return (
f"Successfully created proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}"
)
except Exception as e:
return _format_error(e)