mirror of
https://github.com/b3nw/nginx-proxy-manager-mcp.git
synced 2026-05-20 15:35:48 -05:00
Compare commits
2 Commits
feature/pr
...
15b2876d7b
| Author | SHA1 | Date | |
|---|---|---|---|
| 15b2876d7b | |||
|
|
52eb484432 |
3
.github/workflows/docker.yml
vendored
3
.github/workflows/docker.yml
vendored
@@ -26,7 +26,6 @@ jobs:
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Log in to Container Registry
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
@@ -49,7 +48,7 @@ jobs:
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
push: true
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
cache-from: type=gha
|
||||
|
||||
36
README.md
36
README.md
@@ -49,8 +49,41 @@ NPM_SECRET=yourpassword
|
||||
# Optional: Server settings
|
||||
NPM_MCP_PORT=8000
|
||||
NPM_MCP_TRANSPORT=stdio # or "http"
|
||||
|
||||
# Optional: Default values for create_proxy_host (JSON)
|
||||
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
|
||||
```
|
||||
|
||||
### Environment Variables
|
||||
|
||||
| Variable | Required | Default | Description |
|
||||
|----------|----------|---------|-------------|
|
||||
| `NPM_API_URL` | Yes | `http://localhost:81/api` | NPM API endpoint |
|
||||
| `NPM_IDENTITY` | Yes | - | NPM user email |
|
||||
| `NPM_SECRET` | Yes | - | NPM user password |
|
||||
| `NPM_MCP_HOST` | No | `0.0.0.0` | MCP server bind address |
|
||||
| `NPM_MCP_PORT` | No | `8000` | MCP server port |
|
||||
| `NPM_MCP_TRANSPORT` | No | `stdio` | Transport mode (`stdio` or `http`) |
|
||||
| `NPM_PROXY_DEFAULTS` | No | `{}` | JSON defaults for `create_proxy_host` |
|
||||
|
||||
### NPM_PROXY_DEFAULTS Keys
|
||||
|
||||
Configure default values for proxy host creation:
|
||||
|
||||
```bash
|
||||
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true, "block_exploits": true}'
|
||||
```
|
||||
|
||||
| Key | Type | Default | Description |
|
||||
|-----|------|---------|-------------|
|
||||
| `forward_scheme` | string | `"http"` | Backend protocol (`http` or `https`) |
|
||||
| `certificate_id` | int | `0` | SSL certificate ID (use `list_certificates` to find) |
|
||||
| `ssl_forced` | bool | `true` | Force HTTPS redirect |
|
||||
| `block_exploits` | bool | `true` | Enable common exploit blocking |
|
||||
| `allow_websocket_upgrade` | bool | `true` | Allow WebSocket connections |
|
||||
| `access_list_id` | int | `0` | Access list ID (use `list_access_lists` to find) |
|
||||
| `advanced_config` | string | `""` | Custom nginx configuration block |
|
||||
|
||||
## Usage
|
||||
|
||||
### Stdio Mode (for Claude Desktop, etc.)
|
||||
@@ -95,6 +128,9 @@ Add to your `claude_desktop_config.json`:
|
||||
| `get_proxy_host_details` | Get full config for a specific host |
|
||||
| `get_system_health` | Check NPM version and status |
|
||||
| `search_audit_logs` | Query audit log entries |
|
||||
| `list_certificates` | List SSL certificates |
|
||||
| `list_access_lists` | List access lists for authentication/IP restrictions |
|
||||
| `create_proxy_host` | Create a new proxy host |
|
||||
|
||||
## Development
|
||||
|
||||
|
||||
@@ -7,3 +7,8 @@ NPM_SECRET=changeme
|
||||
NPM_MCP_HOST=0.0.0.0
|
||||
NPM_MCP_PORT=8000
|
||||
NPM_MCP_TRANSPORT=stdio # stdio or http
|
||||
|
||||
# Proxy Host Creation Defaults (JSON)
|
||||
# Set default values for create_proxy_host tool parameters
|
||||
# Example with wildcard cert: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
|
||||
# NPM_PROXY_DEFAULTS='{"certificate_id": 0, "ssl_forced": true, "block_exploits": true, "allow_websocket_upgrade": true}'
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "npm-mcp"
|
||||
version = "0.1.0"
|
||||
version = "0.0.2"
|
||||
description = "MCP server for Nginx Proxy Manager"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
@@ -13,6 +13,7 @@ from .exceptions import (
|
||||
NpmNotFoundError,
|
||||
)
|
||||
from .models import (
|
||||
AccessList,
|
||||
AuditLogEntry,
|
||||
Certificate,
|
||||
HealthStatus,
|
||||
@@ -92,9 +93,7 @@ class NpmClient:
|
||||
raise NpmAuthenticationError("Invalid credentials")
|
||||
|
||||
if response.status_code != 200:
|
||||
raise NpmApiError(
|
||||
f"Login failed: {response.text}", status_code=response.status_code
|
||||
)
|
||||
raise NpmApiError(f"Login failed: {response.text}", status_code=response.status_code)
|
||||
|
||||
data = response.json()
|
||||
token_response = TokenResponse(**data)
|
||||
@@ -172,9 +171,7 @@ class NpmClient:
|
||||
raise NpmNotFoundError(f"Resource not found: {endpoint}")
|
||||
|
||||
if response.status_code >= 400:
|
||||
raise NpmApiError(
|
||||
f"API error: {response.text}", status_code=response.status_code
|
||||
)
|
||||
raise NpmApiError(f"API error: {response.text}", status_code=response.status_code)
|
||||
|
||||
return response
|
||||
|
||||
@@ -259,3 +256,70 @@ class NpmClient:
|
||||
)
|
||||
data = response.json()
|
||||
return [AuditLogEntry(**entry) for entry in data]
|
||||
|
||||
async def get_access_lists(self) -> list[AccessList]:
|
||||
"""Get all access lists."""
|
||||
response = await self._request("GET", "/nginx/access-lists")
|
||||
data = response.json()
|
||||
return [AccessList(**item) for item in data]
|
||||
|
||||
async def create_proxy_host(
|
||||
self,
|
||||
domain_names: list[str],
|
||||
forward_host: str,
|
||||
forward_port: int,
|
||||
forward_scheme: str = "http",
|
||||
certificate_id: int | None = None,
|
||||
ssl_forced: bool = True,
|
||||
hsts_enabled: bool = True,
|
||||
hsts_subdomains: bool = False,
|
||||
http2_support: bool = True,
|
||||
block_exploits: bool = True,
|
||||
caching_enabled: bool = False,
|
||||
allow_websocket_upgrade: bool = True,
|
||||
access_list_id: int = 0,
|
||||
advanced_config: str = "",
|
||||
meta: dict | None = None,
|
||||
) -> ProxyHost:
|
||||
"""Create a new proxy host.
|
||||
|
||||
Args:
|
||||
domain_names: List of domain names for this host
|
||||
forward_host: Backend host to forward to
|
||||
forward_port: Backend port to forward to
|
||||
forward_scheme: http or https
|
||||
certificate_id: SSL certificate ID (0 for none, use list_certificates to find)
|
||||
ssl_forced: Force SSL/HTTPS
|
||||
hsts_enabled: Enable HSTS
|
||||
hsts_subdomains: Include subdomains in HSTS
|
||||
http2_support: Enable HTTP/2
|
||||
block_exploits: Enable exploit blocking
|
||||
caching_enabled: Enable caching
|
||||
allow_websocket_upgrade: Allow WebSocket upgrades
|
||||
access_list_id: Access list ID (0 for none, use list_access_lists to find)
|
||||
advanced_config: Custom nginx configuration
|
||||
meta: Additional metadata
|
||||
|
||||
Returns:
|
||||
Created ProxyHost object
|
||||
"""
|
||||
payload = {
|
||||
"domain_names": domain_names,
|
||||
"forward_host": forward_host,
|
||||
"forward_port": forward_port,
|
||||
"forward_scheme": forward_scheme,
|
||||
"certificate_id": certificate_id or 0,
|
||||
"ssl_forced": ssl_forced,
|
||||
"hsts_enabled": hsts_enabled,
|
||||
"hsts_subdomains": hsts_subdomains,
|
||||
"http2_support": http2_support,
|
||||
"block_exploits": block_exploits,
|
||||
"caching_enabled": caching_enabled,
|
||||
"allow_websocket_upgrade": allow_websocket_upgrade,
|
||||
"access_list_id": access_list_id,
|
||||
"advanced_config": advanced_config,
|
||||
"meta": meta or {},
|
||||
}
|
||||
|
||||
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
|
||||
return ProxyHost(**response.json())
|
||||
|
||||
@@ -1,7 +1,26 @@
|
||||
"""Configuration management using pydantic-settings."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import field_validator
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
# Default values for proxy host creation
|
||||
DEFAULT_PROXY_SETTINGS: dict[str, Any] = {
|
||||
"forward_scheme": "http",
|
||||
"certificate_id": 0,
|
||||
"ssl_forced": True,
|
||||
"hsts_enabled": True,
|
||||
"hsts_subdomains": False,
|
||||
"http2_support": True,
|
||||
"caching_enabled": False,
|
||||
"block_exploits": True,
|
||||
"allow_websocket_upgrade": True,
|
||||
"access_list_id": 0,
|
||||
"advanced_config": "",
|
||||
"meta": {},
|
||||
}
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
"""Application settings loaded from environment variables."""
|
||||
@@ -23,5 +42,30 @@ class Settings(BaseSettings):
|
||||
mcp_port: int = 8000
|
||||
mcp_transport: str = "stdio" # "stdio" or "http"
|
||||
|
||||
# Proxy host creation defaults (JSON string)
|
||||
# Example: '{"certificate_id": 24, "ssl_forced": true}'
|
||||
proxy_defaults: dict[str, Any] = {}
|
||||
|
||||
@field_validator("proxy_defaults", mode="before")
|
||||
@classmethod
|
||||
def parse_proxy_defaults(cls, v: Any) -> dict[str, Any]:
|
||||
"""Parse JSON string to dict, or pass through if already dict."""
|
||||
if isinstance(v, dict):
|
||||
return v
|
||||
if isinstance(v, str) and v.strip():
|
||||
import json
|
||||
|
||||
try:
|
||||
return json.loads(v)
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Invalid JSON in NPM_PROXY_DEFAULTS: {e}") from e
|
||||
return {}
|
||||
|
||||
def get_proxy_defaults(self) -> dict[str, Any]:
|
||||
"""Get merged proxy defaults (base defaults + user overrides)."""
|
||||
merged = DEFAULT_PROXY_SETTINGS.copy()
|
||||
merged.update(self.proxy_defaults)
|
||||
return merged
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
@@ -34,6 +34,18 @@ class Owner(BaseModel):
|
||||
roles: list[str]
|
||||
|
||||
|
||||
class AccessList(BaseModel):
|
||||
"""Access list for authentication/IP restrictions."""
|
||||
|
||||
id: int
|
||||
created_on: datetime
|
||||
modified_on: datetime
|
||||
owner_user_id: int = 0
|
||||
name: str
|
||||
satisfy_any: bool = False
|
||||
pass_auth: bool = False
|
||||
|
||||
|
||||
class Certificate(BaseModel):
|
||||
"""SSL Certificate information."""
|
||||
|
||||
@@ -81,7 +93,7 @@ class ProxyHost(BaseModel):
|
||||
advanced_config: str = ""
|
||||
enabled: bool = True
|
||||
meta: dict[str, Any] = Field(default_factory=dict)
|
||||
locations: list[ProxyHostLocation] = Field(default_factory=list)
|
||||
locations: list[ProxyHostLocation] | None = None
|
||||
# Optional expanded relations
|
||||
owner: Owner | None = None
|
||||
certificate: Certificate | None = None
|
||||
|
||||
@@ -179,7 +179,7 @@ async def get_system_health() -> str:
|
||||
try:
|
||||
await client._ensure_authenticated()
|
||||
result.append("Authenticated: ✅")
|
||||
|
||||
|
||||
# Try to get settings (admin only)
|
||||
try:
|
||||
settings_list = await client.get_settings()
|
||||
@@ -253,11 +253,126 @@ async def list_certificates() -> str:
|
||||
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
|
||||
|
||||
result.append(
|
||||
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n"
|
||||
f" Domains: {domains}{expiry}"
|
||||
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n Domains: {domains}{expiry}"
|
||||
)
|
||||
|
||||
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_access_lists() -> str:
|
||||
"""List all access lists configured in Nginx Proxy Manager.
|
||||
|
||||
Returns a summary of all access lists including their IDs and names.
|
||||
Use these IDs when creating proxy hosts that require access control.
|
||||
"""
|
||||
try:
|
||||
client = get_client()
|
||||
access_lists = await client.get_access_lists()
|
||||
|
||||
if not access_lists:
|
||||
return "No access lists configured."
|
||||
|
||||
result = []
|
||||
for al in access_lists:
|
||||
result.append(f"[{al.id}] {al.name}")
|
||||
|
||||
return f"Found {len(access_lists)} access list(s):\n\n" + "\n".join(result)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def create_proxy_host(
|
||||
domain_names: list[str],
|
||||
forward_host: str,
|
||||
forward_port: int,
|
||||
forward_scheme: str | None = None,
|
||||
certificate_id: int | None = None,
|
||||
ssl_forced: bool | None = None,
|
||||
block_exploits: bool | None = None,
|
||||
allow_websocket_upgrade: bool | None = None,
|
||||
access_list_id: int | None = None,
|
||||
advanced_config: str | None = None,
|
||||
) -> str:
|
||||
"""Create a new proxy host in Nginx Proxy Manager.
|
||||
|
||||
Args:
|
||||
domain_names: List of domain names (e.g., ["app.ext.ben.io"])
|
||||
forward_host: Backend host/IP to forward to (e.g., "192.168.1.100" or "container-name")
|
||||
forward_port: Backend port to forward to (e.g., 8080)
|
||||
forward_scheme: Backend protocol - "http" or "https" (default from config)
|
||||
certificate_id: SSL certificate ID. Use list_certificates to find available certs.
|
||||
Use 0 for no SSL, or the ID of a wildcard cert. (default from config)
|
||||
ssl_forced: Force HTTPS redirect (default from config)
|
||||
block_exploits: Enable common exploit blocking (default from config)
|
||||
allow_websocket_upgrade: Allow WebSocket connections (default from config)
|
||||
access_list_id: Access list ID for authentication. Use list_access_lists to find.
|
||||
Use 0 for no access restrictions. (default from config)
|
||||
advanced_config: Custom nginx configuration block (default from config)
|
||||
|
||||
Returns:
|
||||
Details of the created proxy host including the new host ID.
|
||||
|
||||
Note:
|
||||
Default values can be configured via NPM_PROXY_DEFAULTS environment variable.
|
||||
Example: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
|
||||
|
||||
Example:
|
||||
create_proxy_host(
|
||||
domain_names=["myapp.ext.ben.io"],
|
||||
forward_host="10.0.0.50",
|
||||
forward_port=3000,
|
||||
certificate_id=24, # *.ext.ben.io wildcard
|
||||
)
|
||||
"""
|
||||
try:
|
||||
# Get defaults from config, then override with provided values
|
||||
defaults = settings.get_proxy_defaults()
|
||||
|
||||
client = get_client()
|
||||
host = await client.create_proxy_host(
|
||||
domain_names=domain_names,
|
||||
forward_host=forward_host,
|
||||
forward_port=forward_port,
|
||||
forward_scheme=forward_scheme
|
||||
if forward_scheme is not None
|
||||
else defaults["forward_scheme"],
|
||||
certificate_id=certificate_id
|
||||
if certificate_id is not None
|
||||
else defaults["certificate_id"],
|
||||
ssl_forced=ssl_forced if ssl_forced is not None else defaults["ssl_forced"],
|
||||
hsts_enabled=defaults.get("hsts_enabled", True),
|
||||
hsts_subdomains=defaults.get("hsts_subdomains", False),
|
||||
http2_support=defaults.get("http2_support", True),
|
||||
block_exploits=block_exploits
|
||||
if block_exploits is not None
|
||||
else defaults["block_exploits"],
|
||||
caching_enabled=defaults.get("caching_enabled", False),
|
||||
allow_websocket_upgrade=allow_websocket_upgrade
|
||||
if allow_websocket_upgrade is not None
|
||||
else defaults["allow_websocket_upgrade"],
|
||||
access_list_id=access_list_id
|
||||
if access_list_id is not None
|
||||
else defaults["access_list_id"],
|
||||
advanced_config=advanced_config
|
||||
if advanced_config is not None
|
||||
else defaults["advanced_config"],
|
||||
meta=defaults.get("meta", {}),
|
||||
)
|
||||
|
||||
domains = ", ".join(host.domain_names)
|
||||
return (
|
||||
f"Successfully created proxy host!\n\n"
|
||||
f"ID: {host.id}\n"
|
||||
f"Domains: {domains}\n"
|
||||
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
|
||||
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return _format_error(e)
|
||||
|
||||
@@ -35,6 +35,53 @@ def mock_proxy_hosts():
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_access_lists():
|
||||
"""Mock access lists response."""
|
||||
return [
|
||||
{
|
||||
"id": 1,
|
||||
"created_on": "2024-01-01T00:00:00Z",
|
||||
"modified_on": "2024-01-01T00:00:00Z",
|
||||
"owner_user_id": 1,
|
||||
"name": "Admin Only",
|
||||
"satisfy_any": False,
|
||||
"pass_auth": True,
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"created_on": "2024-01-02T00:00:00Z",
|
||||
"modified_on": "2024-01-02T00:00:00Z",
|
||||
"owner_user_id": 1,
|
||||
"name": "Internal Network",
|
||||
"satisfy_any": True,
|
||||
"pass_auth": False,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_created_proxy_host():
|
||||
"""Mock response for created proxy host."""
|
||||
return {
|
||||
"id": 42,
|
||||
"created_on": "2024-01-15T10:00:00Z",
|
||||
"modified_on": "2024-01-15T10:00:00Z",
|
||||
"owner_user_id": 1,
|
||||
"domain_names": ["newapp.example.com"],
|
||||
"forward_host": "10.0.0.50",
|
||||
"forward_port": 3000,
|
||||
"forward_scheme": "http",
|
||||
"enabled": True,
|
||||
"ssl_forced": True,
|
||||
"certificate_id": 24,
|
||||
"block_exploits": True,
|
||||
"allow_websocket_upgrade": True,
|
||||
"access_list_id": 0,
|
||||
"advanced_config": "",
|
||||
}
|
||||
|
||||
|
||||
class TestNpmClientAuth:
|
||||
"""Test authentication logic."""
|
||||
|
||||
@@ -114,3 +161,69 @@ class TestNpmClientEndpoints:
|
||||
assert hosts[0].id == 1
|
||||
assert hosts[0].domain_names == ["example.com"]
|
||||
assert hosts[0].forward_host == "192.168.1.100"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_access_lists(self, httpx_mock, mock_token_response, mock_access_lists):
|
||||
"""Test fetching access lists."""
|
||||
httpx_mock.add_response(
|
||||
method="POST",
|
||||
url="http://localhost:81/api/tokens",
|
||||
json=mock_token_response,
|
||||
)
|
||||
httpx_mock.add_response(
|
||||
method="GET",
|
||||
url="http://localhost:81/api/nginx/access-lists",
|
||||
json=mock_access_lists,
|
||||
)
|
||||
|
||||
async with NpmClient(
|
||||
base_url="http://localhost:81/api",
|
||||
identity="test@test.com",
|
||||
secret="password",
|
||||
) as client:
|
||||
access_lists = await client.get_access_lists()
|
||||
|
||||
assert len(access_lists) == 2
|
||||
assert access_lists[0].id == 1
|
||||
assert access_lists[0].name == "Admin Only"
|
||||
assert access_lists[0].pass_auth is True
|
||||
assert access_lists[1].id == 2
|
||||
assert access_lists[1].name == "Internal Network"
|
||||
assert access_lists[1].satisfy_any is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_proxy_host(
|
||||
self, httpx_mock, mock_token_response, mock_created_proxy_host
|
||||
):
|
||||
"""Test creating a proxy host."""
|
||||
httpx_mock.add_response(
|
||||
method="POST",
|
||||
url="http://localhost:81/api/tokens",
|
||||
json=mock_token_response,
|
||||
)
|
||||
httpx_mock.add_response(
|
||||
method="POST",
|
||||
url="http://localhost:81/api/nginx/proxy-hosts",
|
||||
json=mock_created_proxy_host,
|
||||
status_code=201,
|
||||
)
|
||||
|
||||
async with NpmClient(
|
||||
base_url="http://localhost:81/api",
|
||||
identity="test@test.com",
|
||||
secret="password",
|
||||
) as client:
|
||||
host = await client.create_proxy_host(
|
||||
domain_names=["newapp.example.com"],
|
||||
forward_host="10.0.0.50",
|
||||
forward_port=3000,
|
||||
certificate_id=24,
|
||||
ssl_forced=True,
|
||||
)
|
||||
|
||||
assert host.id == 42
|
||||
assert host.domain_names == ["newapp.example.com"]
|
||||
assert host.forward_host == "10.0.0.50"
|
||||
assert host.forward_port == 3000
|
||||
assert host.ssl_forced is True
|
||||
assert host.certificate_id == 24
|
||||
|
||||
91
tests/test_config.py
Normal file
91
tests/test_config.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Tests for configuration handling."""
|
||||
|
||||
import pytest
|
||||
from pydantic_settings.exceptions import SettingsError
|
||||
|
||||
from npm_mcp.config import DEFAULT_PROXY_SETTINGS, Settings
|
||||
|
||||
|
||||
class TestProxyDefaults:
|
||||
"""Test NPM_PROXY_DEFAULTS parsing and merging."""
|
||||
|
||||
def test_default_proxy_settings(self):
|
||||
"""Test that default settings are correct."""
|
||||
settings = Settings(identity="test", secret="test")
|
||||
defaults = settings.get_proxy_defaults()
|
||||
|
||||
assert defaults["forward_scheme"] == "http"
|
||||
assert defaults["certificate_id"] == 0
|
||||
assert defaults["ssl_forced"] is True
|
||||
assert defaults["block_exploits"] is True
|
||||
assert defaults["allow_websocket_upgrade"] is True
|
||||
assert defaults["access_list_id"] == 0
|
||||
assert defaults["advanced_config"] == ""
|
||||
|
||||
def test_proxy_defaults_json_parsing(self, monkeypatch):
|
||||
"""Test parsing JSON string from environment variable."""
|
||||
monkeypatch.setenv("NPM_IDENTITY", "test")
|
||||
monkeypatch.setenv("NPM_SECRET", "test")
|
||||
monkeypatch.setenv("NPM_PROXY_DEFAULTS", '{"certificate_id": 24, "ssl_forced": false}')
|
||||
|
||||
settings = Settings()
|
||||
defaults = settings.get_proxy_defaults()
|
||||
|
||||
# Overridden values
|
||||
assert defaults["certificate_id"] == 24
|
||||
assert defaults["ssl_forced"] is False
|
||||
|
||||
# Default values preserved
|
||||
assert defaults["forward_scheme"] == "http"
|
||||
assert defaults["block_exploits"] is True
|
||||
|
||||
def test_proxy_defaults_dict_passthrough(self):
|
||||
"""Test that dict values pass through correctly."""
|
||||
settings = Settings(
|
||||
identity="test",
|
||||
secret="test",
|
||||
proxy_defaults={"certificate_id": 18, "access_list_id": 5},
|
||||
)
|
||||
defaults = settings.get_proxy_defaults()
|
||||
|
||||
assert defaults["certificate_id"] == 18
|
||||
assert defaults["access_list_id"] == 5
|
||||
|
||||
def test_proxy_defaults_empty_env_raises(self, monkeypatch):
|
||||
"""Test that empty string env var raises SettingsError."""
|
||||
monkeypatch.setenv("NPM_IDENTITY", "test")
|
||||
monkeypatch.setenv("NPM_SECRET", "test")
|
||||
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "")
|
||||
|
||||
# pydantic-settings tries to JSON decode empty string and fails
|
||||
with pytest.raises(SettingsError):
|
||||
Settings()
|
||||
|
||||
def test_proxy_defaults_invalid_json_raises(self, monkeypatch):
|
||||
"""Test that invalid JSON raises SettingsError."""
|
||||
monkeypatch.setenv("NPM_IDENTITY", "test")
|
||||
monkeypatch.setenv("NPM_SECRET", "test")
|
||||
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "{not valid json}")
|
||||
|
||||
# pydantic-settings tries to JSON decode and fails
|
||||
with pytest.raises(SettingsError):
|
||||
Settings()
|
||||
|
||||
def test_proxy_defaults_merges_not_replaces(self):
|
||||
"""Test that user defaults merge with base defaults."""
|
||||
settings = Settings(
|
||||
identity="test",
|
||||
secret="test",
|
||||
proxy_defaults={"certificate_id": 24},
|
||||
)
|
||||
defaults = settings.get_proxy_defaults()
|
||||
|
||||
# All keys should be present
|
||||
assert set(defaults.keys()) == set(DEFAULT_PROXY_SETTINGS.keys())
|
||||
|
||||
# User value applied
|
||||
assert defaults["certificate_id"] == 24
|
||||
|
||||
# Other defaults preserved
|
||||
assert defaults["ssl_forced"] is True
|
||||
assert defaults["block_exploits"] is True
|
||||
Reference in New Issue
Block a user