8 Commits

Author SHA1 Message Date
b3nw 77318c428e fix: Add hsts_subdomains and meta to DEFAULT_PROXY_SETTINGS 2025-12-24 21:14:23 +00:00
b3nw 474784d4bc fix: Add hsts_enabled, http2_support, and caching_enabled to defaults 2025-12-24 20:10:47 +00:00
b3nw 43d6ed4ca1 ci: Enable pushing PR builds to GHCR with pr-N tags 2025-12-24 17:09:04 +00:00
b3nw a9011ff428 fix: Allow locations to be null in ProxyHost model 2025-12-24 16:59:33 +00:00
b3nw ea137943fc test: Add tests for access lists, proxy host creation, and config defaults 2025-12-24 15:45:14 +00:00
b3nw a9f4379c98 docs: Update README with new tools and NPM_PROXY_DEFAULTS
- Add list_access_lists and create_proxy_host to tools table
- Document all environment variables in a table
- Add NPM_PROXY_DEFAULTS JSON configuration section
2025-12-24 15:34:20 +00:00
b3nw e8df05e3f3 feat: Add configurable defaults via NPM_PROXY_DEFAULTS env var
- Add NPM_PROXY_DEFAULTS JSON environment variable for default settings
- Merge user overrides with base defaults (certificate_id, ssl_forced, etc.)
- Update create_proxy_host to use None defaults and pull from config
- Update env.example with documentation and examples
2025-12-24 15:30:03 +00:00
b3nw 69158a871b feat: Add proxy host creation and access list tools
- Add AccessList model for authentication/IP restrictions
- Add get_access_lists() client method to query access lists
- Add create_proxy_host() client method with full parameter support
- Add list_access_lists MCP tool for reference data
- Add create_proxy_host MCP tool for automated proxy creation

