Compare commits

2 Commits

Author SHA1 Message Date
Ben
15b2876d7b chore: Update version to 0.0.2 2025-12-24 21:23:15 +00:00
b3nw
52eb484432 feat: Add proxy host creation and access list tools (#1)
Add MCP tools for creating proxy hosts and listing access lists, with configurable defaults via environment variable.

New tools:
- create_proxy_host: Create new proxy hosts with SSL, access control, and websocket support
- list_access_lists: List available access lists for use in proxy host creation

New features:
- NPM_PROXY_DEFAULTS env var for configurable default values (certificate_id, access_list_id, ssl settings, etc.)
2025-12-24 15:20:45 -06:00
10 changed files with 492 additions and 13 deletions

View File

@@ -26,7 +26,6 @@ jobs:
uses: docker/setup-buildx-action@v3
- name: Log in to Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
@@ -49,7 +48,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha

View File

@@ -49,8 +49,41 @@ NPM_SECRET=yourpassword
# Optional: Server settings
NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # or "http"
# Optional: Default values for create_proxy_host (JSON)
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
```
### Environment Variables
| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `NPM_API_URL` | Yes | `http://localhost:81/api` | NPM API endpoint |
| `NPM_IDENTITY` | Yes | - | NPM user email |
| `NPM_SECRET` | Yes | - | NPM user password |
| `NPM_MCP_HOST` | No | `0.0.0.0` | MCP server bind address |
| `NPM_MCP_PORT` | No | `8000` | MCP server port |
| `NPM_MCP_TRANSPORT` | No | `stdio` | Transport mode (`stdio` or `http`) |
| `NPM_PROXY_DEFAULTS` | No | `{}` | JSON defaults for `create_proxy_host` |
### NPM_PROXY_DEFAULTS Keys
Configure default values for proxy host creation:
```bash
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true, "block_exploits": true}'
```
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `forward_scheme` | string | `"http"` | Backend protocol (`http` or `https`) |
| `certificate_id` | int | `0` | SSL certificate ID (use `list_certificates` to find) |
| `ssl_forced` | bool | `true` | Force HTTPS redirect |
| `block_exploits` | bool | `true` | Enable common exploit blocking |
| `allow_websocket_upgrade` | bool | `true` | Allow WebSocket connections |
| `access_list_id` | int | `0` | Access list ID (use `list_access_lists` to find) |
| `advanced_config` | string | `""` | Custom nginx configuration block |
## Usage
### Stdio Mode (for Claude Desktop, etc.)
@@ -95,6 +128,9 @@ Add to your `claude_desktop_config.json`:
| `get_proxy_host_details` | Get full config for a specific host |
| `get_system_health` | Check NPM version and status |
| `search_audit_logs` | Query audit log entries |
| `list_certificates` | List SSL certificates |
| `list_access_lists` | List access lists for authentication/IP restrictions |
| `create_proxy_host` | Create a new proxy host |
## Development

View File

@@ -7,3 +7,8 @@ NPM_SECRET=changeme
NPM_MCP_HOST=0.0.0.0
NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # stdio or http
# Proxy Host Creation Defaults (JSON)
# Set default values for create_proxy_host tool parameters
# Example with wildcard cert: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
# NPM_PROXY_DEFAULTS='{"certificate_id": 0, "ssl_forced": true, "block_exploits": true, "allow_websocket_upgrade": true}'

View File

@@ -1,6 +1,6 @@
[project]
name = "npm-mcp"
version = "0.1.0"
version = "0.0.2"
description = "MCP server for Nginx Proxy Manager"
readme = "README.md"
requires-python = ">=3.11"

View File

@@ -13,6 +13,7 @@ from .exceptions import (
NpmNotFoundError,
)
from .models import (
AccessList,
AuditLogEntry,
Certificate,
HealthStatus,
@@ -92,9 +93,7 @@ class NpmClient:
raise NpmAuthenticationError("Invalid credentials")
if response.status_code != 200:
raise NpmApiError(
f"Login failed: {response.text}", status_code=response.status_code
)
raise NpmApiError(f"Login failed: {response.text}", status_code=response.status_code)
data = response.json()
token_response = TokenResponse(**data)
@@ -172,9 +171,7 @@ class NpmClient:
raise NpmNotFoundError(f"Resource not found: {endpoint}")
if response.status_code >= 400:
raise NpmApiError(
f"API error: {response.text}", status_code=response.status_code
)
raise NpmApiError(f"API error: {response.text}", status_code=response.status_code)
return response
@@ -259,3 +256,70 @@ class NpmClient:
)
data = response.json()
return [AuditLogEntry(**entry) for entry in data]
async def get_access_lists(self) -> list[AccessList]:
"""Get all access lists."""
response = await self._request("GET", "/nginx/access-lists")
data = response.json()
return [AccessList(**item) for item in data]
async def create_proxy_host(
self,
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str = "http",
certificate_id: int | None = None,
ssl_forced: bool = True,
hsts_enabled: bool = True,
hsts_subdomains: bool = False,
http2_support: bool = True,
block_exploits: bool = True,
caching_enabled: bool = False,
allow_websocket_upgrade: bool = True,
access_list_id: int = 0,
advanced_config: str = "",
meta: dict | None = None,
) -> ProxyHost:
"""Create a new proxy host.
Args:
domain_names: List of domain names for this host
forward_host: Backend host to forward to
forward_port: Backend port to forward to
forward_scheme: http or https
certificate_id: SSL certificate ID (0 for none, use list_certificates to find)
ssl_forced: Force SSL/HTTPS
hsts_enabled: Enable HSTS
hsts_subdomains: Include subdomains in HSTS
http2_support: Enable HTTP/2
block_exploits: Enable exploit blocking
caching_enabled: Enable caching
allow_websocket_upgrade: Allow WebSocket upgrades
access_list_id: Access list ID (0 for none, use list_access_lists to find)
advanced_config: Custom nginx configuration
meta: Additional metadata
Returns:
Created ProxyHost object
"""
payload = {
"domain_names": domain_names,
"forward_host": forward_host,
"forward_port": forward_port,
"forward_scheme": forward_scheme,
"certificate_id": certificate_id or 0,
"ssl_forced": ssl_forced,
"hsts_enabled": hsts_enabled,
"hsts_subdomains": hsts_subdomains,
"http2_support": http2_support,
"block_exploits": block_exploits,
"caching_enabled": caching_enabled,
"allow_websocket_upgrade": allow_websocket_upgrade,
"access_list_id": access_list_id,
"advanced_config": advanced_config,
"meta": meta or {},
}
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
return ProxyHost(**response.json())

View File

@@ -1,7 +1,26 @@
"""Configuration management using pydantic-settings."""
from typing import Any
from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# Default values for proxy host creation
DEFAULT_PROXY_SETTINGS: dict[str, Any] = {
"forward_scheme": "http",
"certificate_id": 0,
"ssl_forced": True,
"hsts_enabled": True,
"hsts_subdomains": False,
"http2_support": True,
"caching_enabled": False,
"block_exploits": True,
"allow_websocket_upgrade": True,
"access_list_id": 0,
"advanced_config": "",
"meta": {},
}
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
@@ -23,5 +42,30 @@ class Settings(BaseSettings):
mcp_port: int = 8000
mcp_transport: str = "stdio" # "stdio" or "http"
# Proxy host creation defaults (JSON string)
# Example: '{"certificate_id": 24, "ssl_forced": true}'
proxy_defaults: dict[str, Any] = {}
@field_validator("proxy_defaults", mode="before")
@classmethod
def parse_proxy_defaults(cls, v: Any) -> dict[str, Any]:
"""Parse JSON string to dict, or pass through if already dict."""
if isinstance(v, dict):
return v
if isinstance(v, str) and v.strip():
import json
try:
return json.loads(v)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in NPM_PROXY_DEFAULTS: {e}") from e
return {}
def get_proxy_defaults(self) -> dict[str, Any]:
"""Get merged proxy defaults (base defaults + user overrides)."""
merged = DEFAULT_PROXY_SETTINGS.copy()
merged.update(self.proxy_defaults)
return merged
settings = Settings()

View File

@@ -34,6 +34,18 @@ class Owner(BaseModel):
roles: list[str]
class AccessList(BaseModel):
"""Access list for authentication/IP restrictions."""
id: int
created_on: datetime
modified_on: datetime
owner_user_id: int = 0
name: str
satisfy_any: bool = False
pass_auth: bool = False
class Certificate(BaseModel):
"""SSL Certificate information."""
@@ -81,7 +93,7 @@ class ProxyHost(BaseModel):
advanced_config: str = ""
enabled: bool = True
meta: dict[str, Any] = Field(default_factory=dict)
locations: list[ProxyHostLocation] = Field(default_factory=list)
locations: list[ProxyHostLocation] | None = None
# Optional expanded relations
owner: Owner | None = None
certificate: Certificate | None = None

View File

@@ -179,7 +179,7 @@ async def get_system_health() -> str:
try:
await client._ensure_authenticated()
result.append("Authenticated: ✅")
# Try to get settings (admin only)
try:
settings_list = await client.get_settings()
@@ -253,11 +253,126 @@ async def list_certificates() -> str:
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
result.append(
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n"
f" Domains: {domains}{expiry}"
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n Domains: {domains}{expiry}"
)
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def list_access_lists() -> str:
"""List all access lists configured in Nginx Proxy Manager.
Returns a summary of all access lists including their IDs and names.
Use these IDs when creating proxy hosts that require access control.
"""
try:
client = get_client()
access_lists = await client.get_access_lists()
if not access_lists:
return "No access lists configured."
result = []
for al in access_lists:
result.append(f"[{al.id}] {al.name}")
return f"Found {len(access_lists)} access list(s):\n\n" + "\n".join(result)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_proxy_host(
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
) -> str:
"""Create a new proxy host in Nginx Proxy Manager.
Args:
domain_names: List of domain names (e.g., ["app.ext.ben.io"])
forward_host: Backend host/IP to forward to (e.g., "192.168.1.100" or "container-name")
forward_port: Backend port to forward to (e.g., 8080)
forward_scheme: Backend protocol - "http" or "https" (default from config)
certificate_id: SSL certificate ID. Use list_certificates to find available certs.
Use 0 for no SSL, or the ID of a wildcard cert. (default from config)
ssl_forced: Force HTTPS redirect (default from config)
block_exploits: Enable common exploit blocking (default from config)
allow_websocket_upgrade: Allow WebSocket connections (default from config)
access_list_id: Access list ID for authentication. Use list_access_lists to find.
Use 0 for no access restrictions. (default from config)
advanced_config: Custom nginx configuration block (default from config)
Returns:
Details of the created proxy host including the new host ID.
Note:
Default values can be configured via NPM_PROXY_DEFAULTS environment variable.
Example: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
Example:
create_proxy_host(
domain_names=["myapp.ext.ben.io"],
forward_host="10.0.0.50",
forward_port=3000,
certificate_id=24, # *.ext.ben.io wildcard
)
"""
try:
# Get defaults from config, then override with provided values
defaults = settings.get_proxy_defaults()
client = get_client()
host = await client.create_proxy_host(
domain_names=domain_names,
forward_host=forward_host,
forward_port=forward_port,
forward_scheme=forward_scheme
if forward_scheme is not None
else defaults["forward_scheme"],
certificate_id=certificate_id
if certificate_id is not None
else defaults["certificate_id"],
ssl_forced=ssl_forced if ssl_forced is not None else defaults["ssl_forced"],
hsts_enabled=defaults.get("hsts_enabled", True),
hsts_subdomains=defaults.get("hsts_subdomains", False),
http2_support=defaults.get("http2_support", True),
block_exploits=block_exploits
if block_exploits is not None
else defaults["block_exploits"],
caching_enabled=defaults.get("caching_enabled", False),
allow_websocket_upgrade=allow_websocket_upgrade
if allow_websocket_upgrade is not None
else defaults["allow_websocket_upgrade"],
access_list_id=access_list_id
if access_list_id is not None
else defaults["access_list_id"],
advanced_config=advanced_config
if advanced_config is not None
else defaults["advanced_config"],
meta=defaults.get("meta", {}),
)
domains = ", ".join(host.domain_names)
return (
f"Successfully created proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}"
)
except Exception as e:
return _format_error(e)

View File

@@ -35,6 +35,53 @@ def mock_proxy_hosts():
]
@pytest.fixture
def mock_access_lists():
"""Mock access lists response."""
return [
{
"id": 1,
"created_on": "2024-01-01T00:00:00Z",
"modified_on": "2024-01-01T00:00:00Z",
"owner_user_id": 1,
"name": "Admin Only",
"satisfy_any": False,
"pass_auth": True,
},
{
"id": 2,
"created_on": "2024-01-02T00:00:00Z",
"modified_on": "2024-01-02T00:00:00Z",
"owner_user_id": 1,
"name": "Internal Network",
"satisfy_any": True,
"pass_auth": False,
},
]
@pytest.fixture
def mock_created_proxy_host():
"""Mock response for created proxy host."""
return {
"id": 42,
"created_on": "2024-01-15T10:00:00Z",
"modified_on": "2024-01-15T10:00:00Z",
"owner_user_id": 1,
"domain_names": ["newapp.example.com"],
"forward_host": "10.0.0.50",
"forward_port": 3000,
"forward_scheme": "http",
"enabled": True,
"ssl_forced": True,
"certificate_id": 24,
"block_exploits": True,
"allow_websocket_upgrade": True,
"access_list_id": 0,
"advanced_config": "",
}
class TestNpmClientAuth:
"""Test authentication logic."""
@@ -114,3 +161,69 @@ class TestNpmClientEndpoints:
assert hosts[0].id == 1
assert hosts[0].domain_names == ["example.com"]
assert hosts[0].forward_host == "192.168.1.100"
@pytest.mark.asyncio
async def test_get_access_lists(self, httpx_mock, mock_token_response, mock_access_lists):
"""Test fetching access lists."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="GET",
url="http://localhost:81/api/nginx/access-lists",
json=mock_access_lists,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
access_lists = await client.get_access_lists()
assert len(access_lists) == 2
assert access_lists[0].id == 1
assert access_lists[0].name == "Admin Only"
assert access_lists[0].pass_auth is True
assert access_lists[1].id == 2
assert access_lists[1].name == "Internal Network"
assert access_lists[1].satisfy_any is True
@pytest.mark.asyncio
async def test_create_proxy_host(
self, httpx_mock, mock_token_response, mock_created_proxy_host
):
"""Test creating a proxy host."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/nginx/proxy-hosts",
json=mock_created_proxy_host,
status_code=201,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
host = await client.create_proxy_host(
domain_names=["newapp.example.com"],
forward_host="10.0.0.50",
forward_port=3000,
certificate_id=24,
ssl_forced=True,
)
assert host.id == 42
assert host.domain_names == ["newapp.example.com"]
assert host.forward_host == "10.0.0.50"
assert host.forward_port == 3000
assert host.ssl_forced is True
assert host.certificate_id == 24

