mirror of
https://github.com/b3nw/nginx-proxy-manager-mcp.git
synced 2026-05-19 23:35:47 -05:00
Fix Pydantic backward compatibility and add update/certificate tools (#3)
- Make Owner and Certificate model fields optional with defaults to fix parsing errors when NPM API returns null/missing nested objects - Add update_proxy_host tool for modifying existing proxy host configs - Add create_certificate tool for provisioning Let's Encrypt SSL certs - Add corresponding client methods with full parameter support
This commit is contained in:
@@ -323,3 +323,72 @@ class NpmClient:
|
||||
|
||||
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
|
||||
return ProxyHost(**response.json())
|
||||
|
||||
async def update_proxy_host(
|
||||
self,
|
||||
host_id: int,
|
||||
**kwargs,
|
||||
) -> ProxyHost:
|
||||
"""Update an existing proxy host.
|
||||
|
||||
Args:
|
||||
host_id: The proxy host ID to update
|
||||
**kwargs: Fields to update (same as create_proxy_host)
|
||||
|
||||
Returns:
|
||||
Updated ProxyHost object
|
||||
"""
|
||||
# Get existing host to merge with updates
|
||||
existing = await self.get_proxy_host(host_id)
|
||||
payload = {
|
||||
"domain_names": existing.domain_names,
|
||||
"forward_host": existing.forward_host,
|
||||
"forward_port": existing.forward_port,
|
||||
"forward_scheme": existing.forward_scheme,
|
||||
"certificate_id": existing.certificate_id or 0,
|
||||
"ssl_forced": existing.ssl_forced,
|
||||
"hsts_enabled": existing.hsts_enabled,
|
||||
"hsts_subdomains": existing.hsts_subdomains,
|
||||
"http2_support": existing.http2_support,
|
||||
"block_exploits": existing.block_exploits,
|
||||
"caching_enabled": existing.caching_enabled,
|
||||
"allow_websocket_upgrade": existing.allow_websocket_upgrade,
|
||||
"access_list_id": existing.access_list_id,
|
||||
"advanced_config": existing.advanced_config,
|
||||
"meta": existing.meta,
|
||||
}
|
||||
payload.update({k: v for k, v in kwargs.items() if v is not None})
|
||||
|
||||
response = await self._request("PUT", f"/nginx/proxy-hosts/{host_id}", json=payload)
|
||||
return ProxyHost(**response.json())
|
||||
|
||||
async def create_certificate(
|
||||
self,
|
||||
domain_names: list[str],
|
||||
email: str,
|
||||
provider: str = "letsencrypt",
|
||||
dns_challenge: bool = False,
|
||||
) -> Certificate:
|
||||
"""Create/provision a new SSL certificate.
|
||||
|
||||
Args:
|
||||
domain_names: List of domain names for the certificate
|
||||
email: Email address for Let's Encrypt notifications
|
||||
provider: Certificate provider (default: "letsencrypt")
|
||||
dns_challenge: Use DNS challenge instead of HTTP (default: False)
|
||||
|
||||
Returns:
|
||||
Created Certificate object
|
||||
"""
|
||||
payload = {
|
||||
"domain_names": domain_names,
|
||||
"meta": {
|
||||
"letsencrypt_email": email,
|
||||
"letsencrypt_agree": True,
|
||||
"dns_challenge": dns_challenge,
|
||||
},
|
||||
"provider": provider,
|
||||
}
|
||||
|
||||
response = await self._request("POST", "/nginx/certificates", json=payload)
|
||||
return Certificate(**response.json())
|
||||
|
||||
@@ -23,15 +23,15 @@ class UserMeta(BaseModel):
|
||||
class Owner(BaseModel):
|
||||
"""Proxy host owner information."""
|
||||
|
||||
id: int
|
||||
created_on: datetime
|
||||
modified_on: datetime
|
||||
is_disabled: bool
|
||||
email: str
|
||||
name: str
|
||||
nickname: str
|
||||
avatar: str
|
||||
roles: list[str]
|
||||
id: int | None = None
|
||||
created_on: datetime | None = None
|
||||
modified_on: datetime | None = None
|
||||
is_disabled: bool = False
|
||||
email: str | None = None
|
||||
name: str = ""
|
||||
nickname: str = ""
|
||||
avatar: str = ""
|
||||
roles: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class AccessList(BaseModel):
|
||||
@@ -49,13 +49,13 @@ class AccessList(BaseModel):
|
||||
class Certificate(BaseModel):
|
||||
"""SSL Certificate information."""
|
||||
|
||||
id: int
|
||||
created_on: datetime
|
||||
modified_on: datetime
|
||||
owner_user_id: int
|
||||
provider: str
|
||||
nice_name: str
|
||||
domain_names: list[str]
|
||||
id: int | None = None
|
||||
created_on: datetime | None = None
|
||||
modified_on: datetime | None = None
|
||||
owner_user_id: int | None = None
|
||||
provider: str = ""
|
||||
nice_name: str = ""
|
||||
domain_names: list[str] = Field(default_factory=list)
|
||||
expires_on: datetime | None = None
|
||||
meta: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
@@ -376,3 +376,112 @@ async def create_proxy_host(
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def update_proxy_host(
|
||||
host_id: int,
|
||||
forward_host: str | None = None,
|
||||
forward_port: int | None = None,
|
||||
forward_scheme: str | None = None,
|
||||
certificate_id: int | None = None,
|
||||
ssl_forced: bool | None = None,
|
||||
block_exploits: bool | None = None,
|
||||
allow_websocket_upgrade: bool | None = None,
|
||||
access_list_id: int | None = None,
|
||||
advanced_config: str | None = None,
|
||||
) -> str:
|
||||
"""Update an existing proxy host in Nginx Proxy Manager.
|
||||
|
||||
Only provided fields will be updated; all others remain unchanged.
|
||||
|
||||
Args:
|
||||
host_id: The ID of the proxy host to update
|
||||
forward_host: Backend host/IP to forward to
|
||||
forward_port: Backend port to forward to
|
||||
forward_scheme: Backend protocol - "http" or "https"
|
||||
certificate_id: SSL certificate ID (use list_certificates to find, 0 for none)
|
||||
ssl_forced: Force HTTPS redirect
|
||||
block_exploits: Enable common exploit blocking
|
||||
allow_websocket_upgrade: Allow WebSocket connections
|
||||
access_list_id: Access list ID (0 for no restrictions)
|
||||
advanced_config: Custom nginx configuration block
|
||||
|
||||
Returns:
|
||||
Details of the updated proxy host.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
kwargs = {}
|
||||
if forward_host is not None:
|
||||
kwargs["forward_host"] = forward_host
|
||||
if forward_port is not None:
|
||||
kwargs["forward_port"] = forward_port
|
||||
if forward_scheme is not None:
|
||||
kwargs["forward_scheme"] = forward_scheme
|
||||
if certificate_id is not None:
|
||||
kwargs["certificate_id"] = certificate_id
|
||||
if ssl_forced is not None:
|
||||
kwargs["ssl_forced"] = ssl_forced
|
||||
if block_exploits is not None:
|
||||
kwargs["block_exploits"] = block_exploits
|
||||
if allow_websocket_upgrade is not None:
|
||||
kwargs["allow_websocket_upgrade"] = allow_websocket_upgrade
|
||||
if access_list_id is not None:
|
||||
kwargs["access_list_id"] = access_list_id
|
||||
if advanced_config is not None:
|
||||
kwargs["advanced_config"] = advanced_config
|
||||
|
||||
host = await client.update_proxy_host(host_id, **kwargs)
|
||||
|
||||
domains = ", ".join(host.domain_names)
|
||||
return (
|
||||
f"Successfully updated proxy host!\n\n"
|
||||
f"ID: {host.id}\n"
|
||||
f"Domains: {domains}\n"
|
||||
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
|
||||
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}\n"
|
||||
f"Certificate ID: {host.certificate_id}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def create_certificate(
|
||||
domain_names: list[str],
|
||||
email: str,
|
||||
dns_challenge: bool = False,
|
||||
) -> str:
|
||||
"""Provision a new Let's Encrypt SSL certificate.
|
||||
|
||||
Args:
|
||||
domain_names: List of domain names for the certificate
|
||||
email: Email address for Let's Encrypt notifications
|
||||
dns_challenge: Use DNS challenge instead of HTTP (default: False)
|
||||
|
||||
Returns:
|
||||
Details of the created certificate including its ID.
|
||||
Use the returned ID with create_proxy_host or update_proxy_host.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
cert = await client.create_certificate(
|
||||
domain_names=domain_names,
|
||||
email=email,
|
||||
dns_challenge=dns_challenge,
|
||||
)
|
||||
|
||||
domains = ", ".join(cert.domain_names)
|
||||
expiry = cert.expires_on.strftime("%Y-%m-%d") if cert.expires_on else "N/A"
|
||||
return (
|
||||
f"Successfully created certificate!\n\n"
|
||||
f"ID: {cert.id}\n"
|
||||
f"Provider: {cert.provider}\n"
|
||||
f"Domains: {domains}\n"
|
||||
f"Expires: {expiry}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
Reference in New Issue
Block a user