mirror of
https://github.com/b3nw/nginx-proxy-manager-mcp.git
synced 2026-05-20 07:35:47 -05:00
Compare commits
8 Commits
37ad76f012
...
feature/pr
| Author | SHA1 | Date | |
|---|---|---|---|
| 77318c428e | |||
| 474784d4bc | |||
| 43d6ed4ca1 | |||
| a9011ff428 | |||
| ea137943fc | |||
| a9f4379c98 | |||
| e8df05e3f3 | |||
| 69158a871b |
3
.github/workflows/docker.yml
vendored
3
.github/workflows/docker.yml
vendored
@@ -26,7 +26,6 @@ jobs:
|
|||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
- name: Log in to Container Registry
|
- name: Log in to Container Registry
|
||||||
if: github.event_name != 'pull_request'
|
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
registry: ${{ env.REGISTRY }}
|
registry: ${{ env.REGISTRY }}
|
||||||
@@ -49,7 +48,7 @@ jobs:
|
|||||||
uses: docker/build-push-action@v6
|
uses: docker/build-push-action@v6
|
||||||
with:
|
with:
|
||||||
context: .
|
context: .
|
||||||
push: ${{ github.event_name != 'pull_request' }}
|
push: true
|
||||||
tags: ${{ steps.meta.outputs.tags }}
|
tags: ${{ steps.meta.outputs.tags }}
|
||||||
labels: ${{ steps.meta.outputs.labels }}
|
labels: ${{ steps.meta.outputs.labels }}
|
||||||
cache-from: type=gha
|
cache-from: type=gha
|
||||||
|
|||||||
36
README.md
36
README.md
@@ -49,8 +49,41 @@ NPM_SECRET=yourpassword
|
|||||||
# Optional: Server settings
|
# Optional: Server settings
|
||||||
NPM_MCP_PORT=8000
|
NPM_MCP_PORT=8000
|
||||||
NPM_MCP_TRANSPORT=stdio # or "http"
|
NPM_MCP_TRANSPORT=stdio # or "http"
|
||||||
|
|
||||||
|
# Optional: Default values for create_proxy_host (JSON)
|
||||||
|
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Environment Variables
|
||||||
|
|
||||||
|
| Variable | Required | Default | Description |
|
||||||
|
|----------|----------|---------|-------------|
|
||||||
|
| `NPM_API_URL` | Yes | `http://localhost:81/api` | NPM API endpoint |
|
||||||
|
| `NPM_IDENTITY` | Yes | - | NPM user email |
|
||||||
|
| `NPM_SECRET` | Yes | - | NPM user password |
|
||||||
|
| `NPM_MCP_HOST` | No | `0.0.0.0` | MCP server bind address |
|
||||||
|
| `NPM_MCP_PORT` | No | `8000` | MCP server port |
|
||||||
|
| `NPM_MCP_TRANSPORT` | No | `stdio` | Transport mode (`stdio` or `http`) |
|
||||||
|
| `NPM_PROXY_DEFAULTS` | No | `{}` | JSON defaults for `create_proxy_host` |
|
||||||
|
|
||||||
|
### NPM_PROXY_DEFAULTS Keys
|
||||||
|
|
||||||
|
Configure default values for proxy host creation:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true, "block_exploits": true}'
|
||||||
|
```
|
||||||
|
|
||||||
|
| Key | Type | Default | Description |
|
||||||
|
|-----|------|---------|-------------|
|
||||||
|
| `forward_scheme` | string | `"http"` | Backend protocol (`http` or `https`) |
|
||||||
|
| `certificate_id` | int | `0` | SSL certificate ID (use `list_certificates` to find) |
|
||||||
|
| `ssl_forced` | bool | `true` | Force HTTPS redirect |
|
||||||
|
| `block_exploits` | bool | `true` | Enable common exploit blocking |
|
||||||
|
| `allow_websocket_upgrade` | bool | `true` | Allow WebSocket connections |
|
||||||
|
| `access_list_id` | int | `0` | Access list ID (use `list_access_lists` to find) |
|
||||||
|
| `advanced_config` | string | `""` | Custom nginx configuration block |
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
### Stdio Mode (for Claude Desktop, etc.)
|
### Stdio Mode (for Claude Desktop, etc.)
|
||||||
@@ -95,6 +128,9 @@ Add to your `claude_desktop_config.json`:
|
|||||||
| `get_proxy_host_details` | Get full config for a specific host |
|
| `get_proxy_host_details` | Get full config for a specific host |
|
||||||
| `get_system_health` | Check NPM version and status |
|
| `get_system_health` | Check NPM version and status |
|
||||||
| `search_audit_logs` | Query audit log entries |
|
| `search_audit_logs` | Query audit log entries |
|
||||||
|
| `list_certificates` | List SSL certificates |
|
||||||
|
| `list_access_lists` | List access lists for authentication/IP restrictions |
|
||||||
|
| `create_proxy_host` | Create a new proxy host |
|
||||||
|
|
||||||
## Development
|
## Development
|
||||||
|
|
||||||
|
|||||||
@@ -7,3 +7,8 @@ NPM_SECRET=changeme
|
|||||||
NPM_MCP_HOST=0.0.0.0
|
NPM_MCP_HOST=0.0.0.0
|
||||||
NPM_MCP_PORT=8000
|
NPM_MCP_PORT=8000
|
||||||
NPM_MCP_TRANSPORT=stdio # stdio or http
|
NPM_MCP_TRANSPORT=stdio # stdio or http
|
||||||
|
|
||||||
|
# Proxy Host Creation Defaults (JSON)
|
||||||
|
# Set default values for create_proxy_host tool parameters
|
||||||
|
# Example with wildcard cert: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
|
||||||
|
# NPM_PROXY_DEFAULTS='{"certificate_id": 0, "ssl_forced": true, "block_exploits": true, "allow_websocket_upgrade": true}'
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ from .exceptions import (
|
|||||||
NpmNotFoundError,
|
NpmNotFoundError,
|
||||||
)
|
)
|
||||||
from .models import (
|
from .models import (
|
||||||
|
AccessList,
|
||||||
AuditLogEntry,
|
AuditLogEntry,
|
||||||
Certificate,
|
Certificate,
|
||||||
HealthStatus,
|
HealthStatus,
|
||||||
@@ -92,9 +93,7 @@ class NpmClient:
|
|||||||
raise NpmAuthenticationError("Invalid credentials")
|
raise NpmAuthenticationError("Invalid credentials")
|
||||||
|
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
raise NpmApiError(
|
raise NpmApiError(f"Login failed: {response.text}", status_code=response.status_code)
|
||||||
f"Login failed: {response.text}", status_code=response.status_code
|
|
||||||
)
|
|
||||||
|
|
||||||
data = response.json()
|
data = response.json()
|
||||||
token_response = TokenResponse(**data)
|
token_response = TokenResponse(**data)
|
||||||
@@ -172,9 +171,7 @@ class NpmClient:
|
|||||||
raise NpmNotFoundError(f"Resource not found: {endpoint}")
|
raise NpmNotFoundError(f"Resource not found: {endpoint}")
|
||||||
|
|
||||||
if response.status_code >= 400:
|
if response.status_code >= 400:
|
||||||
raise NpmApiError(
|
raise NpmApiError(f"API error: {response.text}", status_code=response.status_code)
|
||||||
f"API error: {response.text}", status_code=response.status_code
|
|
||||||
)
|
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@@ -259,3 +256,70 @@ class NpmClient:
|
|||||||
)
|
)
|
||||||
data = response.json()
|
data = response.json()
|
||||||
return [AuditLogEntry(**entry) for entry in data]
|
return [AuditLogEntry(**entry) for entry in data]
|
||||||
|
|
||||||
|
async def get_access_lists(self) -> list[AccessList]:
|
||||||
|
"""Get all access lists."""
|
||||||
|
response = await self._request("GET", "/nginx/access-lists")
|
||||||
|
data = response.json()
|
||||||
|
return [AccessList(**item) for item in data]
|
||||||
|
|
||||||
|
async def create_proxy_host(
|
||||||
|
self,
|
||||||
|
domain_names: list[str],
|
||||||
|
forward_host: str,
|
||||||
|
forward_port: int,
|
||||||
|
forward_scheme: str = "http",
|
||||||
|
certificate_id: int | None = None,
|
||||||
|
ssl_forced: bool = True,
|
||||||
|
hsts_enabled: bool = True,
|
||||||
|
hsts_subdomains: bool = False,
|
||||||
|
http2_support: bool = True,
|
||||||
|
block_exploits: bool = True,
|
||||||
|
caching_enabled: bool = False,
|
||||||
|
allow_websocket_upgrade: bool = True,
|
||||||
|
access_list_id: int = 0,
|
||||||
|
advanced_config: str = "",
|
||||||
|
meta: dict | None = None,
|
||||||
|
) -> ProxyHost:
|
||||||
|
"""Create a new proxy host.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
domain_names: List of domain names for this host
|
||||||
|
forward_host: Backend host to forward to
|
||||||
|
forward_port: Backend port to forward to
|
||||||
|
forward_scheme: http or https
|
||||||
|
certificate_id: SSL certificate ID (0 for none, use list_certificates to find)
|
||||||
|
ssl_forced: Force SSL/HTTPS
|
||||||
|
hsts_enabled: Enable HSTS
|
||||||
|
hsts_subdomains: Include subdomains in HSTS
|
||||||
|
http2_support: Enable HTTP/2
|
||||||
|
block_exploits: Enable exploit blocking
|
||||||
|
caching_enabled: Enable caching
|
||||||
|
allow_websocket_upgrade: Allow WebSocket upgrades
|
||||||
|
access_list_id: Access list ID (0 for none, use list_access_lists to find)
|
||||||
|
advanced_config: Custom nginx configuration
|
||||||
|
meta: Additional metadata
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Created ProxyHost object
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"domain_names": domain_names,
|
||||||
|
"forward_host": forward_host,
|
||||||
|
"forward_port": forward_port,
|
||||||
|
"forward_scheme": forward_scheme,
|
||||||
|
"certificate_id": certificate_id or 0,
|
||||||
|
"ssl_forced": ssl_forced,
|
||||||
|
"hsts_enabled": hsts_enabled,
|
||||||
|
"hsts_subdomains": hsts_subdomains,
|
||||||
|
"http2_support": http2_support,
|
||||||
|
"block_exploits": block_exploits,
|
||||||
|
"caching_enabled": caching_enabled,
|
||||||
|
"allow_websocket_upgrade": allow_websocket_upgrade,
|
||||||
|
"access_list_id": access_list_id,
|
||||||
|
"advanced_config": advanced_config,
|
||||||
|
"meta": meta or {},
|
||||||
|
}
|
||||||
|
|
||||||
|
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
|
||||||
|
return ProxyHost(**response.json())
|
||||||
|
|||||||
@@ -1,7 +1,26 @@
|
|||||||
"""Configuration management using pydantic-settings."""
|
"""Configuration management using pydantic-settings."""
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from pydantic import field_validator
|
||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
# Default values for proxy host creation
|
||||||
|
DEFAULT_PROXY_SETTINGS: dict[str, Any] = {
|
||||||
|
"forward_scheme": "http",
|
||||||
|
"certificate_id": 0,
|
||||||
|
"ssl_forced": True,
|
||||||
|
"hsts_enabled": True,
|
||||||
|
"hsts_subdomains": False,
|
||||||
|
"http2_support": True,
|
||||||
|
"caching_enabled": False,
|
||||||
|
"block_exploits": True,
|
||||||
|
"allow_websocket_upgrade": True,
|
||||||
|
"access_list_id": 0,
|
||||||
|
"advanced_config": "",
|
||||||
|
"meta": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class Settings(BaseSettings):
|
class Settings(BaseSettings):
|
||||||
"""Application settings loaded from environment variables."""
|
"""Application settings loaded from environment variables."""
|
||||||
@@ -23,5 +42,30 @@ class Settings(BaseSettings):
|
|||||||
mcp_port: int = 8000
|
mcp_port: int = 8000
|
||||||
mcp_transport: str = "stdio" # "stdio" or "http"
|
mcp_transport: str = "stdio" # "stdio" or "http"
|
||||||
|
|
||||||
|
# Proxy host creation defaults (JSON string)
|
||||||
|
# Example: '{"certificate_id": 24, "ssl_forced": true}'
|
||||||
|
proxy_defaults: dict[str, Any] = {}
|
||||||
|
|
||||||
|
@field_validator("proxy_defaults", mode="before")
|
||||||
|
@classmethod
|
||||||
|
def parse_proxy_defaults(cls, v: Any) -> dict[str, Any]:
|
||||||
|
"""Parse JSON string to dict, or pass through if already dict."""
|
||||||
|
if isinstance(v, dict):
|
||||||
|
return v
|
||||||
|
if isinstance(v, str) and v.strip():
|
||||||
|
import json
|
||||||
|
|
||||||
|
try:
|
||||||
|
return json.loads(v)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
raise ValueError(f"Invalid JSON in NPM_PROXY_DEFAULTS: {e}") from e
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def get_proxy_defaults(self) -> dict[str, Any]:
|
||||||
|
"""Get merged proxy defaults (base defaults + user overrides)."""
|
||||||
|
merged = DEFAULT_PROXY_SETTINGS.copy()
|
||||||
|
merged.update(self.proxy_defaults)
|
||||||
|
return merged
|
||||||
|
|
||||||
|
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
|
|||||||
@@ -34,6 +34,18 @@ class Owner(BaseModel):
|
|||||||
roles: list[str]
|
roles: list[str]
|
||||||
|
|
||||||
|
|
||||||
|
class AccessList(BaseModel):
|
||||||
|
"""Access list for authentication/IP restrictions."""
|
||||||
|
|
||||||
|
id: int
|
||||||
|
created_on: datetime
|
||||||
|
modified_on: datetime
|
||||||
|
owner_user_id: int = 0
|
||||||
|
name: str
|
||||||
|
satisfy_any: bool = False
|
||||||
|
pass_auth: bool = False
|
||||||
|
|
||||||
|
|
||||||
class Certificate(BaseModel):
|
class Certificate(BaseModel):
|
||||||
"""SSL Certificate information."""
|
"""SSL Certificate information."""
|
||||||
|
|
||||||
@@ -81,7 +93,7 @@ class ProxyHost(BaseModel):
|
|||||||
advanced_config: str = ""
|
advanced_config: str = ""
|
||||||
enabled: bool = True
|
enabled: bool = True
|
||||||
meta: dict[str, Any] = Field(default_factory=dict)
|
meta: dict[str, Any] = Field(default_factory=dict)
|
||||||
locations: list[ProxyHostLocation] = Field(default_factory=list)
|
locations: list[ProxyHostLocation] | None = None
|
||||||
# Optional expanded relations
|
# Optional expanded relations
|
||||||
owner: Owner | None = None
|
owner: Owner | None = None
|
||||||
certificate: Certificate | None = None
|
certificate: Certificate | None = None
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ async def get_system_health() -> str:
|
|||||||
try:
|
try:
|
||||||
await client._ensure_authenticated()
|
await client._ensure_authenticated()
|
||||||
result.append("Authenticated: ✅")
|
result.append("Authenticated: ✅")
|
||||||
|
|
||||||
# Try to get settings (admin only)
|
# Try to get settings (admin only)
|
||||||
try:
|
try:
|
||||||
settings_list = await client.get_settings()
|
settings_list = await client.get_settings()
|
||||||
@@ -253,11 +253,126 @@ async def list_certificates() -> str:
|
|||||||
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
|
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
|
||||||
|
|
||||||
result.append(
|
result.append(
|
||||||
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n"
|
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n Domains: {domains}{expiry}"
|
||||||
f" Domains: {domains}{expiry}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
|
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return _format_error(e)
|
return _format_error(e)
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def list_access_lists() -> str:
|
||||||
|
"""List all access lists configured in Nginx Proxy Manager.
|
||||||
|
|
||||||
|
Returns a summary of all access lists including their IDs and names.
|
||||||
|
Use these IDs when creating proxy hosts that require access control.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client = get_client()
|
||||||
|
access_lists = await client.get_access_lists()
|
||||||
|
|
||||||
|
if not access_lists:
|
||||||
|
return "No access lists configured."
|
||||||
|
|
||||||
|
result = []
|
||||||
|
for al in access_lists:
|
||||||
|
result.append(f"[{al.id}] {al.name}")
|
||||||
|
|
||||||
|
return f"Found {len(access_lists)} access list(s):\n\n" + "\n".join(result)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return _format_error(e)
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def create_proxy_host(
|
||||||
|
domain_names: list[str],
|
||||||
|
forward_host: str,
|
||||||
|
forward_port: int,
|
||||||
|
forward_scheme: str | None = None,
|
||||||
|
certificate_id: int | None = None,
|
||||||
|
ssl_forced: bool | None = None,
|
||||||
|
block_exploits: bool | None = None,
|
||||||
|
allow_websocket_upgrade: bool | None = None,
|
||||||
|
access_list_id: int | None = None,
|
||||||
|
advanced_config: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Create a new proxy host in Nginx Proxy Manager.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
domain_names: List of domain names (e.g., ["app.ext.ben.io"])
|
||||||
|
forward_host: Backend host/IP to forward to (e.g., "192.168.1.100" or "container-name")
|
||||||
|
forward_port: Backend port to forward to (e.g., 8080)
|
||||||
|
forward_scheme: Backend protocol - "http" or "https" (default from config)
|
||||||
|
certificate_id: SSL certificate ID. Use list_certificates to find available certs.
|
||||||
|
Use 0 for no SSL, or the ID of a wildcard cert. (default from config)
|
||||||
|
ssl_forced: Force HTTPS redirect (default from config)
|
||||||
|
block_exploits: Enable common exploit blocking (default from config)
|
||||||
|
allow_websocket_upgrade: Allow WebSocket connections (default from config)
|
||||||
|
access_list_id: Access list ID for authentication. Use list_access_lists to find.
|
||||||
|
Use 0 for no access restrictions. (default from config)
|
||||||
|
advanced_config: Custom nginx configuration block (default from config)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Details of the created proxy host including the new host ID.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
Default values can be configured via NPM_PROXY_DEFAULTS environment variable.
|
||||||
|
Example: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
|
||||||
|
|
||||||
|
Example:
|
||||||
|
create_proxy_host(
|
||||||
|
domain_names=["myapp.ext.ben.io"],
|
||||||
|
forward_host="10.0.0.50",
|
||||||
|
forward_port=3000,
|
||||||
|
certificate_id=24, # *.ext.ben.io wildcard
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Get defaults from config, then override with provided values
|
||||||
|
defaults = settings.get_proxy_defaults()
|
||||||
|
|
||||||
|
client = get_client()
|
||||||
|
host = await client.create_proxy_host(
|
||||||
|
domain_names=domain_names,
|
||||||
|
forward_host=forward_host,
|
||||||
|
forward_port=forward_port,
|
||||||
|
forward_scheme=forward_scheme
|
||||||
|
if forward_scheme is not None
|
||||||
|
else defaults["forward_scheme"],
|
||||||
|
certificate_id=certificate_id
|
||||||
|
if certificate_id is not None
|
||||||
|
else defaults["certificate_id"],
|
||||||
|
ssl_forced=ssl_forced if ssl_forced is not None else defaults["ssl_forced"],
|
||||||
|
hsts_enabled=defaults.get("hsts_enabled", True),
|
||||||
|
hsts_subdomains=defaults.get("hsts_subdomains", False),
|
||||||
|
http2_support=defaults.get("http2_support", True),
|
||||||
|
block_exploits=block_exploits
|
||||||
|
if block_exploits is not None
|
||||||
|
else defaults["block_exploits"],
|
||||||
|
caching_enabled=defaults.get("caching_enabled", False),
|
||||||
|
allow_websocket_upgrade=allow_websocket_upgrade
|
||||||
|
if allow_websocket_upgrade is not None
|
||||||
|
else defaults["allow_websocket_upgrade"],
|
||||||
|
access_list_id=access_list_id
|
||||||
|
if access_list_id is not None
|
||||||
|
else defaults["access_list_id"],
|
||||||
|
advanced_config=advanced_config
|
||||||
|
if advanced_config is not None
|
||||||
|
else defaults["advanced_config"],
|
||||||
|
meta=defaults.get("meta", {}),
|
||||||
|
)
|
||||||
|
|
||||||
|
domains = ", ".join(host.domain_names)
|
||||||
|
return (
|
||||||
|
f"Successfully created proxy host!\n\n"
|
||||||
|
f"ID: {host.id}\n"
|
||||||
|
f"Domains: {domains}\n"
|
||||||
|
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
|
||||||
|
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}"
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return _format_error(e)
|
||||||
|
|||||||
@@ -35,6 +35,53 @@ def mock_proxy_hosts():
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_access_lists():
|
||||||
|
"""Mock access lists response."""
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"id": 1,
|
||||||
|
"created_on": "2024-01-01T00:00:00Z",
|
||||||
|
"modified_on": "2024-01-01T00:00:00Z",
|
||||||
|
"owner_user_id": 1,
|
||||||
|
"name": "Admin Only",
|
||||||
|
"satisfy_any": False,
|
||||||
|
"pass_auth": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": 2,
|
||||||
|
"created_on": "2024-01-02T00:00:00Z",
|
||||||
|
"modified_on": "2024-01-02T00:00:00Z",
|
||||||
|
"owner_user_id": 1,
|
||||||
|
"name": "Internal Network",
|
||||||
|
"satisfy_any": True,
|
||||||
|
"pass_auth": False,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_created_proxy_host():
|
||||||
|
"""Mock response for created proxy host."""
|
||||||
|
return {
|
||||||
|
"id": 42,
|
||||||
|
"created_on": "2024-01-15T10:00:00Z",
|
||||||
|
"modified_on": "2024-01-15T10:00:00Z",
|
||||||
|
"owner_user_id": 1,
|
||||||
|
"domain_names": ["newapp.example.com"],
|
||||||
|
"forward_host": "10.0.0.50",
|
||||||
|
"forward_port": 3000,
|
||||||
|
"forward_scheme": "http",
|
||||||
|
"enabled": True,
|
||||||
|
"ssl_forced": True,
|
||||||
|
"certificate_id": 24,
|
||||||
|
"block_exploits": True,
|
||||||
|
"allow_websocket_upgrade": True,
|
||||||
|
"access_list_id": 0,
|
||||||
|
"advanced_config": "",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class TestNpmClientAuth:
|
class TestNpmClientAuth:
|
||||||
"""Test authentication logic."""
|
"""Test authentication logic."""
|
||||||
|
|
||||||
@@ -114,3 +161,69 @@ class TestNpmClientEndpoints:
|
|||||||
assert hosts[0].id == 1
|
assert hosts[0].id == 1
|
||||||
assert hosts[0].domain_names == ["example.com"]
|
assert hosts[0].domain_names == ["example.com"]
|
||||||
assert hosts[0].forward_host == "192.168.1.100"
|
assert hosts[0].forward_host == "192.168.1.100"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_access_lists(self, httpx_mock, mock_token_response, mock_access_lists):
|
||||||
|
"""Test fetching access lists."""
|
||||||
|
httpx_mock.add_response(
|
||||||
|
method="POST",
|
||||||
|
url="http://localhost:81/api/tokens",
|
||||||
|
json=mock_token_response,
|
||||||
|
)
|
||||||
|
httpx_mock.add_response(
|
||||||
|
method="GET",
|
||||||
|
url="http://localhost:81/api/nginx/access-lists",
|
||||||
|
json=mock_access_lists,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with NpmClient(
|
||||||
|
base_url="http://localhost:81/api",
|
||||||
|
identity="test@test.com",
|
||||||
|
secret="password",
|
||||||
|
) as client:
|
||||||
|
access_lists = await client.get_access_lists()
|
||||||
|
|
||||||
|
assert len(access_lists) == 2
|
||||||
|
assert access_lists[0].id == 1
|
||||||
|
assert access_lists[0].name == "Admin Only"
|
||||||
|
assert access_lists[0].pass_auth is True
|
||||||
|
assert access_lists[1].id == 2
|
||||||
|
assert access_lists[1].name == "Internal Network"
|
||||||
|
assert access_lists[1].satisfy_any is True
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_create_proxy_host(
|
||||||
|
self, httpx_mock, mock_token_response, mock_created_proxy_host
|
||||||
|
):
|
||||||
|
"""Test creating a proxy host."""
|
||||||
|
httpx_mock.add_response(
|
||||||
|
method="POST",
|
||||||
|
url="http://localhost:81/api/tokens",
|
||||||
|
json=mock_token_response,
|
||||||
|
)
|
||||||
|
httpx_mock.add_response(
|
||||||
|
method="POST",
|
||||||
|
url="http://localhost:81/api/nginx/proxy-hosts",
|
||||||
|
json=mock_created_proxy_host,
|
||||||
|
status_code=201,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with NpmClient(
|
||||||
|
base_url="http://localhost:81/api",
|
||||||
|
identity="test@test.com",
|
||||||
|
secret="password",
|
||||||
|
) as client:
|
||||||
|
host = await client.create_proxy_host(
|
||||||
|
domain_names=["newapp.example.com"],
|
||||||
|
forward_host="10.0.0.50",
|
||||||
|
forward_port=3000,
|
||||||
|
certificate_id=24,
|
||||||
|
ssl_forced=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert host.id == 42
|
||||||
|
assert host.domain_names == ["newapp.example.com"]
|
||||||
|
assert host.forward_host == "10.0.0.50"
|
||||||
|
assert host.forward_port == 3000
|
||||||
|
assert host.ssl_forced is True
|
||||||
|
assert host.certificate_id == 24
|
||||||
|
|||||||
91
tests/test_config.py
Normal file
91
tests/test_config.py
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
"""Tests for configuration handling."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from pydantic_settings.exceptions import SettingsError
|
||||||
|
|
||||||
|
from npm_mcp.config import DEFAULT_PROXY_SETTINGS, Settings
|
||||||
|
|
||||||
|
|
||||||
|
class TestProxyDefaults:
|
||||||
|
"""Test NPM_PROXY_DEFAULTS parsing and merging."""
|
||||||
|
|
||||||
|
def test_default_proxy_settings(self):
|
||||||
|
"""Test that default settings are correct."""
|
||||||
|
settings = Settings(identity="test", secret="test")
|
||||||
|
defaults = settings.get_proxy_defaults()
|
||||||
|
|
||||||
|
assert defaults["forward_scheme"] == "http"
|
||||||
|
assert defaults["certificate_id"] == 0
|
||||||
|
assert defaults["ssl_forced"] is True
|
||||||
|
assert defaults["block_exploits"] is True
|
||||||
|
assert defaults["allow_websocket_upgrade"] is True
|
||||||
|
assert defaults["access_list_id"] == 0
|
||||||
|
assert defaults["advanced_config"] == ""
|
||||||
|
|
||||||
|
def test_proxy_defaults_json_parsing(self, monkeypatch):
|
||||||
|
"""Test parsing JSON string from environment variable."""
|
||||||
|
monkeypatch.setenv("NPM_IDENTITY", "test")
|
||||||
|
monkeypatch.setenv("NPM_SECRET", "test")
|
||||||
|
monkeypatch.setenv("NPM_PROXY_DEFAULTS", '{"certificate_id": 24, "ssl_forced": false}')
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
defaults = settings.get_proxy_defaults()
|
||||||
|
|
||||||
|
# Overridden values
|
||||||
|
assert defaults["certificate_id"] == 24
|
||||||
|
assert defaults["ssl_forced"] is False
|
||||||
|
|
||||||
|
# Default values preserved
|
||||||
|
assert defaults["forward_scheme"] == "http"
|
||||||
|
assert defaults["block_exploits"] is True
|
||||||
|
|
||||||
|
def test_proxy_defaults_dict_passthrough(self):
|
||||||
|
"""Test that dict values pass through correctly."""
|
||||||
|
settings = Settings(
|
||||||
|
identity="test",
|
||||||
|
secret="test",
|
||||||
|
proxy_defaults={"certificate_id": 18, "access_list_id": 5},
|
||||||
|
)
|
||||||
|
defaults = settings.get_proxy_defaults()
|
||||||
|
|
||||||
|
assert defaults["certificate_id"] == 18
|
||||||
|
assert defaults["access_list_id"] == 5
|
||||||
|
|
||||||
|
def test_proxy_defaults_empty_env_raises(self, monkeypatch):
|
||||||
|
"""Test that empty string env var raises SettingsError."""
|
||||||
|
monkeypatch.setenv("NPM_IDENTITY", "test")
|
||||||
|
monkeypatch.setenv("NPM_SECRET", "test")
|
||||||
|
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "")
|
||||||
|
|
||||||
|
# pydantic-settings tries to JSON decode empty string and fails
|
||||||
|
with pytest.raises(SettingsError):
|
||||||
|
Settings()
|
||||||
|
|
||||||
|
def test_proxy_defaults_invalid_json_raises(self, monkeypatch):
|
||||||
|
"""Test that invalid JSON raises SettingsError."""
|
||||||
|
monkeypatch.setenv("NPM_IDENTITY", "test")
|
||||||
|
monkeypatch.setenv("NPM_SECRET", "test")
|
||||||
|
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "{not valid json}")
|
||||||
|
|
||||||
|
# pydantic-settings tries to JSON decode and fails
|
||||||
|
with pytest.raises(SettingsError):
|
||||||
|
Settings()
|
||||||
|
|
||||||
|
def test_proxy_defaults_merges_not_replaces(self):
|
||||||
|
"""Test that user defaults merge with base defaults."""
|
||||||
|
settings = Settings(
|
||||||
|
identity="test",
|
||||||
|
secret="test",
|
||||||
|
proxy_defaults={"certificate_id": 24},
|
||||||
|
)
|
||||||
|
defaults = settings.get_proxy_defaults()
|
||||||
|
|
||||||
|
# All keys should be present
|
||||||
|
assert set(defaults.keys()) == set(DEFAULT_PROXY_SETTINGS.keys())
|
||||||
|
|
||||||
|
# User value applied
|
||||||
|
assert defaults["certificate_id"] == 24
|
||||||
|
|
||||||
|
# Other defaults preserved
|
||||||
|
assert defaults["ssl_forced"] is True
|
||||||
|
assert defaults["block_exploits"] is True
|
||||||
Reference in New Issue
Block a user