This enables automated proxy host creation as part of docker agent
setup workflows.
2025-12-24 15:25:01 +00:00
18 changed files with 50 additions and 1937 deletions
+1 -5
View File
@@ -22,9 +22,6 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
@@ -51,8 +48,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
-91
View File
@@ -64,7 +64,6 @@ NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
| `NPM_MCP_HOST` | No | `0.0.0.0` | MCP server bind address |
| `NPM_MCP_PORT` | No | `8000` | MCP server port |
| `NPM_MCP_TRANSPORT` | No | `stdio` | Transport mode (`stdio` or `http`) |
| `NPM_LOG_DIR` | No | - | Path to mounted NPM log directory (enables `get_proxy_host_logs`) |
| `NPM_PROXY_DEFAULTS` | No | `{}` | JSON defaults for `create_proxy_host` |
### NPM_PROXY_DEFAULTS Keys
@@ -127,101 +126,11 @@ Add to your `claude_desktop_config.json`:
|------|-------------|
| `list_proxy_hosts` | List all proxy hosts |
| `get_proxy_host_details` | Get full config for a specific host |
| `get_proxy_host_logs` | Retrieve nginx access/error logs for a proxy host (requires log mount) |
| `get_system_health` | Check NPM version and status |
| `search_audit_logs` | Query audit log entries |
| `list_certificates` | List SSL certificates |
| `list_access_lists` | List access lists for authentication/IP restrictions |
| `create_proxy_host` | Create a new proxy host |
| `update_proxy_host` | Update an existing proxy host (v0.0.3+) |
| `create_certificate` | Provision a new Let's Encrypt SSL certificate (v0.0.3+) |
## Log Access Setup
The `get_proxy_host_logs` tool reads nginx log files directly from disk. Since NPM has no API for log retrieval, you need to mount NPM's log directory into the MCP container.
NPM writes per-host logs to `/data/logs/` inside its container:
- `proxy-host-{id}_access.log` — HTTP request log (client IP, status, path, user agent)
- `proxy-host-{id}_error.log` — nginx error log (upstream failures, config issues)
### Docker Compose (same stack)
If NPM and the MCP server share a compose stack with a named volume:
```yaml
services:
nginx-proxy-manager:
image: jc21/nginx-proxy-manager:latest
volumes:
- npm_data:/data
npm-mcp:
image: ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
environment:
- NPM_API_URL=http://nginx-proxy-manager:81/api
- NPM_IDENTITY=admin@example.com
- NPM_SECRET=yourpassword
- NPM_LOG_DIR=/data/npm-logs
volumes:
# Mount NPM's /data volume — logs are in /data/logs/ inside it
- npm_data:/data/npm-logs:ro
depends_on:
- nginx-proxy-manager
volumes:
npm_data:
```
> **Note:** NPM stores logs under `/data/logs/` inside its data volume. When you
> mount the full `/data` volume to `/data/npm-logs`, the MCP server looks for logs at
> `/data/npm-logs/logs/`. Set `NPM_LOG_DIR` to match your mount path plus `/logs`.
If you mounted the full data volume:
```bash
NPM_LOG_DIR=/data/npm-logs/logs
```
### Bind Mount (separate stacks)
If NPM uses a bind mount (e.g., `./npm-data:/data`), mount the logs subdirectory directly:
```yaml
npm-mcp:
volumes:
- /path/to/npm-data/logs:/data/npm-logs:ro
environment:
- NPM_LOG_DIR=/data/npm-logs
```
### Docker Run
```bash
docker run -d \
--name npm-mcp \
-p 8000:8000 \
-v npm_data:/data/npm-logs:ro \
-e NPM_API_URL=http://your-npm:81/api \
-e NPM_IDENTITY=admin@example.com \
-e NPM_SECRET=yourpassword \
-e NPM_LOG_DIR=/data/npm-logs/logs \
-e NPM_MCP_TRANSPORT=http \
ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
```
### Local Development (non-Docker)
Point `NPM_LOG_DIR` at wherever NPM's logs are on your filesystem:
```bash
NPM_LOG_DIR=/path/to/npm/data/logs npm-mcp
```
### Verifying the Mount
After starting, call `get_system_health` — if the log directory is mounted and accessible
the tool will confirm it. You can also call `get_proxy_host_logs` with any host ID to
verify logs are readable.
## Development
+1 -9
View File
@@ -16,11 +16,6 @@ services:
- NPM_MCP_TRANSPORT=http
- NPM_MCP_HOST=0.0.0.0
- NPM_MCP_PORT=8000
# Optional: Log access (requires volume mount below)
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# # Mount NPM's log directory for get_proxy_host_logs tool (read-only)
# - npm_data:/data/npm-logs:ro # named volume — see "Log Access" in README
# Uncomment to use .env file instead of inline environment
# env_file:
# - .env
@@ -31,7 +26,7 @@ services:
retries: 3
start_period: 10s
# Example: Running alongside NPM in same compose stack (with log access)
# Example: Running alongside NPM in same compose stack
# services:
# nginx-proxy-manager:
# image: jc21/nginx-proxy-manager:latest
@@ -49,9 +44,6 @@ services:
# - NPM_API_URL=http://nginx-proxy-manager:81/api
# - NPM_IDENTITY=admin@example.com
# - NPM_SECRET=changeme
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# - npm_data:/data/npm-logs:ro
# depends_on:
# - nginx-proxy-manager
#
-176
View File
@@ -1,176 +0,0 @@
# Feature Request: Per-Host Log Retrieval API
**Project:** [NginxProxyManager/nginx-proxy-manager](https://github.com/NginxProxyManager/nginx-proxy-manager)
**Type:** Feature Request
**Status:** Draft PRD
## Problem Statement
Nginx Proxy Manager writes per-host access and error logs to predictable paths on disk
(`/data/logs/proxy-host-{id}_access.log`, `/data/logs/proxy-host-{id}_error.log`), but
provides no API to read them. The only log-related API is the audit log (`GET /api/audit-log`),
which tracks admin configuration changes — not HTTP traffic.
This means operators and automation tools have no programmatic way to:
- Retrieve recent access log entries for a specific proxy host
- Check error logs when debugging upstream connectivity issues
- Monitor traffic patterns or detect anomalies through the existing API surface
The only workarounds today are direct filesystem access (requiring volume mounts or
`docker exec`) or external log aggregation pipelines, both of which add significant
operational complexity for a task that should be simple.
## Proposed Solution
Add REST API endpoints to retrieve nginx access and error logs for individual proxy hosts.
### New Endpoints
#### `GET /api/nginx/proxy-hosts/{id}/logs`
Retrieve log entries for a specific proxy host.
**Query Parameters:**
| Parameter | Type | Default | Description |
|------------|--------|------------|-------------------------------------------------------|
| `type` | string | `"access"` | Log type: `"access"` or `"error"` |
| `lines` | int | `100` | Number of most recent lines to return (max: `1000`) |
| `search` | string | - | Filter lines containing this substring |
| `since` | string | - | ISO 8601 timestamp — only return lines after this time|
**Response (200):**
```json
{
"host_id": 5,
"log_type": "access",
"file": "proxy-host-5_access.log",
"total_lines": 4821,
"returned_lines": 100,
"lines": [
"[01/Jun/2025:14:22:31 +0000] HIT 200 200 - GET https app.example.com \"/api/data\" [Client 10.0.0.1] [Length 1542] [Gzip -] [Sent-to 192.168.1.50] \"Mozilla/5.0\" \"https://app.example.com/\"",
"..."
]
}
```
**Error Responses:**
| Status | Condition |
|--------|-------------------------------------|
| 404 | Proxy host not found |
| 404 | Log file does not exist |
| 403 | User lacks permission for this host |
#### `GET /api/nginx/proxy-hosts/{id}/logs/summary`
Return a statistical summary of recent traffic for a proxy host.
**Response (200):**
```json
{
"host_id": 5,
"period": "last_1000_lines",
"status_codes": {"200": 812, "301": 45, "404": 23, "500": 3},
"top_paths": ["/api/data", "/", "/login"],
"top_clients": ["10.0.0.1", "10.0.0.5"],
"cache_hit_rate": 0.42,
"access_log_size_bytes": 524288,
"error_log_size_bytes": 8192
}
```
### Permissions
| Permission | Description |
|--------------------|--------------------------------------------|
| `proxy-hosts:logs` | Read logs for proxy hosts the user can view |
Admin users can read logs for any host. Non-admin users can only read logs
for hosts they own, consistent with existing proxy host permissions.
### Backend Implementation Notes
The implementation is straightforward because log paths are already deterministic
and hardcoded in nginx templates:
```javascript
// backend/templates/proxy_host.conf
access_log /data/logs/proxy-host-{{ id }}_access.log proxy;
error_log /data/logs/proxy-host-{{ id }}_error.log warn;
```
A minimal implementation would:
1. Add a new route in `backend/routes/` (e.g., `proxy-host-logs.js`)
2. Verify the proxy host exists and the user has access
3. Read the last N lines from the log file using a reverse-reader (or `tail`-like approach)
4. Optionally filter lines by substring or timestamp
5. Return as JSON
**Reference files for implementation:**
- `backend/routes/nginx/proxy_hosts.js` — existing proxy host routes and permission model
- `backend/templates/proxy_host.conf` — log path template confirming the naming convention
- `backend/lib/access/` — permission definition files
- `docker/rootfs/etc/logrotate.d/nginx-proxy-manager` — log rotation config (rotated logs
have `.1`, `.2.gz` suffixes)
### Log Rotation Consideration
NPM rotates logs weekly (access: 4 rotations, error: 10 rotations). The API should
read only the current (unrotated) log file. Rotated archives (`.1`, `.2.gz`) could be
supported in a future iteration but are not required for the initial implementation.
## Motivation
### Use Cases
1. **MCP/AI Agent Integration** — MCP servers wrapping the NPM API (like
[nginx-proxy-manager-mcp](https://github.com/b3nw/nginx-proxy-manager-mcp)) can
expose log retrieval to AI assistants for debugging proxy issues conversationally.
2. **Quick Debugging** — When a reverse proxy returns errors, operators need to quickly
check the access and error logs for that specific host. Today this requires SSH/exec
access to the container.
3. **Monitoring Dashboards** — Custom dashboards can poll the log endpoint for
traffic summaries without deploying a full log aggregation stack.
4. **Automation** — CI/CD pipelines and health-check scripts can verify that traffic
is flowing correctly to newly deployed services behind NPM.
### Why Not External Log Aggregation?
External solutions (Loki, ELK, Fluentd) are powerful but heavy. Many NPM users run
single-host homelab setups where a full log pipeline is disproportionate to the need.
A built-in API covers 80% of use cases with zero additional infrastructure.
## Alternatives Considered
| Approach | Pros | Cons |
|---------------------------|-----------------------------|-------------------------------------------------|
| **API endpoint (proposed)** | Native, zero extra infra | Requires upstream PR |
| Volume mount + file read | Works today, no NPM changes | Tight coupling, no access control, not portable |
| Docker exec | Works today | Requires Docker socket, security risk |
| Sidecar log server | Decoupled | Extra container, extra config, extra maintenance |
## Scope
### In Scope (v1)
- `GET /api/nginx/proxy-hosts/{id}/logs` with `type`, `lines`, `search` params
- Permission checks consistent with existing proxy host access model
- Current (unrotated) log file only
### Out of Scope (Future)
- Log streaming via WebSocket or SSE
- Rotated log archive access (`.gz` files)
- Log summary/analytics endpoint
- Redirection host, dead host, and stream logs (same pattern, easy to add later)
- Log download as file attachment
- Log retention configuration via API
-5
View File
@@ -8,11 +8,6 @@ NPM_MCP_HOST=0.0.0.0
NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # stdio or http
# Log Access (optional)
# Mount NPM's /data/logs directory and set this to the mount path.
# Enables the get_proxy_host_logs tool for reading nginx access/error logs.
# NPM_LOG_DIR=/data/npm-logs
# Proxy Host Creation Defaults (JSON)
# Set default values for create_proxy_host tool parameters
# Example with wildcard cert: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "npm-mcp"
version = "0.0.2"
version = "0.1.0"
description = "MCP server for Nginx Proxy Manager"
readme = "README.md"
requires-python = ">=3.11"
-128
View File
@@ -2,7 +2,6 @@
import logging
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
@@ -324,130 +323,3 @@ class NpmClient:
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
return ProxyHost(**response.json())
async def update_proxy_host(
self,
host_id: int,
**kwargs,
) -> ProxyHost:
"""Update an existing proxy host.
Args:
host_id: The proxy host ID to update
**kwargs: Fields to update (same as create_proxy_host)
Returns:
Updated ProxyHost object
"""
# Get existing host to merge with updates
existing = await self.get_proxy_host(host_id)
payload = {
"domain_names": existing.domain_names,
"forward_host": existing.forward_host,
"forward_port": existing.forward_port,
"forward_scheme": existing.forward_scheme,
"certificate_id": existing.certificate_id or 0,
"ssl_forced": existing.ssl_forced,
"hsts_enabled": existing.hsts_enabled,
"hsts_subdomains": existing.hsts_subdomains,
"http2_support": existing.http2_support,
"block_exploits": existing.block_exploits,
"caching_enabled": existing.caching_enabled,
"allow_websocket_upgrade": existing.allow_websocket_upgrade,
"access_list_id": existing.access_list_id,
"advanced_config": existing.advanced_config,
"meta": existing.meta,
}
payload.update({k: v for k, v in kwargs.items() if v is not None})
response = await self._request("PUT", f"/nginx/proxy-hosts/{host_id}", json=payload)
return ProxyHost(**response.json())
async def create_certificate(
self,
domain_names: list[str],
email: str,
provider: str = "letsencrypt",
dns_challenge: bool = False,
) -> Certificate:
"""Create/provision a new SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
provider: Certificate provider (default: "letsencrypt")
dns_challenge: Use DNS challenge instead of HTTP (default: False)
Returns:
Created Certificate object
"""
payload = {
"domain_names": domain_names,
"meta": {
"letsencrypt_email": email,
"letsencrypt_agree": True,
"dns_challenge": dns_challenge,
},
"provider": provider,
}
response = await self._request("POST", "/nginx/certificates", json=payload)
return Certificate(**response.json())
async def get_proxy_host_logs(
self,
host_id: int,
log_type: str = "access",
lines: int = 100,
) -> dict[str, Any]:
"""Retrieve proxy host logs via the API.
Args:
host_id: NPM proxy host ID.
log_type: "access" or "error".
lines: Number of most recent lines to return.
"""
response = await self._request(
"GET",
f"/nginx/proxy-hosts/{host_id}/logs",
params={"type": log_type, "limit": lines},
)
return response.json()
async def get_proxy_host_logs_summary(self, host_id: int) -> dict[str, Any]:
"""Retrieve proxy host logs summary via the API.
Args:
host_id: NPM proxy host ID.
"""
response = await self._request("GET", f"/nginx/proxy-hosts/{host_id}/logs/summary")
return response.json()
async def create_access_list(
self,
name: str,
satisfy_any: bool = False,
pass_auth: bool = False,
items: list[dict[str, Any]] | None = None,
clients: list[dict[str, Any]] | None = None,
) -> AccessList:
"""Create a new access list on the server.
Args:
name: Name of the access list
satisfy_any: Satisfy any HTTP auth or IP restriction
pass_auth: Pass auth headers to host
items: Auth entries (username/password) and IP restrictions
clients: IP restriction clients
"""
payload = {
"name": name,
"satisfy_any": satisfy_any,
"pass_auth": pass_auth,
"items": items or [],
"clients": clients or [],
}
response = await self._request("POST", "/nginx/access-lists", json=payload)
return AccessList(**response.json())
-28
View File
@@ -37,39 +37,11 @@ class Settings(BaseSettings):
identity: str = ""
secret: str = ""
# Multi-Server Configuration
# Example JSON:
# '[{"name": "prod", "url": "http://10.0.0.10:81/api",
# "identity": "admin@example.com", "secret": "pwd"}]'
servers: list[dict[str, Any]] = []
default_server: str | None = None
@field_validator("servers", mode="before")
@classmethod
def parse_servers(cls, v: Any) -> list[dict[str, Any]]:
"""Parse JSON string to list of dicts, or pass through if already list."""
if isinstance(v, list):
return v
if isinstance(v, str) and v.strip():
import json
try:
data = json.loads(v)
if not isinstance(data, list):
raise ValueError("NPM_SERVERS must be a list of objects")
return data
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in NPM_SERVERS: {e}") from e
return []
# MCP Server Configuration
mcp_host: str = "0.0.0.0"
mcp_port: int = 8000
mcp_transport: str = "stdio" # "stdio" or "http"
# Path to NPM log directory (mount NPM's /data/logs here)
log_dir: str = ""
# Proxy host creation defaults (JSON string)
# Example: '{"certificate_id": 24, "ssl_forced": true}'
proxy_defaults: dict[str, Any] = {}
-6
View File
@@ -31,9 +31,3 @@ class NpmApiError(NpmClientError):
def __init__(self, message: str, status_code: int | None = None):
super().__init__(message)
self.status_code = status_code
class NpmLogError(NpmClientError):
"""Raised when log file operations fail."""
pass
-118
View File
@@ -1,118 +0,0 @@
"""Log file reader for Nginx Proxy Manager proxy host logs."""
from __future__ import annotations
import re
from pathlib import Path
from .config import settings
from .exceptions import NpmLogError
LOG_FILE_PATTERN = re.compile(r"^proxy-host-(\d+)_(access|error)\.log$")
MAX_LINES = 500
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB safety cap
def _get_log_dir() -> Path:
"""Resolve and validate the configured log directory."""
log_dir = settings.log_dir
if not log_dir:
raise NpmLogError(
"NPM_LOG_DIR is not configured. Mount NPM's /data/logs volume "
"and set NPM_LOG_DIR to the mount path. See README for details."
)
path = Path(log_dir)
if not path.is_dir():
raise NpmLogError(f"Log directory does not exist: {log_dir}")
return path
def _log_file_path(host_id: int, log_type: str) -> Path:
if log_type not in ("access", "error"):
raise NpmLogError(f"Invalid log type: {log_type!r} (must be 'access' or 'error')")
log_dir = _get_log_dir()
return log_dir / f"proxy-host-{host_id}_{log_type}.log"
def read_log_lines(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> dict:
"""Read the last N lines from a proxy host log file.
Args:
host_id: NPM proxy host ID.
log_type: "access" or "error".
lines: Number of most recent lines to return (capped at MAX_LINES).
search: Optional substring filter applied to each line.
Returns:
Dict with host_id, log_type, file name, line count, and the lines themselves.
"""
lines = max(1, min(lines, MAX_LINES))
log_path = _log_file_path(host_id, log_type)
if not log_path.is_file():
raise NpmLogError(
f"Log file not found: {log_path.name}. "
"The proxy host may not have received any traffic yet, "
"or the log directory mount is incorrect."
)
file_size = log_path.stat().st_size
if file_size > MAX_FILE_SIZE:
raise NpmLogError(
f"Log file is too large ({file_size / 1024 / 1024:.1f} MB). "
"Consider using an external log aggregation tool."
)
all_lines = log_path.read_text(errors="replace").splitlines()
total_lines = len(all_lines)
if search:
search_lower = search.lower()
all_lines = [line for line in all_lines if search_lower in line.lower()]
tail = all_lines[-lines:]
return {
"host_id": host_id,
"log_type": log_type,
"file": log_path.name,
"total_lines_in_file": total_lines if not search else None,
"matched_lines": len(all_lines) if search else None,
"returned_lines": len(tail),
"lines": tail,
}
def is_log_dir_configured() -> bool:
"""Check whether the log directory is configured and accessible."""
if not settings.log_dir:
return False
return Path(settings.log_dir).is_dir()
def list_available_logs() -> list[dict]:
"""List all proxy-host log files present in the log directory.
Returns:
List of dicts with host_id, log_type, file name, and size.
"""
log_dir = _get_log_dir()
results = []
for entry in sorted(log_dir.iterdir()):
match = LOG_FILE_PATTERN.match(entry.name)
if match and entry.is_file():
results.append(
{
"host_id": int(match.group(1)),
"log_type": match.group(2),
"file": entry.name,
"size_bytes": entry.stat().st_size,
}
)
return results
+16 -16
View File
@@ -23,15 +23,15 @@ class UserMeta(BaseModel):
class Owner(BaseModel):
"""Proxy host owner information."""
id: int | None = None
created_on: datetime | None = None
modified_on: datetime | None = None
is_disabled: bool = False
email: str | None = None
name: str = ""
nickname: str = ""
avatar: str = ""
roles: list[str] = Field(default_factory=list)
id: int
created_on: datetime
modified_on: datetime
is_disabled: bool
email: str
name: str
nickname: str
avatar: str
roles: list[str]
class AccessList(BaseModel):
@@ -49,13 +49,13 @@ class AccessList(BaseModel):
class Certificate(BaseModel):
"""SSL Certificate information."""
id: int | None = None
created_on: datetime | None = None
modified_on: datetime | None = None
owner_user_id: int | None = None
provider: str = ""
nice_name: str = ""
domain_names: list[str] = Field(default_factory=list)
id: int
created_on: datetime
modified_on: datetime
owner_user_id: int
provider: str
nice_name: str
domain_names: list[str]
expires_on: datetime | None = None
meta: dict[str, Any] = Field(default_factory=dict)
+28 -670
View File
@@ -9,114 +9,34 @@ from mcp.server.fastmcp import FastMCP
from .client import NpmClient
from .config import settings
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError, NpmLogError
from .logs import is_log_dir_configured, list_available_logs, read_log_lines
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError
logger = logging.getLogger(__name__)
class ServerRegistry:
"""Manages multiple NpmClient connections."""
def __init__(self, configs: list[dict[str, Any]], default: str | None = None):
self._clients: dict[str, NpmClient] = {}
self._default = default
# Register multi-server entries
for cfg in configs:
name = cfg.get("name")
url = cfg.get("url") or cfg.get("api_url")
identity = cfg.get("identity")
secret = cfg.get("secret")
if not all([name, url, identity, secret]):
logger.warning(f"Server '{name}' is missing required fields, skipping")
continue
self._clients[name] = NpmClient(base_url=url, identity=identity, secret=secret)
# Fallback to single-server settings if registry is empty
if not self._clients and settings.api_url and settings.identity and settings.secret:
logger.info("No servers in NPM_SERVERS. Using single-server environment variables.")
self._clients["default"] = NpmClient(
base_url=settings.api_url,
identity=settings.identity,
secret=settings.secret
)
if not self._default:
self._default = "default"
# Validate default
if self._default and self._default not in self._clients:
logger.warning(
f"Default server '{self._default}' is not in configured servers. "
"Clearing default."
)
self._default = None
def get(self, name: str | None = None) -> NpmClient:
"""Retrieve client by name. Fallback to default if name is None/empty."""
if not self._clients:
raise KeyError("No NPM servers configured.")
if name is None or name == "":
if self._default:
name = self._default
elif len(self._clients) == 1:
name = next(iter(self._clients.keys()))
else:
raise KeyError("Multiple servers configured but no default server specified.")
if name not in self._clients:
raise KeyError(
f"Server '{name}' not found. "
f"Configured servers: {list(self._clients.keys())}"
)
return self._clients[name]
def list_names(self) -> list[str]:
return list(self._clients.keys())
def get_default(self) -> str | None:
return self._default
async def close_all(self) -> None:
for client in self._clients.values():
await client.close()
# Global registry
registry: ServerRegistry | None = None
def get_registry() -> ServerRegistry:
"""Get or create the global ServerRegistry instance."""
global registry
if registry is None:
registry = ServerRegistry(settings.servers, settings.default_server)
return registry
def _get_client(server: str | None = None) -> NpmClient:
"""Retrieve NPM client for the specified server, or fallback to default."""
return get_registry().get(server)
# Create global client instance (lazy initialization)
_client: NpmClient | None = None
def get_client() -> NpmClient:
"""Get or create the NPM client instance (backward compatibility)."""
return _get_client(None)
"""Get or create the NPM client instance."""
global _client
if _client is None:
_client = NpmClient()
return _client
@asynccontextmanager
async def lifespan(server: FastMCP):
"""Manage client lifecycle."""
global registry
registry = ServerRegistry(settings.servers, settings.default_server)
logger.info(f"NPM MCP Server starting. Configured servers: {registry.list_names()}")
global _client
_client = NpmClient()
logger.info(f"NPM MCP Server starting, connecting to {settings.api_url}")
try:
yield
finally:
if registry:
await registry.close_all()
registry = None
if _client:
await _client.close()
_client = None
logger.info("NPM MCP Server stopped")
@@ -132,34 +52,29 @@ mcp = FastMCP(
def _format_error(e: Exception) -> str:
"""Format exception for tool response."""
if isinstance(e, KeyError):
return f"Configuration error: {e.args[0]}"
elif isinstance(e, NpmAuthenticationError):
if isinstance(e, NpmAuthenticationError):
return f"Authentication failed: {e}"
elif isinstance(e, NpmConnectionError):
return f"Connection error: {e}"
elif isinstance(e, NpmLogError):
return f"Log error: {e}"
elif isinstance(e, NpmApiError):
return f"API error: {e}"
return f"Error: {e}"
# =============================================================================
# Tools
# =============================================================================
@mcp.tool()
async def list_proxy_hosts(server: str | None = None) -> str:
async def list_proxy_hosts() -> str:
"""List all proxy hosts configured in Nginx Proxy Manager.
Returns a summary of all proxy hosts including their domains,
forward destinations, and SSL status.
"""
try:
client = _get_client(server)
client = get_client()
hosts = await client.get_proxy_hosts()
if not hosts:
@@ -183,18 +98,17 @@ async def list_proxy_hosts(server: str | None = None) -> str:
@mcp.tool()
async def get_proxy_host_details(host_id: int, server: str | None = None) -> str:
async def get_proxy_host_details(host_id: int) -> str:
"""Get detailed configuration for a specific proxy host.
Args:
host_id: The ID of the proxy host to retrieve
server: Target server name
Returns full configuration including SSL settings, locations,
and advanced configuration.
"""
try:
client = _get_client(server)
client = get_client()
host = await client.get_proxy_host(host_id)
details: dict[str, Any] = {
@@ -247,13 +161,13 @@ async def get_proxy_host_details(host_id: int, server: str | None = None) -> str
@mcp.tool()
async def get_system_health(server: str | None = None) -> str:
async def get_system_health() -> str:
"""Check the health and status of the Nginx Proxy Manager instance.
Returns system status, version information, and connectivity status.
"""
try:
client = _get_client(server)
client = get_client()
status = await client.get_status()
result = [f"Status: {status.status}"]
@@ -275,14 +189,6 @@ async def get_system_health(server: str | None = None) -> str:
except NpmAuthenticationError:
result.append("Authenticated: ❌ (check credentials)")
if is_log_dir_configured():
logs = list_available_logs()
result.append(f"Log directory: ✅ ({len(logs)} log files found)")
else:
result.append(
"Log directory: ❌ (not configured — set NPM_LOG_DIR to enable get_proxy_host_logs)"
)
return "\n".join(result)
except Exception as e:
@@ -290,18 +196,17 @@ async def get_system_health(server: str | None = None) -> str:
@mcp.tool()
async def search_audit_logs(limit: int = 50, offset: int = 0, server: str | None = None) -> str:
async def search_audit_logs(limit: int = 50, offset: int = 0) -> str:
"""Search the audit log for recent actions in Nginx Proxy Manager.
Args:
limit: Maximum number of entries to return (default: 50, max: 100)
offset: Number of entries to skip for pagination (default: 0)
server: Target server name
Returns recent audit log entries showing user actions and changes.
"""
try:
client = _get_client(server)
client = get_client()
limit = min(limit, 100) # Cap at 100
entries = await client.get_audit_log(limit=limit, offset=offset)
@@ -324,14 +229,14 @@ async def search_audit_logs(limit: int = 50, offset: int = 0, server: str | None
@mcp.tool()
async def list_certificates(server: str | None = None) -> str:
async def list_certificates() -> str:
"""List all SSL certificates managed by Nginx Proxy Manager.
Returns a summary of all certificates including their domains,
provider, and expiration dates.
"""
try:
client = _get_client(server)
client = get_client()
certs = await client.get_certificates()
if not certs:
@@ -358,14 +263,14 @@ async def list_certificates(server: str | None = None) -> str:
@mcp.tool()
async def list_access_lists(server: str | None = None) -> str:
async def list_access_lists() -> str:
"""List all access lists configured in Nginx Proxy Manager.
Returns a summary of all access lists including their IDs and names.
Use these IDs when creating proxy hosts that require access control.
"""
try:
client = _get_client(server)
client = get_client()
access_lists = await client.get_access_lists()
if not access_lists:
@@ -393,7 +298,6 @@ async def create_proxy_host(
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
server: str | None = None,
) -> str:
"""Create a new proxy host in Nginx Proxy Manager.
@@ -410,7 +314,6 @@ async def create_proxy_host(
access_list_id: Access list ID for authentication. Use list_access_lists to find.
Use 0 for no access restrictions. (default from config)
advanced_config: Custom nginx configuration block (default from config)
server: Target server name
Returns:
Details of the created proxy host including the new host ID.
@@ -431,7 +334,7 @@ async def create_proxy_host(
# Get defaults from config, then override with provided values
defaults = settings.get_proxy_defaults()
client = _get_client(server)
client = get_client()
host = await client.create_proxy_host(
domain_names=domain_names,
forward_host=forward_host,
@@ -473,548 +376,3 @@ async def create_proxy_host(
except Exception as e:
return _format_error(e)
@mcp.tool()
async def update_proxy_host(
host_id: int,
forward_host: str | None = None,
forward_port: int | None = None,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
server: str | None = None,
) -> str:
"""Update an existing proxy host in Nginx Proxy Manager.
Only provided fields will be updated; all others remain unchanged.
Args:
host_id: The ID of the proxy host to update
forward_host: Backend host/IP to forward to
forward_port: Backend port to forward to
forward_scheme: Backend protocol - "http" or "https"
certificate_id: SSL certificate ID (use list_certificates to find, 0 for none)
ssl_forced: Force HTTPS redirect
block_exploits: Enable common exploit blocking
allow_websocket_upgrade: Allow WebSocket connections
access_list_id: Access list ID (0 for no restrictions)
advanced_config: Custom nginx configuration block
server: Target server name
Returns:
Details of the updated proxy host.
"""
try:
client = _get_client(server)
kwargs = {}
if forward_host is not None:
kwargs["forward_host"] = forward_host
if forward_port is not None:
kwargs["forward_port"] = forward_port
if forward_scheme is not None:
kwargs["forward_scheme"] = forward_scheme
if certificate_id is not None:
kwargs["certificate_id"] = certificate_id
if ssl_forced is not None:
kwargs["ssl_forced"] = ssl_forced
if block_exploits is not None:
kwargs["block_exploits"] = block_exploits
if allow_websocket_upgrade is not None:
kwargs["allow_websocket_upgrade"] = allow_websocket_upgrade
if access_list_id is not None:
kwargs["access_list_id"] = access_list_id
if advanced_config is not None:
kwargs["advanced_config"] = advanced_config
host = await client.update_proxy_host(host_id, **kwargs)
domains = ", ".join(host.domain_names)
return (
f"Successfully updated proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}\n"
f"Certificate ID: {host.certificate_id}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def get_proxy_host_logs(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
server: str | None = None,
) -> str:
"""Retrieve recent nginx log entries for a specific proxy host.
Reads the raw nginx access or error log file for the given host or retrieves
them via the API.
Args:
host_id: The ID of the proxy host (use list_proxy_hosts to find IDs)
log_type: Log type - "access" for HTTP traffic or "error"
for nginx errors (default: "access")
lines: Number of most recent lines to return
(default: 100, max: 500)
search: Optional filter string - only lines containing this
text are returned (case-insensitive)
server: Target server name
"""
try:
client = _get_client(server)
# Try retrieving logs via the API first
try:
log_data = await client.get_proxy_host_logs(
host_id=host_id,
log_type=log_type,
lines=lines,
)
raw_lines = log_data.get("lines", [])
if search:
search_lower = search.lower()
filtered_lines = [
line for line in raw_lines if search_lower in line.lower()
]
else:
filtered_lines = raw_lines
host = await client.get_proxy_host(host_id)
domains = ", ".join(host.domain_names)
header_parts = [
f"Proxy host [{host_id}] {domains}{log_type} log (retrieved via API)",
f"Showing last {len(filtered_lines)} lines:",
]
header = "\n".join(header_parts)
if not filtered_lines:
return f"{header}\n\n(no log entries found)"
return f"{header}\n\n" + "\n".join(filtered_lines)
except Exception as api_err:
# Fallback for default server or if local logs are mounted
reg = get_registry()
is_default = False
try:
target_client = reg.get(server)
default_client = reg.get(None)
if target_client.base_url == default_client.base_url:
is_default = True
except Exception:
pass
if is_default and is_log_dir_configured():
host = await client.get_proxy_host(host_id)
domains = ", ".join(host.domain_names)
result = read_log_lines(
host_id=host_id,
log_type=log_type,
lines=lines,
search=search,
)
header_parts = [
f"Proxy host [{host_id}] {domains}{log_type} log (local fallback)",
f"File: {result['file']}",
]
if result["total_lines_in_file"] is not None:
header_parts.append(f"Total lines in file: {result['total_lines_in_file']}")
if result["matched_lines"] is not None:
header_parts.append(f"Lines matching '{search}': {result['matched_lines']}")
header_parts.append(f"Showing last {result['returned_lines']} lines:")
header = "\n".join(header_parts)
if not result["lines"]:
return f"{header}\n\n(no log entries found)"
return f"{header}\n\n" + "\n".join(result["lines"])
else:
raise api_err
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_certificate(
domain_names: list[str],
email: str,
dns_challenge: bool = False,
server: str | None = None,
) -> str:
"""Provision a new Let's Encrypt SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
dns_challenge: Use DNS challenge instead of HTTP (default: False)
server: Target server name
"""
try:
client = _get_client(server)
cert = await client.create_certificate(
domain_names=domain_names,
email=email,
dns_challenge=dns_challenge,
)
domains = ", ".join(cert.domain_names)
expiry = cert.expires_on.strftime("%Y-%m-%d") if cert.expires_on else "N/A"
return (
f"Successfully created certificate!\n\n"
f"ID: {cert.id}\n"
f"Provider: {cert.provider}\n"
f"Domains: {domains}\n"
f"Expires: {expiry}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def list_servers() -> str:
"""List all configured NPM servers and their health/connectivity status.
Returns a JSON string containing the list of registered servers,
the default server, and their health status.
"""
try:
reg = get_registry()
server_names = reg.list_names()
default_server = reg.get_default()
health_status = {}
for name in server_names:
try:
client = reg.get(name)
status = await client.get_status()
health_status[name] = {"status": status.status, "version": status.version}
except Exception as e:
health_status[name] = {"status": "error", "error": str(e)}
return json.dumps({
"servers": server_names,
"default_server": default_server,
"health": health_status
}, indent=2)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def clone_proxy_host(
source_server: str,
target_server: str,
host_id: int,
override_settings: dict[str, Any] | None = None,
) -> str:
"""Clone a proxy host configuration from source_server to target_server.
Resolves certificate and access list IDs automatically by matching names/domains.
Args:
source_server: Name of the source server.
target_server: Name of the destination server.
host_id: The ID of the host on the source server.
override_settings: Optional dict of settings to override during creation.
"""
try:
source_client = _get_client(source_server)
target_client = _get_client(target_server)
# Retrieve host configuration from source server
host = await source_client.get_proxy_host(host_id)
# Resolve certificate ID
target_cert_id = 0
cert_resolved_msg = "None"
if host.certificate_id and host.certificate_id > 0:
try:
source_cert = await source_client.get_certificate(host.certificate_id)
target_certs = await target_client.get_certificates()
matched_cert = None
for cert in target_certs:
if (
source_cert.nice_name
and cert.nice_name
and source_cert.nice_name == cert.nice_name
):
matched_cert = cert
break
if (
source_cert.domain_names
and cert.domain_names
and set(source_cert.domain_names) == set(cert.domain_names)
):
matched_cert = cert
break
if matched_cert:
target_cert_id = matched_cert.id
cert_name = matched_cert.nice_name or ", ".join(matched_cert.domain_names)
cert_resolved_msg = f"Resolved to ID {target_cert_id} ({cert_name})"
else:
source_name = source_cert.nice_name or ", ".join(source_cert.domain_names)
cert_resolved_msg = (
f"Could not resolve source certificate '{source_name}' "
"on target server. Defaulting to None (0)."
)
except Exception as e:
cert_resolved_msg = f"Error resolving certificate: {e}. Defaulting to None (0)."
# Resolve access list ID
target_access_list_id = 0
access_list_resolved_msg = "None"
if host.access_list_id and host.access_list_id > 0:
try:
source_alists = await source_client.get_access_lists()
source_alist = next(
(al for al in source_alists if al.id == host.access_list_id), None
)
if source_alist:
target_alists = await target_client.get_access_lists()
matched_alist = next(
(al for al in target_alists if al.name == source_alist.name), None
)
if matched_alist:
target_access_list_id = matched_alist.id
access_list_resolved_msg = (
f"Resolved to ID {target_access_list_id} ({matched_alist.name})"
)
else:
access_list_resolved_msg = (
f"Could not resolve source access list '{source_alist.name}' "
"on target server. Defaulting to None (0)."
)
else:
access_list_resolved_msg = (
"Source access list not found. Defaulting to None (0)."
)
except Exception as e:
access_list_resolved_msg = (
f"Error resolving access list: {e}. Defaulting to None (0)."
)
# Construct creation payload
payload = {
"domain_names": host.domain_names,
"forward_host": host.forward_host,
"forward_port": host.forward_port,
"forward_scheme": host.forward_scheme,
"ssl_forced": host.ssl_forced,
"hsts_enabled": host.hsts_enabled,
"hsts_subdomains": host.hsts_subdomains,
"http2_support": host.http2_support,
"block_exploits": host.block_exploits,
"caching_enabled": host.caching_enabled,
"allow_websocket_upgrade": host.allow_websocket_upgrade,
"advanced_config": host.advanced_config,
"meta": host.meta,
"certificate_id": target_cert_id,
"access_list_id": target_access_list_id,
}
# Apply overrides
if override_settings:
payload.update(override_settings)
# Create proxy host on target server
new_host = await target_client.create_proxy_host(
domain_names=payload["domain_names"],
forward_host=payload["forward_host"],
forward_port=payload["forward_port"],
forward_scheme=payload["forward_scheme"],
certificate_id=payload["certificate_id"],
ssl_forced=payload["ssl_forced"],
hsts_enabled=payload["hsts_enabled"],
hsts_subdomains=payload["hsts_subdomains"],
http2_support=payload["http2_support"],
block_exploits=payload["block_exploits"],
caching_enabled=payload["caching_enabled"],
allow_websocket_upgrade=payload["allow_websocket_upgrade"],
access_list_id=payload["access_list_id"],
advanced_config=payload["advanced_config"],
meta=payload["meta"],
)
return (
f"Successfully cloned proxy host from '{source_server}' to '{target_server}'!\n\n"
f"Source Host ID: {host_id}\n"
f"Target Host ID: {new_host.id}\n"
f"Domains: {', '.join(new_host.domain_names)}\n"
f"Certificate: {cert_resolved_msg}\n"
f"Access List: {access_list_resolved_msg}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def sync_access_lists(source_server: str, target_server: str) -> str:
"""Sync access lists from source_server to target_server.
Replicates missing access lists by name, carrying over credentials and IP rules.
Args:
source_server: Name of the source server.
target_server: Name of the target server.
"""
try:
source_client = _get_client(source_server)
target_client = _get_client(target_server)
# Get raw access lists from both servers to retrieve detailed items and clients
source_response = await source_client._request("GET", "/nginx/access-lists")
source_lists = source_response.json()
target_response = await target_client._request("GET", "/nginx/access-lists")
target_lists = target_response.json()
target_names = {al["name"] for al in target_lists}
synced = []
skipped = []
for al in source_lists:
name = al.get("name")
if not name:
continue
if name in target_names:
skipped.append(f"'{name}' (already exists)")
continue
# Strip database IDs and unique primary keys from items & clients to avoid conflicts
items = al.get("items", [])
cleaned_items = []
for item in items:
cleaned = item.copy()
cleaned.pop("id", None)
cleaned.pop("access_list_id", None)
cleaned.pop("created_on", None)
cleaned.pop("modified_on", None)
cleaned_items.append(cleaned)
clients = al.get("clients", [])
cleaned_clients = []
for client in clients:
cleaned = client.copy()
cleaned.pop("id", None)
cleaned.pop("access_list_id", None)
cleaned.pop("created_on", None)
cleaned.pop("modified_on", None)
cleaned_clients.append(cleaned)
# Replicate access list
await target_client.create_access_list(
name=name,
satisfy_any=al.get("satisfy_any", False),
pass_auth=al.get("pass_auth", False),
items=cleaned_items,
clients=cleaned_clients,
)
synced.append(name)
result_parts = [f"Synced access lists from '{source_server}' to '{target_server}':"]
if synced:
result_parts.append(f"✅ Created: {', '.join(synced)}")
else:
result_parts.append("No new access lists were created.")
if skipped:
result_parts.append(f"️ Matched (exists): {', '.join(skipped)}")
return "\n".join(result_parts)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def sync_certificates(source_server: str, target_server: str) -> str:
"""Sync Let's Encrypt certificates from source_server to target_server.
Matches existing certificates on the target server by domain names.
Args:
source_server: Name of the source server.
target_server: Name of the target server.
"""
try:
source_client = _get_client(source_server)
target_client = _get_client(target_server)
source_certs = await source_client.get_certificates()
target_certs = await target_client.get_certificates()
# Build target domain map for lookup
target_domains_map = {frozenset(cert.domain_names): cert for cert in target_certs}
synced = []
skipped_exists = []
skipped_custom = []
for cert in source_certs:
domains = cert.domain_names
if not domains:
continue
cert_domains_set = frozenset(domains)
# Check if matching cert exists on target
if cert_domains_set in target_domains_map:
skipped_exists.append(f"'{cert.nice_name or ', '.join(domains)}'")
continue
# Check provider type
if cert.provider != "letsencrypt":
skipped_custom.append(
f"'{cert.nice_name or ', '.join(domains)}' "
f"(custom provider: {cert.provider})"
)
continue
# Re-provision Let's Encrypt certificate
email = (
cert.meta.get("letsencrypt_email")
or cert.meta.get("email")
or settings.identity
or "admin@example.com"
)
dns_challenge = cert.meta.get("dns_challenge", False)
await target_client.create_certificate(
domain_names=domains,
email=email,
dns_challenge=dns_challenge,
)
synced.append(f"'{', '.join(domains)}'")
result_parts = [
f"Synced Let's Encrypt certificates from '{source_server}' "
f"to '{target_server}':"
]
if synced:
result_parts.append(f"✅ Provisioned: {', '.join(synced)}")
else:
result_parts.append("No new certificates were provisioned.")
if skipped_exists:
result_parts.append(f"️ Matched (exists): {', '.join(skipped_exists)}")
if skipped_custom:
result_parts.append(f"⚠️ Skipped (manual upload required): {', '.join(skipped_custom)}")
return "\n".join(result_parts)
except Exception as e:
return _format_error(e)
+2 -87
View File
@@ -1,9 +1,10 @@
"""Tests for NpmClient."""
import pytest
from httpx import Response
from npm_mcp.client import NpmClient
from npm_mcp.exceptions import NpmAuthenticationError
from npm_mcp.exceptions import NpmAuthenticationError, NpmConnectionError
@pytest.fixture
@@ -226,89 +227,3 @@ class TestNpmClientEndpoints:
assert host.forward_port == 3000
assert host.ssl_forced is True
assert host.certificate_id == 24
@pytest.mark.asyncio
async def test_create_access_list(self, httpx_mock, mock_token_response):
"""Test creating an access list."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/nginx/access-lists",
json={
"id": 5,
"created_on": "2024-01-01T00:00:00Z",
"modified_on": "2024-01-01T00:00:00Z",
"owner_user_id": 1,
"name": "Custom List",
"satisfy_any": True,
"pass_auth": False,
},
status_code=201,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
al = await client.create_access_list(
name="Custom List",
satisfy_any=True,
pass_auth=False,
items=[{"username": "u", "password": "p"}],
clients=[{"address": "1.1.1.1", "directive": "allow"}],
)
assert al.id == 5
assert al.name == "Custom List"
assert al.satisfy_any is True
assert al.pass_auth is False
@pytest.mark.asyncio
async def test_get_proxy_host_logs(self, httpx_mock, mock_token_response):
"""Test fetching proxy host logs via API."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="GET",
url="http://localhost:81/api/nginx/proxy-hosts/42/logs?type=access&limit=50",
json={"lines": ["line 1", "line 2"]},
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
logs = await client.get_proxy_host_logs(host_id=42, log_type="access", lines=50)
assert logs == {"lines": ["line 1", "line 2"]}
@pytest.mark.asyncio
async def test_get_proxy_host_logs_summary(self, httpx_mock, mock_token_response):
"""Test fetching proxy host logs summary via API."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="GET",
url="http://localhost:81/api/nginx/proxy-hosts/42/logs/summary",
json={"access": 100, "error": 5},
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
summary = await client.get_proxy_host_logs_summary(host_id=42)
assert summary == {"access": 100, "error": 5}
-38
View File
@@ -89,41 +89,3 @@ class TestProxyDefaults:
# Other defaults preserved
assert defaults["ssl_forced"] is True
assert defaults["block_exploits"] is True
class TestMultiServerConfig:
"""Test multi-server configuration parsing."""
def test_servers_empty_by_default(self):
"""Test that servers is empty by default."""
s = Settings(identity="test", secret="test")
assert s.servers == []
assert s.default_server is None
def test_servers_json_parsing(self, monkeypatch):
"""Test parsing servers JSON list from environment variable."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv(
"NPM_SERVERS",
'[{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"}, '
'{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"}]'
)
monkeypatch.setenv("NPM_DEFAULT_SERVER", "prod")
s = Settings()
assert len(s.servers) == 2
assert s.servers[0]["name"] == "prod"
assert s.servers[0]["url"] == "http://prod:81/api"
assert s.servers[1]["name"] == "dev"
assert s.default_server == "prod"
def test_servers_invalid_json_raises(self, monkeypatch):
"""Test that invalid servers JSON raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_SERVERS", "{not valid}")
with pytest.raises(SettingsError):
Settings()
-146
View File
@@ -1,146 +0,0 @@
"""Tests for the log reader module."""
import pytest
from npm_mcp.exceptions import NpmLogError
from npm_mcp.logs import (
is_log_dir_configured,
list_available_logs,
read_log_lines,
)
SAMPLE_ACCESS_LOG = (
'[22/May/2025:10:00:01 +0000] - 200 200 - GET https app.example.com'
' "/" [Client 10.0.0.1] [Length 1542] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:02 +0000] - 301 301 - GET http app.example.com'
' "/old-path" [Client 10.0.0.2] [Length 0] "curl/7.88" "-"\n'
'[22/May/2025:10:00:03 +0000] - 404 404 - GET https app.example.com'
' "/missing" [Client 10.0.0.1] [Length 548] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:04 +0000] - 200 200 - POST https app.example.com'
' "/api/data" [Client 10.0.0.3] [Length 256] "python-requests/2.31" "-"\n'
'[22/May/2025:10:00:05 +0000] - 502 502 - GET https app.example.com'
' "/health" [Client 10.0.0.1] [Length 166] "kube-probe/1.28" "-"\n'
)
SAMPLE_ERROR_LOG = (
"2025/05/22 10:00:05 [error] 42#42: *123 connect() failed"
" (111: Connection refused) while connecting to upstream,"
" client: 10.0.0.1, server: app.example.com\n"
)
@pytest.fixture
def log_dir(tmp_path, monkeypatch):
"""Create a temp log directory with sample log files."""
logs = tmp_path / "logs"
logs.mkdir()
(logs / "proxy-host-5_access.log").write_text(SAMPLE_ACCESS_LOG)
(logs / "proxy-host-5_error.log").write_text(SAMPLE_ERROR_LOG)
(logs / "proxy-host-12_access.log").write_text("")
(logs / "fallback_error.log").write_text("global error\n")
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", str(logs))
return logs
@pytest.fixture
def no_log_dir(monkeypatch):
"""Ensure log_dir is unconfigured."""
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "")
class TestReadLogLines:
def test_read_access_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="access")
assert result["host_id"] == 5
assert result["log_type"] == "access"
assert result["file"] == "proxy-host-5_access.log"
assert result["returned_lines"] == 5
assert result["total_lines_in_file"] == 5
assert "app.example.com" in result["lines"][0]
def test_read_error_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="error")
assert result["log_type"] == "error"
assert result["returned_lines"] == 1
assert "Connection refused" in result["lines"][0]
def test_lines_limit(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=2)
assert result["returned_lines"] == 2
assert "10:00:04" in result["lines"][0]
assert "10:00:05" in result["lines"][1]
def test_lines_capped_at_max(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=9999)
assert result["returned_lines"] == 5
def test_search_filter(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="404")
assert result["returned_lines"] == 1
assert result["matched_lines"] == 1
assert result["total_lines_in_file"] is None
assert "/missing" in result["lines"][0]
def test_search_case_insensitive(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="MOZILLA")
assert result["returned_lines"] == 2
def test_search_by_ip(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="10.0.0.1")
assert result["returned_lines"] == 3
def test_nonexistent_host(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=999, log_type="access")
def test_empty_log_file(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=12, log_type="error")
def test_invalid_log_type(self, log_dir):
with pytest.raises(NpmLogError, match="Invalid log type"):
read_log_lines(host_id=5, log_type="combined")
def test_no_log_dir_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
read_log_lines(host_id=5, log_type="access")
def test_nonexistent_log_dir(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent/path")
with pytest.raises(NpmLogError, match="does not exist"):
read_log_lines(host_id=5, log_type="access")
class TestIsLogDirConfigured:
def test_configured(self, log_dir):
assert is_log_dir_configured() is True
def test_not_configured(self, no_log_dir):
assert is_log_dir_configured() is False
def test_configured_but_missing(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent")
assert is_log_dir_configured() is False
class TestListAvailableLogs:
def test_lists_proxy_host_logs_only(self, log_dir):
results = list_available_logs()
files = {r["file"] for r in results}
assert "proxy-host-5_access.log" in files
assert "proxy-host-5_error.log" in files
assert "proxy-host-12_access.log" in files
assert "fallback_error.log" not in files
def test_correct_metadata(self, log_dir):
results = list_available_logs()
access_5 = next(r for r in results if r["file"] == "proxy-host-5_access.log")
assert access_5["host_id"] == 5
assert access_5["log_type"] == "access"
assert access_5["size_bytes"] > 0
def test_not_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
list_available_logs()
-97
View File
@@ -1,97 +0,0 @@
"""Tests for ServerRegistry."""
import pytest
from npm_mcp.config import settings
from npm_mcp.server import ServerRegistry
def test_registry_fallback_to_single_server(monkeypatch):
"""Test that registry falls back to single-server settings when empty."""
monkeypatch.setattr(settings, "api_url", "http://test-url:81/api")
monkeypatch.setattr(settings, "identity", "test-user")
monkeypatch.setattr(settings, "secret", "test-pass")
registry = ServerRegistry(configs=[], default=None)
assert registry.list_names() == ["default"]
assert registry.get_default() == "default"
client = registry.get()
assert client.base_url == "http://test-url:81/api"
assert client._identity == "test-user"
def test_registry_multiple_servers():
"""Test that multiple servers are correctly registered."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"},
]
registry = ServerRegistry(configs=configs, default="prod")
assert set(registry.list_names()) == {"prod", "dev"}
assert registry.get_default() == "prod"
prod_client = registry.get("prod")
assert prod_client.base_url == "http://prod:81/api"
dev_client = registry.get("dev")
assert dev_client.base_url == "http://dev:81/api"
def test_registry_get_default_fallback():
"""Test that get() falls back to default server when name is None/empty."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"},
]
registry = ServerRegistry(configs=configs, default="dev")
# Name is None
client = registry.get(None)
assert client.base_url == "http://dev:81/api"
# Name is empty string
client_empty = registry.get("")
assert client_empty.base_url == "http://dev:81/api"
def test_registry_single_client_no_default_specified():
"""Test that get() succeeds if there is only 1 server, even if no default is specified."""
configs = [
{"name": "only-one", "url": "http://only:81/api", "identity": "o", "secret": "os"}
]
registry = ServerRegistry(configs=configs, default=None)
assert registry.get_default() is None
client = registry.get()
assert client.base_url == "http://only:81/api"
def test_registry_multiple_clients_no_default_raises():
"""Test that get() raises KeyError if multiple servers are defined but no default is set."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"},
]
registry = ServerRegistry(configs=configs, default=None)
with pytest.raises(KeyError, match="Multiple servers configured but no default server"):
registry.get()
def test_registry_invalid_name_raises():
"""Test that get() raises KeyError for non-existent server names."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
]
registry = ServerRegistry(configs=configs, default="prod")
with pytest.raises(KeyError, match="Server 'non-existent' not found"):
registry.get("non-existent")
-315
View File
@@ -1,315 +0,0 @@
"""Tests for multi-server management and sync tools."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from npm_mcp.models import AccessList, Certificate, HealthStatus, ProxyHost
from npm_mcp.server import (
clone_proxy_host,
get_proxy_host_logs,
list_servers,
sync_access_lists,
sync_certificates,
)
@pytest.fixture
def mock_registry():
"""Mock registry with prod and dev servers."""
reg = MagicMock()
reg.list_names.return_value = ["prod", "dev"]
reg.get_default.return_value = "prod"
return reg
@pytest.mark.asyncio
async def test_list_servers(mock_registry):
"""Test list_servers tool output and health query."""
client_prod = MagicMock()
client_prod.get_status = AsyncMock(
return_value=HealthStatus(status="online", version={"major": "2"})
)
client_dev = MagicMock()
client_dev.get_status = AsyncMock(side_effect=Exception("Connection failed"))
mock_registry.get.side_effect = lambda name: client_prod if name == "prod" else client_dev
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result_json = await list_servers()
result = json.loads(result_json)
assert result["servers"] == ["prod", "dev"]
assert result["default_server"] == "prod"
assert result["health"]["prod"]["status"] == "online"
assert result["health"]["dev"]["status"] == "error"
assert "Connection failed" in result["health"]["dev"]["error"]
@pytest.mark.asyncio
async def test_clone_proxy_host(mock_registry):
"""Test clone_proxy_host tool with cert and access list resolution."""
source_client = MagicMock()
target_client = MagicMock()
mock_registry.get.side_effect = lambda name: source_client if name == "prod" else target_client
# Source host setup
source_host = ProxyHost(
id=12,
created_on="2024-01-01T00:00:00Z",
modified_on="2024-01-01T00:00:00Z",
owner_user_id=1,
domain_names=["test.example.com"],
forward_host="192.168.1.50",
forward_port=8080,
forward_scheme="http",
certificate_id=10,
access_list_id=5,
ssl_forced=True,
hsts_enabled=True,
hsts_subdomains=False,
http2_support=True,
block_exploits=True,
caching_enabled=False,
allow_websocket_upgrade=True,
advanced_config="my advanced config",
meta={"key": "val"},
)
source_client.get_proxy_host = AsyncMock(return_value=source_host)
# Source dependencies setup
source_cert = Certificate(
id=10,
nice_name="wildcard-example",
domain_names=["*.example.com"],
provider="letsencrypt",
)
source_client.get_certificate = AsyncMock(return_value=source_cert)
source_alists = [
AccessList(
id=5,
name="Staging Auth",
created_on="2024-01-01T00:00:00Z",
modified_on="2024-01-01T00:00:00Z",
)
]
source_client.get_access_lists = AsyncMock(return_value=source_alists)
# Target dependency search results
target_certs = [
Certificate(
id=100,
nice_name="wildcard-example",
domain_names=["*.example.com"],
provider="letsencrypt",
)
]
target_client.get_certificates = AsyncMock(return_value=target_certs)
target_alists = [
AccessList(
id=500,
name="Staging Auth",
created_on="2024-01-02T00:00:00Z",
modified_on="2024-01-02T00:00:00Z",
)
]
target_client.get_access_lists = AsyncMock(return_value=target_alists)
# Mock creation on target
cloned_host = ProxyHost(
id=999,
created_on="2024-01-03T00:00:00Z",
modified_on="2024-01-03T00:00:00Z",
owner_user_id=1,
domain_names=["test.example.com"],
forward_host="192.168.1.50",
forward_port=8080,
)
target_client.create_proxy_host = AsyncMock(return_value=cloned_host)
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result = await clone_proxy_host(
source_server="prod",
target_server="dev",
host_id=12,
override_settings={"forward_host": "10.0.0.10"},
)
assert "Successfully cloned" in result
assert "Source Host ID: 12" in result
assert "Target Host ID: 999" in result
assert "Resolved to ID 100" in result
assert "Resolved to ID 500" in result
target_client.create_proxy_host.assert_called_once_with(
domain_names=["test.example.com"],
forward_host="10.0.0.10", # Overridden!
forward_port=8080,
forward_scheme="http",
certificate_id=100, # Resolved!
ssl_forced=True,
hsts_enabled=True,
hsts_subdomains=False,
http2_support=True,
block_exploits=True,
caching_enabled=False,
allow_websocket_upgrade=True,
access_list_id=500, # Resolved!
advanced_config="my advanced config",
meta={"key": "val"},
)
@pytest.mark.asyncio
async def test_sync_access_lists(mock_registry):
"""Test sync_access_lists replicates missing access lists with credentials/IPs."""
source_client = MagicMock()
target_client = MagicMock()
mock_registry.get.side_effect = lambda name: source_client if name == "prod" else target_client
# Source returns raw JSON including items & clients
source_mock_response = MagicMock()
source_mock_response.json.return_value = [
{
"id": 1,
"name": "Staging Auth",
"satisfy_any": False,
"pass_auth": True,
"items": [
{"id": 10, "access_list_id": 1, "username": "u", "password": "p"}
],
"clients": [
{"id": 20, "access_list_id": 1, "address": "1.1.1.1", "directive": "allow"}
],
},
{
"id": 2,
"name": "Already Synced",
"satisfy_any": True,
"pass_auth": False,
}
]
source_client._request = AsyncMock(return_value=source_mock_response)
# Target returns raw JSON showing "Already Synced" exists
target_mock_response = MagicMock()
target_mock_response.json.return_value = [{"id": 99, "name": "Already Synced"}]
target_client._request = AsyncMock(return_value=target_mock_response)
target_client.create_access_list = AsyncMock()
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result = await sync_access_lists(source_server="prod", target_server="dev")
assert "Created: Staging Auth" in result
assert "Matched (exists): 'Already Synced' (already exists)" in result
# Verify items and clients were stripped of database IDs
target_client.create_access_list.assert_called_once_with(
name="Staging Auth",
satisfy_any=False,
pass_auth=True,
items=[{"username": "u", "password": "p"}],
clients=[{"address": "1.1.1.1", "directive": "allow"}],
)
@pytest.mark.asyncio
async def test_sync_certificates(mock_registry):
"""Test sync_certificates provisions Let's Encrypt and skips custom certs."""
source_client = MagicMock()
target_client = MagicMock()
mock_registry.get.side_effect = lambda name: source_client if name == "prod" else target_client
# Source certificates
source_certs = [
Certificate(
id=1,
nice_name="le-cert",
domain_names=["le.example.com"],
provider="letsencrypt",
meta={"letsencrypt_email": "le@test.com", "dns_challenge": True},
),
Certificate(
id=2,
nice_name="custom-cert",
domain_names=["custom.example.com"],
provider="other-provider",
),
Certificate(
id=3,
nice_name="already-on-target",
domain_names=["existing.example.com"],
provider="letsencrypt",
)
]
source_client.get_certificates = AsyncMock(return_value=source_certs)
# Target certificates
target_certs = [
Certificate(
id=10,
nice_name="already-on-target",
domain_names=["existing.example.com"],
provider="letsencrypt",
)
]
target_client.get_certificates = AsyncMock(return_value=target_certs)
target_client.create_certificate = AsyncMock()
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result = await sync_certificates(source_server="prod", target_server="dev")
assert "Provisioned: 'le.example.com'" in result
assert "Matched (exists): 'already-on-target'" in result
assert "Skipped (manual upload required): 'custom-cert'" in result
target_client.create_certificate.assert_called_once_with(
domain_names=["le.example.com"],
email="le@test.com",
dns_challenge=True,
)
@pytest.mark.asyncio
async def test_get_proxy_host_logs_api(mock_registry):
"""Test get_proxy_host_logs tool queries the API for logs first."""
client = MagicMock()
mock_registry.get.return_value = client
# Mock host details
host = ProxyHost(
id=5,
created_on="2024-01-01T00:00:00Z",
modified_on="2024-01-01T00:00:00Z",
owner_user_id=1,
domain_names=["test.example.com"],
forward_host="192.168.1.50",
forward_port=8080,
)
client.get_proxy_host = AsyncMock(return_value=host)
# Mock API logs endpoint
client.get_proxy_host_logs = AsyncMock(return_value={
"lines": ["Log line A", "Log line B", "Filter me out"]
})
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
# Retrieve logs with search filter
result = await get_proxy_host_logs(
host_id=5, log_type="access", lines=10, search="Log line"
)
assert "test.example.com" in result
assert "(retrieved via API)" in result
assert "Log line A" in result
assert "Log line B" in result
assert "Filter me out" not in result
assert "Showing last 2 lines:" in result
Generated
+1 -1
View File
@@ -313,7 +313,7 @@ wheels = [
[[package]]
name = "npm-mcp"
version = "0.0.2"
version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "httpx" },