91
tests/test_config.py Normal file
View File

@@ -0,0 +1,91 @@
"""Tests for configuration handling."""
import pytest
from pydantic_settings.exceptions import SettingsError
from npm_mcp.config import DEFAULT_PROXY_SETTINGS, Settings
class TestProxyDefaults:
"""Test NPM_PROXY_DEFAULTS parsing and merging."""
def test_default_proxy_settings(self):
"""Test that default settings are correct."""
settings = Settings(identity="test", secret="test")
defaults = settings.get_proxy_defaults()
assert defaults["forward_scheme"] == "http"
assert defaults["certificate_id"] == 0
assert defaults["ssl_forced"] is True
assert defaults["block_exploits"] is True
assert defaults["allow_websocket_upgrade"] is True
assert defaults["access_list_id"] == 0
assert defaults["advanced_config"] == ""
def test_proxy_defaults_json_parsing(self, monkeypatch):
"""Test parsing JSON string from environment variable."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", '{"certificate_id": 24, "ssl_forced": false}')
settings = Settings()
defaults = settings.get_proxy_defaults()
# Overridden values
assert defaults["certificate_id"] == 24
assert defaults["ssl_forced"] is False
# Default values preserved
assert defaults["forward_scheme"] == "http"
assert defaults["block_exploits"] is True
def test_proxy_defaults_dict_passthrough(self):
"""Test that dict values pass through correctly."""
settings = Settings(
identity="test",
secret="test",
proxy_defaults={"certificate_id": 18, "access_list_id": 5},
)
defaults = settings.get_proxy_defaults()
assert defaults["certificate_id"] == 18
assert defaults["access_list_id"] == 5
def test_proxy_defaults_empty_env_raises(self, monkeypatch):
"""Test that empty string env var raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "")
# pydantic-settings tries to JSON decode empty string and fails
with pytest.raises(SettingsError):
Settings()
def test_proxy_defaults_invalid_json_raises(self, monkeypatch):
"""Test that invalid JSON raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "{not valid json}")
# pydantic-settings tries to JSON decode and fails
with pytest.raises(SettingsError):
Settings()
def test_proxy_defaults_merges_not_replaces(self):
"""Test that user defaults merge with base defaults."""
settings = Settings(
identity="test",
secret="test",
proxy_defaults={"certificate_id": 24},
)
defaults = settings.get_proxy_defaults()
# All keys should be present
assert set(defaults.keys()) == set(DEFAULT_PROXY_SETTINGS.keys())
# User value applied
assert defaults["certificate_id"] == 24
# Other defaults preserved
assert defaults["ssl_forced"] is True
assert defaults["block_exploits"] is True