9 Commits

Author SHA1 Message Date
b3nw ddaf4190f9 feat: implement multi-server support and sync tools
- Introduced ServerRegistry to manage multiple NPM instances
- Added support for NPM_SERVERS JSON environment variable
- Updated all tools to support optional 'server' targeting
- Implemented clone_proxy_host, sync_access_lists, and sync_certificates tools
- Transitioned get_proxy_host_logs to API-based retrieval with local fallback
- Added comprehensive test suite for multi-server management and sync operations

Co-authored-by: claw-io <agent@ben.io>
2026-06-09 19:43:47 +00:00
b3nw 4a95ccd1b5 fix(#2): add multi-architecture Docker builds for arm64 support
Adds QEMU emulation setup and specifies platforms: linux/amd64,linux/arm64
in the GitHub Actions Docker build workflow. This fixes the ARM64 manifest
error ('no matching manifest for linux/arm64/v8') on Apple Silicon Macs.

Changes:
- Add docker/setup-qemu-action@v3 step before Buildx setup
- Add platforms: linux/amd64,linux/arm64 to build-push-action

Closes #2

Co-authored-by: claw-io <agent@ben.io>
2026-06-08 14:08:09 +00:00
b3nw 5e89176806 feat: Add get_proxy_host_logs tool for reading nginx proxy host logs
Reads nginx access/error logs directly from a mounted NPM log directory,
enabling agents to debug proxy issues without SSH access. Requires mounting
NPM's /data/logs volume and setting NPM_LOG_DIR.

Also includes a feature request PRD for proposing a native log API upstream.
2026-05-22 14:27:37 +00:00
b3nw c3227c3a5f fix: Disable Docker push on PR builds and update README
Fork PRs don't have write access to GHCR. Use conditional push
that only pushes on non-PR events (push to main, tags).

Also update README with new tools from v0.0.3:
- update_proxy_host
- create_certificate
2026-02-12 04:06:58 +00:00
b3nw 32f57b1a9e fix: Disable Docker push on PR builds
Fork PRs don't have write access to GHCR. Use conditional push
that only pushes on non-PR events (push to main, tags).
2026-02-12 03:51:18 +00:00
Jordan Réjaud f81bf796a6 Fix Pydantic backward compatibility and add update/certificate tools (#3)
- Make Owner and Certificate model fields optional with defaults to fix
  parsing errors when NPM API returns null/missing nested objects
- Add update_proxy_host tool for modifying existing proxy host configs
- Add create_certificate tool for provisioning Let's Encrypt SSL certs
- Add corresponding client methods with full parameter support
2026-02-11 21:47:54 -06:00
b3nw 15b2876d7b chore: Update version to 0.0.2 2025-12-24 21:23:15 +00:00
b3nw 52eb484432 feat: Add proxy host creation and access list tools (#1)
Add MCP tools for creating proxy hosts and listing access lists, with configurable defaults via environment variable.

New tools:
- create_proxy_host: Create new proxy hosts with SSL, access control, and websocket support
- list_access_lists: List available access lists for use in proxy host creation

New features:
- NPM_PROXY_DEFAULTS env var for configurable default values (certificate_id, access_list_id, ssl settings, etc.)
2025-12-24 15:20:45 -06:00
b3nw 37ad76f012 chore: Use pre-built Docker image from GHCR
- Update compose.yaml to use ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
- Add Docker quick start section to README
- Users can now deploy without cloning the repo
2025-12-18 03:47:35 +00:00
18 changed files with 2448 additions and 58 deletions
+4 -1
View File
@@ -22,11 +22,13 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
@@ -50,6 +52,7 @@ jobs:
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
platforms: linux/amd64,linux/arm64
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
+153 -1
View File
@@ -2,7 +2,32 @@
MCP server for [Nginx Proxy Manager](https://nginxproxymanager.com/) - manage your reverse proxy through AI assistants.
## Installation
## Quick Start (Docker)
The easiest way to run the NPM MCP server - no cloning required!
```bash
# Download the compose file
curl -O https://raw.githubusercontent.com/b3nw/nginx-proxy-manager-mcp/main/compose.yaml
# Edit the environment variables, then start
docker compose up -d
```
Or run directly:
```bash
docker run -d \
--name npm-mcp \
-p 8000:8000 \
-e NPM_API_URL=http://your-npm:81/api \
-e NPM_IDENTITY=admin@example.com \
-e NPM_SECRET=yourpassword \
-e NPM_MCP_TRANSPORT=http \
ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
```
## Installation (Local)
```bash
# Using uv (recommended)
@@ -24,8 +49,42 @@ NPM_SECRET=yourpassword
# Optional: Server settings
NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # or "http"
# Optional: Default values for create_proxy_host (JSON)
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
```
### Environment Variables
| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `NPM_API_URL` | Yes | `http://localhost:81/api` | NPM API endpoint |
| `NPM_IDENTITY` | Yes | - | NPM user email |
| `NPM_SECRET` | Yes | - | NPM user password |
| `NPM_MCP_HOST` | No | `0.0.0.0` | MCP server bind address |
| `NPM_MCP_PORT` | No | `8000` | MCP server port |
| `NPM_MCP_TRANSPORT` | No | `stdio` | Transport mode (`stdio` or `http`) |
| `NPM_LOG_DIR` | No | - | Path to mounted NPM log directory (enables `get_proxy_host_logs`) |
| `NPM_PROXY_DEFAULTS` | No | `{}` | JSON defaults for `create_proxy_host` |
### NPM_PROXY_DEFAULTS Keys
Configure default values for proxy host creation:
```bash
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true, "block_exploits": true}'
```
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `forward_scheme` | string | `"http"` | Backend protocol (`http` or `https`) |
| `certificate_id` | int | `0` | SSL certificate ID (use `list_certificates` to find) |
| `ssl_forced` | bool | `true` | Force HTTPS redirect |
| `block_exploits` | bool | `true` | Enable common exploit blocking |
| `allow_websocket_upgrade` | bool | `true` | Allow WebSocket connections |
| `access_list_id` | int | `0` | Access list ID (use `list_access_lists` to find) |
| `advanced_config` | string | `""` | Custom nginx configuration block |
## Usage
### Stdio Mode (for Claude Desktop, etc.)
@@ -68,8 +127,101 @@ Add to your `claude_desktop_config.json`:
|------|-------------|
| `list_proxy_hosts` | List all proxy hosts |
| `get_proxy_host_details` | Get full config for a specific host |
| `get_proxy_host_logs` | Retrieve nginx access/error logs for a proxy host (requires log mount) |
| `get_system_health` | Check NPM version and status |
| `search_audit_logs` | Query audit log entries |
| `list_certificates` | List SSL certificates |
| `list_access_lists` | List access lists for authentication/IP restrictions |
| `create_proxy_host` | Create a new proxy host |
| `update_proxy_host` | Update an existing proxy host (v0.0.3+) |
| `create_certificate` | Provision a new Let's Encrypt SSL certificate (v0.0.3+) |
## Log Access Setup
The `get_proxy_host_logs` tool reads nginx log files directly from disk. Since NPM has no API for log retrieval, you need to mount NPM's log directory into the MCP container.
NPM writes per-host logs to `/data/logs/` inside its container:
- `proxy-host-{id}_access.log` — HTTP request log (client IP, status, path, user agent)
- `proxy-host-{id}_error.log` — nginx error log (upstream failures, config issues)
### Docker Compose (same stack)
If NPM and the MCP server share a compose stack with a named volume:
```yaml
services:
nginx-proxy-manager:
image: jc21/nginx-proxy-manager:latest
volumes:
- npm_data:/data
npm-mcp:
image: ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
environment:
- NPM_API_URL=http://nginx-proxy-manager:81/api
- NPM_IDENTITY=admin@example.com
- NPM_SECRET=yourpassword
- NPM_LOG_DIR=/data/npm-logs
volumes:
# Mount NPM's /data volume — logs are in /data/logs/ inside it
- npm_data:/data/npm-logs:ro
depends_on:
- nginx-proxy-manager
volumes:
npm_data:
```
> **Note:** NPM stores logs under `/data/logs/` inside its data volume. When you
> mount the full `/data` volume to `/data/npm-logs`, the MCP server looks for logs at
> `/data/npm-logs/logs/`. Set `NPM_LOG_DIR` to match your mount path plus `/logs`.
If you mounted the full data volume:
```bash
NPM_LOG_DIR=/data/npm-logs/logs
```
### Bind Mount (separate stacks)
If NPM uses a bind mount (e.g., `./npm-data:/data`), mount the logs subdirectory directly:
```yaml
npm-mcp:
volumes:
- /path/to/npm-data/logs:/data/npm-logs:ro
environment:
- NPM_LOG_DIR=/data/npm-logs
```
### Docker Run
```bash
docker run -d \
--name npm-mcp \
-p 8000:8000 \
-v npm_data:/data/npm-logs:ro \
-e NPM_API_URL=http://your-npm:81/api \
-e NPM_IDENTITY=admin@example.com \
-e NPM_SECRET=yourpassword \
-e NPM_LOG_DIR=/data/npm-logs/logs \
-e NPM_MCP_TRANSPORT=http \
ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
```
### Local Development (non-Docker)
Point `NPM_LOG_DIR` at wherever NPM's logs are on your filesystem:
```bash
NPM_LOG_DIR=/path/to/npm/data/logs npm-mcp
```
### Verifying the Mount
After starting, call `get_system_health` — if the log directory is mounted and accessible
the tool will confirm it. You can also call `get_proxy_host_logs` with any host ID to
verify logs are readable.
## Development
+12 -5
View File
@@ -1,9 +1,8 @@
# Example Docker Compose configuration for NPM MCP Server
# This runs the MCP server in HTTP mode, suitable for remote AI agents
# Docker Compose configuration for NPM MCP Server
services:
npm-mcp:
build: .
image: ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
container_name: npm-mcp
restart: unless-stopped
ports:
@@ -17,6 +16,11 @@ services:
- NPM_MCP_TRANSPORT=http
- NPM_MCP_HOST=0.0.0.0
- NPM_MCP_PORT=8000
# Optional: Log access (requires volume mount below)
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# # Mount NPM's log directory for get_proxy_host_logs tool (read-only)
# - npm_data:/data/npm-logs:ro # named volume — see "Log Access" in README
# Uncomment to use .env file instead of inline environment
# env_file:
# - .env
@@ -27,7 +31,7 @@ services:
retries: 3
start_period: 10s
# Example: Running alongside NPM in same compose stack
# Example: Running alongside NPM in same compose stack (with log access)
# services:
# nginx-proxy-manager:
# image: jc21/nginx-proxy-manager:latest
@@ -40,11 +44,14 @@ services:
# - npm_letsencrypt:/etc/letsencrypt
#
# npm-mcp:
# build: .
# image: ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
# environment:
# - NPM_API_URL=http://nginx-proxy-manager:81/api
# - NPM_IDENTITY=admin@example.com
# - NPM_SECRET=changeme
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# - npm_data:/data/npm-logs:ro
# depends_on:
# - nginx-proxy-manager
#
+176
View File
@@ -0,0 +1,176 @@
# Feature Request: Per-Host Log Retrieval API
**Project:** [NginxProxyManager/nginx-proxy-manager](https://github.com/NginxProxyManager/nginx-proxy-manager)
**Type:** Feature Request
**Status:** Draft PRD
## Problem Statement
Nginx Proxy Manager writes per-host access and error logs to predictable paths on disk
(`/data/logs/proxy-host-{id}_access.log`, `/data/logs/proxy-host-{id}_error.log`), but
provides no API to read them. The only log-related API is the audit log (`GET /api/audit-log`),
which tracks admin configuration changes — not HTTP traffic.
This means operators and automation tools have no programmatic way to:
- Retrieve recent access log entries for a specific proxy host
- Check error logs when debugging upstream connectivity issues
- Monitor traffic patterns or detect anomalies through the existing API surface
The only workarounds today are direct filesystem access (requiring volume mounts or
`docker exec`) or external log aggregation pipelines, both of which add significant
operational complexity for a task that should be simple.
## Proposed Solution
Add REST API endpoints to retrieve nginx access and error logs for individual proxy hosts.
### New Endpoints
#### `GET /api/nginx/proxy-hosts/{id}/logs`
Retrieve log entries for a specific proxy host.
**Query Parameters:**
| Parameter | Type | Default | Description |
|------------|--------|------------|-------------------------------------------------------|
| `type` | string | `"access"` | Log type: `"access"` or `"error"` |
| `lines` | int | `100` | Number of most recent lines to return (max: `1000`) |
| `search` | string | - | Filter lines containing this substring |
| `since` | string | - | ISO 8601 timestamp — only return lines after this time|
**Response (200):**
```json
{
"host_id": 5,
"log_type": "access",
"file": "proxy-host-5_access.log",
"total_lines": 4821,
"returned_lines": 100,
"lines": [
"[01/Jun/2025:14:22:31 +0000] HIT 200 200 - GET https app.example.com \"/api/data\" [Client 10.0.0.1] [Length 1542] [Gzip -] [Sent-to 192.168.1.50] \"Mozilla/5.0\" \"https://app.example.com/\"",
"..."
]
}
```
**Error Responses:**
| Status | Condition |
|--------|-------------------------------------|
| 404 | Proxy host not found |
| 404 | Log file does not exist |
| 403 | User lacks permission for this host |
#### `GET /api/nginx/proxy-hosts/{id}/logs/summary`
Return a statistical summary of recent traffic for a proxy host.
**Response (200):**
```json
{
"host_id": 5,
"period": "last_1000_lines",
"status_codes": {"200": 812, "301": 45, "404": 23, "500": 3},
"top_paths": ["/api/data", "/", "/login"],
"top_clients": ["10.0.0.1", "10.0.0.5"],
"cache_hit_rate": 0.42,
"access_log_size_bytes": 524288,
"error_log_size_bytes": 8192
}
```
### Permissions
| Permission | Description |
|--------------------|--------------------------------------------|
| `proxy-hosts:logs` | Read logs for proxy hosts the user can view |
Admin users can read logs for any host. Non-admin users can only read logs
for hosts they own, consistent with existing proxy host permissions.
### Backend Implementation Notes
The implementation is straightforward because log paths are already deterministic
and hardcoded in nginx templates:
```javascript
// backend/templates/proxy_host.conf
access_log /data/logs/proxy-host-{{ id }}_access.log proxy;
error_log /data/logs/proxy-host-{{ id }}_error.log warn;
```
A minimal implementation would:
1. Add a new route in `backend/routes/` (e.g., `proxy-host-logs.js`)
2. Verify the proxy host exists and the user has access
3. Read the last N lines from the log file using a reverse-reader (or `tail`-like approach)
4. Optionally filter lines by substring or timestamp
5. Return as JSON
**Reference files for implementation:**
- `backend/routes/nginx/proxy_hosts.js` — existing proxy host routes and permission model
- `backend/templates/proxy_host.conf` — log path template confirming the naming convention
- `backend/lib/access/` — permission definition files
- `docker/rootfs/etc/logrotate.d/nginx-proxy-manager` — log rotation config (rotated logs
have `.1`, `.2.gz` suffixes)
### Log Rotation Consideration
NPM rotates logs weekly (access: 4 rotations, error: 10 rotations). The API should
read only the current (unrotated) log file. Rotated archives (`.1`, `.2.gz`) could be
supported in a future iteration but are not required for the initial implementation.
## Motivation
### Use Cases
1. **MCP/AI Agent Integration** — MCP servers wrapping the NPM API (like
[nginx-proxy-manager-mcp](https://github.com/b3nw/nginx-proxy-manager-mcp)) can
expose log retrieval to AI assistants for debugging proxy issues conversationally.
2. **Quick Debugging** — When a reverse proxy returns errors, operators need to quickly
check the access and error logs for that specific host. Today this requires SSH/exec
access to the container.
3. **Monitoring Dashboards** — Custom dashboards can poll the log endpoint for
traffic summaries without deploying a full log aggregation stack.
4. **Automation** — CI/CD pipelines and health-check scripts can verify that traffic
is flowing correctly to newly deployed services behind NPM.
### Why Not External Log Aggregation?
External solutions (Loki, ELK, Fluentd) are powerful but heavy. Many NPM users run
single-host homelab setups where a full log pipeline is disproportionate to the need.
A built-in API covers 80% of use cases with zero additional infrastructure.
## Alternatives Considered
| Approach | Pros | Cons |
|---------------------------|-----------------------------|-------------------------------------------------|
| **API endpoint (proposed)** | Native, zero extra infra | Requires upstream PR |
| Volume mount + file read | Works today, no NPM changes | Tight coupling, no access control, not portable |
| Docker exec | Works today | Requires Docker socket, security risk |
| Sidecar log server | Decoupled | Extra container, extra config, extra maintenance |
## Scope
### In Scope (v1)
- `GET /api/nginx/proxy-hosts/{id}/logs` with `type`, `lines`, `search` params
- Permission checks consistent with existing proxy host access model
- Current (unrotated) log file only
### Out of Scope (Future)
- Log streaming via WebSocket or SSE
- Rotated log archive access (`.gz` files)
- Log summary/analytics endpoint
- Redirection host, dead host, and stream logs (same pattern, easy to add later)
- Log download as file attachment
- Log retention configuration via API
+10
View File
@@ -7,3 +7,13 @@ NPM_SECRET=changeme
NPM_MCP_HOST=0.0.0.0
NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # stdio or http
# Log Access (optional)
# Mount NPM's /data/logs directory and set this to the mount path.
# Enables the get_proxy_host_logs tool for reading nginx access/error logs.
# NPM_LOG_DIR=/data/npm-logs
# Proxy Host Creation Defaults (JSON)
# Set default values for create_proxy_host tool parameters
# Example with wildcard cert: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
# NPM_PROXY_DEFAULTS='{"certificate_id": 0, "ssl_forced": true, "block_exploits": true, "allow_websocket_upgrade": true}'
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "npm-mcp"
version = "0.1.0"
version = "0.0.2"
description = "MCP server for Nginx Proxy Manager"
readme = "README.md"
requires-python = ">=3.11"
+198 -6
View File
@@ -2,6 +2,7 @@
import logging
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
@@ -13,6 +14,7 @@ from .exceptions import (
NpmNotFoundError,
)
from .models import (
AccessList,
AuditLogEntry,
Certificate,
HealthStatus,
@@ -92,9 +94,7 @@ class NpmClient:
raise NpmAuthenticationError("Invalid credentials")
if response.status_code != 200:
raise NpmApiError(
f"Login failed: {response.text}", status_code=response.status_code
)
raise NpmApiError(f"Login failed: {response.text}", status_code=response.status_code)
data = response.json()
token_response = TokenResponse(**data)
@@ -172,9 +172,7 @@ class NpmClient:
raise NpmNotFoundError(f"Resource not found: {endpoint}")
if response.status_code >= 400:
raise NpmApiError(
f"API error: {response.text}", status_code=response.status_code
)
raise NpmApiError(f"API error: {response.text}", status_code=response.status_code)
return response
@@ -259,3 +257,197 @@ class NpmClient:
)
data = response.json()
return [AuditLogEntry(**entry) for entry in data]
async def get_access_lists(self) -> list[AccessList]:
"""Get all access lists."""
response = await self._request("GET", "/nginx/access-lists")
data = response.json()
return [AccessList(**item) for item in data]
async def create_proxy_host(
self,
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str = "http",
certificate_id: int | None = None,
ssl_forced: bool = True,
hsts_enabled: bool = True,
hsts_subdomains: bool = False,
http2_support: bool = True,
block_exploits: bool = True,
caching_enabled: bool = False,
allow_websocket_upgrade: bool = True,
access_list_id: int = 0,
advanced_config: str = "",
meta: dict | None = None,
) -> ProxyHost:
"""Create a new proxy host.
Args:
domain_names: List of domain names for this host
forward_host: Backend host to forward to
forward_port: Backend port to forward to
forward_scheme: http or https
certificate_id: SSL certificate ID (0 for none, use list_certificates to find)
ssl_forced: Force SSL/HTTPS
hsts_enabled: Enable HSTS
hsts_subdomains: Include subdomains in HSTS
http2_support: Enable HTTP/2
block_exploits: Enable exploit blocking
caching_enabled: Enable caching
allow_websocket_upgrade: Allow WebSocket upgrades
access_list_id: Access list ID (0 for none, use list_access_lists to find)
advanced_config: Custom nginx configuration
meta: Additional metadata
Returns:
Created ProxyHost object
"""
payload = {
"domain_names": domain_names,
"forward_host": forward_host,
"forward_port": forward_port,
"forward_scheme": forward_scheme,
"certificate_id": certificate_id or 0,
"ssl_forced": ssl_forced,
"hsts_enabled": hsts_enabled,
"hsts_subdomains": hsts_subdomains,
"http2_support": http2_support,
"block_exploits": block_exploits,
"caching_enabled": caching_enabled,
"allow_websocket_upgrade": allow_websocket_upgrade,
"access_list_id": access_list_id,
"advanced_config": advanced_config,
"meta": meta or {},
}
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
return ProxyHost(**response.json())
async def update_proxy_host(
self,
host_id: int,
**kwargs,
) -> ProxyHost:
"""Update an existing proxy host.
Args:
host_id: The proxy host ID to update
**kwargs: Fields to update (same as create_proxy_host)
Returns:
Updated ProxyHost object
"""
# Get existing host to merge with updates
existing = await self.get_proxy_host(host_id)
payload = {
"domain_names": existing.domain_names,
"forward_host": existing.forward_host,
"forward_port": existing.forward_port,
"forward_scheme": existing.forward_scheme,
"certificate_id": existing.certificate_id or 0,
"ssl_forced": existing.ssl_forced,
"hsts_enabled": existing.hsts_enabled,
"hsts_subdomains": existing.hsts_subdomains,
"http2_support": existing.http2_support,
"block_exploits": existing.block_exploits,
"caching_enabled": existing.caching_enabled,
"allow_websocket_upgrade": existing.allow_websocket_upgrade,
"access_list_id": existing.access_list_id,
"advanced_config": existing.advanced_config,
"meta": existing.meta,
}
payload.update({k: v for k, v in kwargs.items() if v is not None})
response = await self._request("PUT", f"/nginx/proxy-hosts/{host_id}", json=payload)
return ProxyHost(**response.json())
async def create_certificate(
self,
domain_names: list[str],
email: str,
provider: str = "letsencrypt",
dns_challenge: bool = False,
) -> Certificate:
"""Create/provision a new SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
provider: Certificate provider (default: "letsencrypt")
dns_challenge: Use DNS challenge instead of HTTP (default: False)
Returns:
Created Certificate object
"""
payload = {
"domain_names": domain_names,
"meta": {
"letsencrypt_email": email,
"letsencrypt_agree": True,
"dns_challenge": dns_challenge,
},
"provider": provider,
}
response = await self._request("POST", "/nginx/certificates", json=payload)
return Certificate(**response.json())
async def get_proxy_host_logs(
self,
host_id: int,
log_type: str = "access",
lines: int = 100,
) -> dict[str, Any]:
"""Retrieve proxy host logs via the API.
Args:
host_id: NPM proxy host ID.
log_type: "access" or "error".
lines: Number of most recent lines to return.
"""
response = await self._request(
"GET",
f"/nginx/proxy-hosts/{host_id}/logs",
params={"type": log_type, "limit": lines},
)
return response.json()
async def get_proxy_host_logs_summary(self, host_id: int) -> dict[str, Any]:
"""Retrieve proxy host logs summary via the API.
Args:
host_id: NPM proxy host ID.
"""
response = await self._request("GET", f"/nginx/proxy-hosts/{host_id}/logs/summary")
return response.json()
async def create_access_list(
self,
name: str,
satisfy_any: bool = False,
pass_auth: bool = False,
items: list[dict[str, Any]] | None = None,
clients: list[dict[str, Any]] | None = None,
) -> AccessList:
"""Create a new access list on the server.
Args:
name: Name of the access list
satisfy_any: Satisfy any HTTP auth or IP restriction
pass_auth: Pass auth headers to host
items: Auth entries (username/password) and IP restrictions
clients: IP restriction clients
"""
payload = {
"name": name,
"satisfy_any": satisfy_any,
"pass_auth": pass_auth,
"items": items or [],
"clients": clients or [],
}
response = await self._request("POST", "/nginx/access-lists", json=payload)
return AccessList(**response.json())
+72
View File
@@ -1,7 +1,26 @@
"""Configuration management using pydantic-settings."""
from typing import Any
from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# Default values for proxy host creation
DEFAULT_PROXY_SETTINGS: dict[str, Any] = {
"forward_scheme": "http",
"certificate_id": 0,
"ssl_forced": True,
"hsts_enabled": True,
"hsts_subdomains": False,
"http2_support": True,
"caching_enabled": False,
"block_exploits": True,
"allow_websocket_upgrade": True,
"access_list_id": 0,
"advanced_config": "",
"meta": {},
}
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
@@ -18,10 +37,63 @@ class Settings(BaseSettings):
identity: str = ""
secret: str = ""
# Multi-Server Configuration
# Example JSON:
# '[{"name": "prod", "url": "http://10.0.0.10:81/api",
# "identity": "admin@example.com", "secret": "pwd"}]'
servers: list[dict[str, Any]] = []
default_server: str | None = None
@field_validator("servers", mode="before")
@classmethod
def parse_servers(cls, v: Any) -> list[dict[str, Any]]:
"""Parse JSON string to list of dicts, or pass through if already list."""
if isinstance(v, list):
return v
if isinstance(v, str) and v.strip():
import json
try:
data = json.loads(v)
if not isinstance(data, list):
raise ValueError("NPM_SERVERS must be a list of objects")
return data
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in NPM_SERVERS: {e}") from e
return []
# MCP Server Configuration
mcp_host: str = "0.0.0.0"
mcp_port: int = 8000
mcp_transport: str = "stdio" # "stdio" or "http"
# Path to NPM log directory (mount NPM's /data/logs here)
log_dir: str = ""
# Proxy host creation defaults (JSON string)
# Example: '{"certificate_id": 24, "ssl_forced": true}'
proxy_defaults: dict[str, Any] = {}
@field_validator("proxy_defaults", mode="before")
@classmethod
def parse_proxy_defaults(cls, v: Any) -> dict[str, Any]:
"""Parse JSON string to dict, or pass through if already dict."""
if isinstance(v, dict):
return v
if isinstance(v, str) and v.strip():
import json
try:
return json.loads(v)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in NPM_PROXY_DEFAULTS: {e}") from e
return {}
def get_proxy_defaults(self) -> dict[str, Any]:
"""Get merged proxy defaults (base defaults + user overrides)."""
merged = DEFAULT_PROXY_SETTINGS.copy()
merged.update(self.proxy_defaults)
return merged
settings = Settings()
+6
View File
@@ -31,3 +31,9 @@ class NpmApiError(NpmClientError):
def __init__(self, message: str, status_code: int | None = None):
super().__init__(message)
self.status_code = status_code
class NpmLogError(NpmClientError):
"""Raised when log file operations fail."""
pass
+118
View File
@@ -0,0 +1,118 @@
"""Log file reader for Nginx Proxy Manager proxy host logs."""
from __future__ import annotations
import re
from pathlib import Path
from .config import settings
from .exceptions import NpmLogError
LOG_FILE_PATTERN = re.compile(r"^proxy-host-(\d+)_(access|error)\.log$")
MAX_LINES = 500
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB safety cap
def _get_log_dir() -> Path:
"""Resolve and validate the configured log directory."""
log_dir = settings.log_dir
if not log_dir:
raise NpmLogError(
"NPM_LOG_DIR is not configured. Mount NPM's /data/logs volume "
"and set NPM_LOG_DIR to the mount path. See README for details."
)
path = Path(log_dir)
if not path.is_dir():
raise NpmLogError(f"Log directory does not exist: {log_dir}")
return path
def _log_file_path(host_id: int, log_type: str) -> Path:
if log_type not in ("access", "error"):
raise NpmLogError(f"Invalid log type: {log_type!r} (must be 'access' or 'error')")
log_dir = _get_log_dir()
return log_dir / f"proxy-host-{host_id}_{log_type}.log"
def read_log_lines(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> dict:
"""Read the last N lines from a proxy host log file.
Args:
host_id: NPM proxy host ID.
log_type: "access" or "error".
lines: Number of most recent lines to return (capped at MAX_LINES).
search: Optional substring filter applied to each line.
Returns:
Dict with host_id, log_type, file name, line count, and the lines themselves.
"""
lines = max(1, min(lines, MAX_LINES))
log_path = _log_file_path(host_id, log_type)
if not log_path.is_file():
raise NpmLogError(
f"Log file not found: {log_path.name}. "
"The proxy host may not have received any traffic yet, "
"or the log directory mount is incorrect."
)
file_size = log_path.stat().st_size
if file_size > MAX_FILE_SIZE:
raise NpmLogError(
f"Log file is too large ({file_size / 1024 / 1024:.1f} MB). "
"Consider using an external log aggregation tool."
)
all_lines = log_path.read_text(errors="replace").splitlines()
total_lines = len(all_lines)
if search:
search_lower = search.lower()
all_lines = [line for line in all_lines if search_lower in line.lower()]
tail = all_lines[-lines:]
return {
"host_id": host_id,
"log_type": log_type,
"file": log_path.name,
"total_lines_in_file": total_lines if not search else None,
"matched_lines": len(all_lines) if search else None,
"returned_lines": len(tail),
"lines": tail,
}
def is_log_dir_configured() -> bool:
"""Check whether the log directory is configured and accessible."""
if not settings.log_dir:
return False
return Path(settings.log_dir).is_dir()
def list_available_logs() -> list[dict]:
"""List all proxy-host log files present in the log directory.
Returns:
List of dicts with host_id, log_type, file name, and size.
"""
log_dir = _get_log_dir()
results = []
for entry in sorted(log_dir.iterdir()):
match = LOG_FILE_PATTERN.match(entry.name)
if match and entry.is_file():
results.append(
{
"host_id": int(match.group(1)),
"log_type": match.group(2),
"file": entry.name,
"size_bytes": entry.stat().st_size,
}
)
return results
+25 -13
View File
@@ -23,27 +23,39 @@ class UserMeta(BaseModel):
class Owner(BaseModel):
"""Proxy host owner information."""
id: int | None = None
created_on: datetime | None = None
modified_on: datetime | None = None
is_disabled: bool = False
email: str | None = None
name: str = ""
nickname: str = ""
avatar: str = ""
roles: list[str] = Field(default_factory=list)
class AccessList(BaseModel):
"""Access list for authentication/IP restrictions."""
id: int
created_on: datetime
modified_on: datetime
is_disabled: bool
email: str
owner_user_id: int = 0
name: str
nickname: str
avatar: str
roles: list[str]
satisfy_any: bool = False
pass_auth: bool = False
class Certificate(BaseModel):
"""SSL Certificate information."""
id: int
created_on: datetime
modified_on: datetime
owner_user_id: int
provider: str
nice_name: str
domain_names: list[str]
id: int | None = None
created_on: datetime | None = None
modified_on: datetime | None = None
owner_user_id: int | None = None
provider: str = ""
nice_name: str = ""
domain_names: list[str] = Field(default_factory=list)
expires_on: datetime | None = None
meta: dict[str, Any] = Field(default_factory=dict)
@@ -81,7 +93,7 @@ class ProxyHost(BaseModel):
advanced_config: str = ""
enabled: bool = True
meta: dict[str, Any] = Field(default_factory=dict)
locations: list[ProxyHostLocation] = Field(default_factory=list)
locations: list[ProxyHostLocation] | None = None
# Optional expanded relations
owner: Owner | None = None
certificate: Certificate | None = None
+785 -28
View File
@@ -9,34 +9,114 @@ from mcp.server.fastmcp import FastMCP
from .client import NpmClient
from .config import settings
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError, NpmLogError
from .logs import is_log_dir_configured, list_available_logs, read_log_lines
logger = logging.getLogger(__name__)
# Create global client instance (lazy initialization)
_client: NpmClient | None = None
class ServerRegistry:
"""Manages multiple NpmClient connections."""
def __init__(self, configs: list[dict[str, Any]], default: str | None = None):
self._clients: dict[str, NpmClient] = {}
self._default = default
# Register multi-server entries
for cfg in configs:
name = cfg.get("name")
url = cfg.get("url") or cfg.get("api_url")
identity = cfg.get("identity")
secret = cfg.get("secret")
if not all([name, url, identity, secret]):
logger.warning(f"Server '{name}' is missing required fields, skipping")
continue
self._clients[name] = NpmClient(base_url=url, identity=identity, secret=secret)
# Fallback to single-server settings if registry is empty
if not self._clients and settings.api_url and settings.identity and settings.secret:
logger.info("No servers in NPM_SERVERS. Using single-server environment variables.")
self._clients["default"] = NpmClient(
base_url=settings.api_url,
identity=settings.identity,
secret=settings.secret
)
if not self._default:
self._default = "default"
# Validate default
if self._default and self._default not in self._clients:
logger.warning(
f"Default server '{self._default}' is not in configured servers. "
"Clearing default."
)
self._default = None
def get(self, name: str | None = None) -> NpmClient:
"""Retrieve client by name. Fallback to default if name is None/empty."""
if not self._clients:
raise KeyError("No NPM servers configured.")
if name is None or name == "":
if self._default:
name = self._default
elif len(self._clients) == 1:
name = next(iter(self._clients.keys()))
else:
raise KeyError("Multiple servers configured but no default server specified.")
if name not in self._clients:
raise KeyError(
f"Server '{name}' not found. "
f"Configured servers: {list(self._clients.keys())}"
)
return self._clients[name]
def list_names(self) -> list[str]:
return list(self._clients.keys())
def get_default(self) -> str | None:
return self._default
async def close_all(self) -> None:
for client in self._clients.values():
await client.close()
# Global registry
registry: ServerRegistry | None = None
def get_registry() -> ServerRegistry:
"""Get or create the global ServerRegistry instance."""
global registry
if registry is None:
registry = ServerRegistry(settings.servers, settings.default_server)
return registry
def _get_client(server: str | None = None) -> NpmClient:
"""Retrieve NPM client for the specified server, or fallback to default."""
return get_registry().get(server)
def get_client() -> NpmClient:
"""Get or create the NPM client instance."""
global _client
if _client is None:
_client = NpmClient()
return _client
"""Get or create the NPM client instance (backward compatibility)."""
return _get_client(None)
@asynccontextmanager
async def lifespan(server: FastMCP):
"""Manage client lifecycle."""
global _client
_client = NpmClient()
logger.info(f"NPM MCP Server starting, connecting to {settings.api_url}")
global registry
registry = ServerRegistry(settings.servers, settings.default_server)
logger.info(f"NPM MCP Server starting. Configured servers: {registry.list_names()}")
try:
yield
finally:
if _client:
await _client.close()
_client = None
if registry:
await registry.close_all()
registry = None
logger.info("NPM MCP Server stopped")
@@ -52,29 +132,34 @@ mcp = FastMCP(
def _format_error(e: Exception) -> str:
"""Format exception for tool response."""
if isinstance(e, NpmAuthenticationError):
if isinstance(e, KeyError):
return f"Configuration error: {e.args[0]}"
elif isinstance(e, NpmAuthenticationError):
return f"Authentication failed: {e}"
elif isinstance(e, NpmConnectionError):
return f"Connection error: {e}"
elif isinstance(e, NpmLogError):
return f"Log error: {e}"
elif isinstance(e, NpmApiError):
return f"API error: {e}"
return f"Error: {e}"
# =============================================================================
# Tools
# =============================================================================
@mcp.tool()
async def list_proxy_hosts() -> str:
async def list_proxy_hosts(server: str | None = None) -> str:
"""List all proxy hosts configured in Nginx Proxy Manager.
Returns a summary of all proxy hosts including their domains,
forward destinations, and SSL status.
"""
try:
client = get_client()
client = _get_client(server)
hosts = await client.get_proxy_hosts()
if not hosts:
@@ -98,17 +183,18 @@ async def list_proxy_hosts() -> str:
@mcp.tool()
async def get_proxy_host_details(host_id: int) -> str:
async def get_proxy_host_details(host_id: int, server: str | None = None) -> str:
"""Get detailed configuration for a specific proxy host.
Args:
host_id: The ID of the proxy host to retrieve
server: Target server name
Returns full configuration including SSL settings, locations,
and advanced configuration.
"""
try:
client = get_client()
client = _get_client(server)
host = await client.get_proxy_host(host_id)
details: dict[str, Any] = {
@@ -161,13 +247,13 @@ async def get_proxy_host_details(host_id: int) -> str:
@mcp.tool()
async def get_system_health() -> str:
async def get_system_health(server: str | None = None) -> str:
"""Check the health and status of the Nginx Proxy Manager instance.
Returns system status, version information, and connectivity status.
"""
try:
client = get_client()
client = _get_client(server)
status = await client.get_status()
result = [f"Status: {status.status}"]
@@ -179,7 +265,7 @@ async def get_system_health() -> str:
try:
await client._ensure_authenticated()
result.append("Authenticated: ✅")
# Try to get settings (admin only)
try:
settings_list = await client.get_settings()
@@ -189,6 +275,14 @@ async def get_system_health() -> str:
except NpmAuthenticationError:
result.append("Authenticated: ❌ (check credentials)")
if is_log_dir_configured():
logs = list_available_logs()
result.append(f"Log directory: ✅ ({len(logs)} log files found)")
else:
result.append(
"Log directory: ❌ (not configured — set NPM_LOG_DIR to enable get_proxy_host_logs)"
)
return "\n".join(result)
except Exception as e:
@@ -196,17 +290,18 @@ async def get_system_health() -> str:
@mcp.tool()
async def search_audit_logs(limit: int = 50, offset: int = 0) -> str:
async def search_audit_logs(limit: int = 50, offset: int = 0, server: str | None = None) -> str:
"""Search the audit log for recent actions in Nginx Proxy Manager.
Args:
limit: Maximum number of entries to return (default: 50, max: 100)
offset: Number of entries to skip for pagination (default: 0)
server: Target server name
Returns recent audit log entries showing user actions and changes.
"""
try:
client = get_client()
client = _get_client(server)
limit = min(limit, 100) # Cap at 100
entries = await client.get_audit_log(limit=limit, offset=offset)
@@ -229,14 +324,14 @@ async def search_audit_logs(limit: int = 50, offset: int = 0) -> str:
@mcp.tool()
async def list_certificates() -> str:
async def list_certificates(server: str | None = None) -> str:
"""List all SSL certificates managed by Nginx Proxy Manager.
Returns a summary of all certificates including their domains,
provider, and expiration dates.
"""
try:
client = get_client()
client = _get_client(server)
certs = await client.get_certificates()
if not certs:
@@ -253,11 +348,673 @@ async def list_certificates() -> str:
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
result.append(
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n"
f" Domains: {domains}{expiry}"
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n Domains: {domains}{expiry}"
)
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def list_access_lists(server: str | None = None) -> str:
"""List all access lists configured in Nginx Proxy Manager.
Returns a summary of all access lists including their IDs and names.
Use these IDs when creating proxy hosts that require access control.
"""
try:
client = _get_client(server)
access_lists = await client.get_access_lists()
if not access_lists:
return "No access lists configured."
result = []
for al in access_lists:
result.append(f"[{al.id}] {al.name}")
return f"Found {len(access_lists)} access list(s):\n\n" + "\n".join(result)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_proxy_host(
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
server: str | None = None,
) -> str:
"""Create a new proxy host in Nginx Proxy Manager.
Args:
domain_names: List of domain names (e.g., ["app.ext.ben.io"])
forward_host: Backend host/IP to forward to (e.g., "192.168.1.100" or "container-name")
forward_port: Backend port to forward to (e.g., 8080)
forward_scheme: Backend protocol - "http" or "https" (default from config)
certificate_id: SSL certificate ID. Use list_certificates to find available certs.
Use 0 for no SSL, or the ID of a wildcard cert. (default from config)
ssl_forced: Force HTTPS redirect (default from config)
block_exploits: Enable common exploit blocking (default from config)
allow_websocket_upgrade: Allow WebSocket connections (default from config)
access_list_id: Access list ID for authentication. Use list_access_lists to find.
Use 0 for no access restrictions. (default from config)
advanced_config: Custom nginx configuration block (default from config)
server: Target server name
Returns:
Details of the created proxy host including the new host ID.
Note:
Default values can be configured via NPM_PROXY_DEFAULTS environment variable.
Example: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
Example:
create_proxy_host(
domain_names=["myapp.ext.ben.io"],
forward_host="10.0.0.50",
forward_port=3000,
certificate_id=24, # *.ext.ben.io wildcard
)
"""
try:
# Get defaults from config, then override with provided values
defaults = settings.get_proxy_defaults()
client = _get_client(server)
host = await client.create_proxy_host(
domain_names=domain_names,
forward_host=forward_host,
forward_port=forward_port,
forward_scheme=forward_scheme
if forward_scheme is not None
else defaults["forward_scheme"],
certificate_id=certificate_id
if certificate_id is not None
else defaults["certificate_id"],
ssl_forced=ssl_forced if ssl_forced is not None else defaults["ssl_forced"],
hsts_enabled=defaults.get("hsts_enabled", True),
hsts_subdomains=defaults.get("hsts_subdomains", False),
http2_support=defaults.get("http2_support", True),
block_exploits=block_exploits
if block_exploits is not None
else defaults["block_exploits"],
caching_enabled=defaults.get("caching_enabled", False),
allow_websocket_upgrade=allow_websocket_upgrade
if allow_websocket_upgrade is not None
else defaults["allow_websocket_upgrade"],
access_list_id=access_list_id
if access_list_id is not None
else defaults["access_list_id"],
advanced_config=advanced_config
if advanced_config is not None
else defaults["advanced_config"],
meta=defaults.get("meta", {}),
)
domains = ", ".join(host.domain_names)
return (
f"Successfully created proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def update_proxy_host(
host_id: int,
forward_host: str | None = None,
forward_port: int | None = None,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
server: str | None = None,
) -> str:
"""Update an existing proxy host in Nginx Proxy Manager.
Only provided fields will be updated; all others remain unchanged.
Args:
host_id: The ID of the proxy host to update
forward_host: Backend host/IP to forward to
forward_port: Backend port to forward to
forward_scheme: Backend protocol - "http" or "https"
certificate_id: SSL certificate ID (use list_certificates to find, 0 for none)
ssl_forced: Force HTTPS redirect
block_exploits: Enable common exploit blocking
allow_websocket_upgrade: Allow WebSocket connections
access_list_id: Access list ID (0 for no restrictions)
advanced_config: Custom nginx configuration block
server: Target server name
Returns:
Details of the updated proxy host.
"""
try:
client = _get_client(server)
kwargs = {}
if forward_host is not None:
kwargs["forward_host"] = forward_host
if forward_port is not None:
kwargs["forward_port"] = forward_port
if forward_scheme is not None:
kwargs["forward_scheme"] = forward_scheme
if certificate_id is not None:
kwargs["certificate_id"] = certificate_id
if ssl_forced is not None:
kwargs["ssl_forced"] = ssl_forced
if block_exploits is not None:
kwargs["block_exploits"] = block_exploits
if allow_websocket_upgrade is not None:
kwargs["allow_websocket_upgrade"] = allow_websocket_upgrade
if access_list_id is not None:
kwargs["access_list_id"] = access_list_id
if advanced_config is not None:
kwargs["advanced_config"] = advanced_config
host = await client.update_proxy_host(host_id, **kwargs)
domains = ", ".join(host.domain_names)
return (
f"Successfully updated proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}\n"
f"Certificate ID: {host.certificate_id}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def get_proxy_host_logs(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
server: str | None = None,
) -> str:
"""Retrieve recent nginx log entries for a specific proxy host.
Reads the raw nginx access or error log file for the given host or retrieves
them via the API.
Args:
host_id: The ID of the proxy host (use list_proxy_hosts to find IDs)
log_type: Log type - "access" for HTTP traffic or "error"
for nginx errors (default: "access")
lines: Number of most recent lines to return
(default: 100, max: 500)
search: Optional filter string - only lines containing this
text are returned (case-insensitive)
server: Target server name
"""
try:
client = _get_client(server)
# Try retrieving logs via the API first
try:
log_data = await client.get_proxy_host_logs(
host_id=host_id,
log_type=log_type,
lines=lines,
)
raw_lines = log_data.get("lines", [])
if search:
search_lower = search.lower()
filtered_lines = [
line for line in raw_lines if search_lower in line.lower()
]
else:
filtered_lines = raw_lines
host = await client.get_proxy_host(host_id)
domains = ", ".join(host.domain_names)
header_parts = [
f"Proxy host [{host_id}] {domains}{log_type} log (retrieved via API)",
f"Showing last {len(filtered_lines)} lines:",
]
header = "\n".join(header_parts)
if not filtered_lines:
return f"{header}\n\n(no log entries found)"
return f"{header}\n\n" + "\n".join(filtered_lines)
except Exception as api_err:
# Fallback for default server or if local logs are mounted
reg = get_registry()
is_default = False
try:
target_client = reg.get(server)
default_client = reg.get(None)
if target_client.base_url == default_client.base_url:
is_default = True
except Exception:
pass
if is_default and is_log_dir_configured():
host = await client.get_proxy_host(host_id)
domains = ", ".join(host.domain_names)
result = read_log_lines(
host_id=host_id,
log_type=log_type,
lines=lines,
search=search,
)
header_parts = [
f"Proxy host [{host_id}] {domains}{log_type} log (local fallback)",
f"File: {result['file']}",
]
if result["total_lines_in_file"] is not None:
header_parts.append(f"Total lines in file: {result['total_lines_in_file']}")
if result["matched_lines"] is not None:
header_parts.append(f"Lines matching '{search}': {result['matched_lines']}")
header_parts.append(f"Showing last {result['returned_lines']} lines:")
header = "\n".join(header_parts)
if not result["lines"]:
return f"{header}\n\n(no log entries found)"
return f"{header}\n\n" + "\n".join(result["lines"])
else:
raise api_err
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_certificate(
domain_names: list[str],
email: str,
dns_challenge: bool = False,
server: str | None = None,
) -> str:
"""Provision a new Let's Encrypt SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
dns_challenge: Use DNS challenge instead of HTTP (default: False)
server: Target server name
"""
try:
client = _get_client(server)
cert = await client.create_certificate(
domain_names=domain_names,
email=email,
dns_challenge=dns_challenge,
)
domains = ", ".join(cert.domain_names)
expiry = cert.expires_on.strftime("%Y-%m-%d") if cert.expires_on else "N/A"
return (
f"Successfully created certificate!\n\n"
f"ID: {cert.id}\n"
f"Provider: {cert.provider}\n"
f"Domains: {domains}\n"
f"Expires: {expiry}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def list_servers() -> str:
"""List all configured NPM servers and their health/connectivity status.
Returns a JSON string containing the list of registered servers,
the default server, and their health status.
"""
try:
reg = get_registry()
server_names = reg.list_names()
default_server = reg.get_default()
health_status = {}
for name in server_names:
try:
client = reg.get(name)
status = await client.get_status()
health_status[name] = {"status": status.status, "version": status.version}
except Exception as e:
health_status[name] = {"status": "error", "error": str(e)}
return json.dumps({
"servers": server_names,
"default_server": default_server,
"health": health_status
}, indent=2)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def clone_proxy_host(
source_server: str,
target_server: str,
host_id: int,
override_settings: dict[str, Any] | None = None,
) -> str:
"""Clone a proxy host configuration from source_server to target_server.
Resolves certificate and access list IDs automatically by matching names/domains.
Args:
source_server: Name of the source server.
target_server: Name of the destination server.
host_id: The ID of the host on the source server.
override_settings: Optional dict of settings to override during creation.
"""
try:
source_client = _get_client(source_server)
target_client = _get_client(target_server)
# Retrieve host configuration from source server
host = await source_client.get_proxy_host(host_id)
# Resolve certificate ID
target_cert_id = 0
cert_resolved_msg = "None"
if host.certificate_id and host.certificate_id > 0:
try:
source_cert = await source_client.get_certificate(host.certificate_id)
target_certs = await target_client.get_certificates()
matched_cert = None
for cert in target_certs:
if (
source_cert.nice_name
and cert.nice_name
and source_cert.nice_name == cert.nice_name
):
matched_cert = cert
break
if (
source_cert.domain_names
and cert.domain_names
and set(source_cert.domain_names) == set(cert.domain_names)
):
matched_cert = cert
break
if matched_cert:
target_cert_id = matched_cert.id
cert_name = matched_cert.nice_name or ", ".join(matched_cert.domain_names)
cert_resolved_msg = f"Resolved to ID {target_cert_id} ({cert_name})"
else:
source_name = source_cert.nice_name or ", ".join(source_cert.domain_names)
cert_resolved_msg = (
f"Could not resolve source certificate '{source_name}' "
"on target server. Defaulting to None (0)."
)
except Exception as e:
cert_resolved_msg = f"Error resolving certificate: {e}. Defaulting to None (0)."
# Resolve access list ID
target_access_list_id = 0
access_list_resolved_msg = "None"
if host.access_list_id and host.access_list_id > 0:
try:
source_alists = await source_client.get_access_lists()
source_alist = next(
(al for al in source_alists if al.id == host.access_list_id), None
)
if source_alist:
target_alists = await target_client.get_access_lists()
matched_alist = next(
(al for al in target_alists if al.name == source_alist.name), None
)
if matched_alist:
target_access_list_id = matched_alist.id
access_list_resolved_msg = (
f"Resolved to ID {target_access_list_id} ({matched_alist.name})"
)
else:
access_list_resolved_msg = (
f"Could not resolve source access list '{source_alist.name}' "
"on target server. Defaulting to None (0)."
)
else:
access_list_resolved_msg = (
"Source access list not found. Defaulting to None (0)."
)
except Exception as e:
access_list_resolved_msg = (
f"Error resolving access list: {e}. Defaulting to None (0)."
)
# Construct creation payload
payload = {
"domain_names": host.domain_names,
"forward_host": host.forward_host,
"forward_port": host.forward_port,
"forward_scheme": host.forward_scheme,
"ssl_forced": host.ssl_forced,
"hsts_enabled": host.hsts_enabled,
"hsts_subdomains": host.hsts_subdomains,
"http2_support": host.http2_support,
"block_exploits": host.block_exploits,
"caching_enabled": host.caching_enabled,
"allow_websocket_upgrade": host.allow_websocket_upgrade,
"advanced_config": host.advanced_config,
"meta": host.meta,
"certificate_id": target_cert_id,
"access_list_id": target_access_list_id,
}
# Apply overrides
if override_settings:
payload.update(override_settings)
# Create proxy host on target server
new_host = await target_client.create_proxy_host(
domain_names=payload["domain_names"],
forward_host=payload["forward_host"],
forward_port=payload["forward_port"],
forward_scheme=payload["forward_scheme"],
certificate_id=payload["certificate_id"],
ssl_forced=payload["ssl_forced"],
hsts_enabled=payload["hsts_enabled"],
hsts_subdomains=payload["hsts_subdomains"],
http2_support=payload["http2_support"],
block_exploits=payload["block_exploits"],
caching_enabled=payload["caching_enabled"],
allow_websocket_upgrade=payload["allow_websocket_upgrade"],
access_list_id=payload["access_list_id"],
advanced_config=payload["advanced_config"],
meta=payload["meta"],
)
return (
f"Successfully cloned proxy host from '{source_server}' to '{target_server}'!\n\n"
f"Source Host ID: {host_id}\n"
f"Target Host ID: {new_host.id}\n"
f"Domains: {', '.join(new_host.domain_names)}\n"
f"Certificate: {cert_resolved_msg}\n"
f"Access List: {access_list_resolved_msg}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def sync_access_lists(source_server: str, target_server: str) -> str:
"""Sync access lists from source_server to target_server.
Replicates missing access lists by name, carrying over credentials and IP rules.
Args:
source_server: Name of the source server.
target_server: Name of the target server.
"""
try:
source_client = _get_client(source_server)
target_client = _get_client(target_server)
# Get raw access lists from both servers to retrieve detailed items and clients
source_response = await source_client._request("GET", "/nginx/access-lists")
source_lists = source_response.json()
target_response = await target_client._request("GET", "/nginx/access-lists")
target_lists = target_response.json()
target_names = {al["name"] for al in target_lists}
synced = []
skipped = []
for al in source_lists:
name = al.get("name")
if not name:
continue
if name in target_names:
skipped.append(f"'{name}' (already exists)")
continue
# Strip database IDs and unique primary keys from items & clients to avoid conflicts
items = al.get("items", [])
cleaned_items = []
for item in items:
cleaned = item.copy()
cleaned.pop("id", None)
cleaned.pop("access_list_id", None)
cleaned.pop("created_on", None)
cleaned.pop("modified_on", None)
cleaned_items.append(cleaned)
clients = al.get("clients", [])
cleaned_clients = []
for client in clients:
cleaned = client.copy()
cleaned.pop("id", None)
cleaned.pop("access_list_id", None)
cleaned.pop("created_on", None)
cleaned.pop("modified_on", None)
cleaned_clients.append(cleaned)
# Replicate access list
await target_client.create_access_list(
name=name,
satisfy_any=al.get("satisfy_any", False),
pass_auth=al.get("pass_auth", False),
items=cleaned_items,
clients=cleaned_clients,
)
synced.append(name)
result_parts = [f"Synced access lists from '{source_server}' to '{target_server}':"]
if synced:
result_parts.append(f"✅ Created: {', '.join(synced)}")
else:
result_parts.append("No new access lists were created.")
if skipped:
result_parts.append(f"️ Matched (exists): {', '.join(skipped)}")
return "\n".join(result_parts)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def sync_certificates(source_server: str, target_server: str) -> str:
"""Sync Let's Encrypt certificates from source_server to target_server.
Matches existing certificates on the target server by domain names.
Args:
source_server: Name of the source server.
target_server: Name of the target server.
"""
try:
source_client = _get_client(source_server)
target_client = _get_client(target_server)
source_certs = await source_client.get_certificates()
target_certs = await target_client.get_certificates()
# Build target domain map for lookup
target_domains_map = {frozenset(cert.domain_names): cert for cert in target_certs}
synced = []
skipped_exists = []
skipped_custom = []
for cert in source_certs:
domains = cert.domain_names
if not domains:
continue
cert_domains_set = frozenset(domains)
# Check if matching cert exists on target
if cert_domains_set in target_domains_map:
skipped_exists.append(f"'{cert.nice_name or ', '.join(domains)}'")
continue
# Check provider type
if cert.provider != "letsencrypt":
skipped_custom.append(
f"'{cert.nice_name or ', '.join(domains)}' "
f"(custom provider: {cert.provider})"
)
continue
# Re-provision Let's Encrypt certificate
email = (
cert.meta.get("letsencrypt_email")
or cert.meta.get("email")
or settings.identity
or "admin@example.com"
)
dns_challenge = cert.meta.get("dns_challenge", False)
await target_client.create_certificate(
domain_names=domains,
email=email,
dns_challenge=dns_challenge,
)
synced.append(f"'{', '.join(domains)}'")
result_parts = [
f"Synced Let's Encrypt certificates from '{source_server}' "
f"to '{target_server}':"
]
if synced:
result_parts.append(f"✅ Provisioned: {', '.join(synced)}")
else:
result_parts.append("No new certificates were provisioned.")
if skipped_exists:
result_parts.append(f"️ Matched (exists): {', '.join(skipped_exists)}")
if skipped_custom:
result_parts.append(f"⚠️ Skipped (manual upload required): {', '.join(skipped_custom)}")
return "\n".join(result_parts)
except Exception as e:
return _format_error(e)
+200 -2
View File
@@ -1,10 +1,9 @@
"""Tests for NpmClient."""
import pytest
from httpx import Response
from npm_mcp.client import NpmClient
from npm_mcp.exceptions import NpmAuthenticationError, NpmConnectionError
from npm_mcp.exceptions import NpmAuthenticationError
@pytest.fixture
@@ -35,6 +34,53 @@ def mock_proxy_hosts():
]
@pytest.fixture
def mock_access_lists():
"""Mock access lists response."""
return [
{
"id": 1,
"created_on": "2024-01-01T00:00:00Z",
"modified_on": "2024-01-01T00:00:00Z",
"owner_user_id": 1,
"name": "Admin Only",
"satisfy_any": False,
"pass_auth": True,
},
{
"id": 2,
"created_on": "2024-01-02T00:00:00Z",
"modified_on": "2024-01-02T00:00:00Z",
"owner_user_id": 1,
"name": "Internal Network",
"satisfy_any": True,
"pass_auth": False,
},
]
@pytest.fixture
def mock_created_proxy_host():
"""Mock response for created proxy host."""
return {
"id": 42,
"created_on": "2024-01-15T10:00:00Z",
"modified_on": "2024-01-15T10:00:00Z",
"owner_user_id": 1,
"domain_names": ["newapp.example.com"],
"forward_host": "10.0.0.50",
"forward_port": 3000,
"forward_scheme": "http",
"enabled": True,
"ssl_forced": True,
"certificate_id": 24,
"block_exploits": True,
"allow_websocket_upgrade": True,
"access_list_id": 0,
"advanced_config": "",
}
class TestNpmClientAuth:
"""Test authentication logic."""
@@ -114,3 +160,155 @@ class TestNpmClientEndpoints:
assert hosts[0].id == 1
assert hosts[0].domain_names == ["example.com"]
assert hosts[0].forward_host == "192.168.1.100"
@pytest.mark.asyncio
async def test_get_access_lists(self, httpx_mock, mock_token_response, mock_access_lists):
"""Test fetching access lists."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="GET",
url="http://localhost:81/api/nginx/access-lists",
json=mock_access_lists,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
access_lists = await client.get_access_lists()
assert len(access_lists) == 2
assert access_lists[0].id == 1
assert access_lists[0].name == "Admin Only"
assert access_lists[0].pass_auth is True
assert access_lists[1].id == 2
assert access_lists[1].name == "Internal Network"
assert access_lists[1].satisfy_any is True
@pytest.mark.asyncio
async def test_create_proxy_host(
self, httpx_mock, mock_token_response, mock_created_proxy_host
):
"""Test creating a proxy host."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/nginx/proxy-hosts",
json=mock_created_proxy_host,
status_code=201,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
host = await client.create_proxy_host(
domain_names=["newapp.example.com"],
forward_host="10.0.0.50",
forward_port=3000,
certificate_id=24,
ssl_forced=True,
)
assert host.id == 42
assert host.domain_names == ["newapp.example.com"]
assert host.forward_host == "10.0.0.50"
assert host.forward_port == 3000
assert host.ssl_forced is True
assert host.certificate_id == 24
@pytest.mark.asyncio
async def test_create_access_list(self, httpx_mock, mock_token_response):
"""Test creating an access list."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/nginx/access-lists",
json={
"id": 5,
"created_on": "2024-01-01T00:00:00Z",
"modified_on": "2024-01-01T00:00:00Z",
"owner_user_id": 1,
"name": "Custom List",
"satisfy_any": True,
"pass_auth": False,
},
status_code=201,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
al = await client.create_access_list(
name="Custom List",
satisfy_any=True,
pass_auth=False,
items=[{"username": "u", "password": "p"}],
clients=[{"address": "1.1.1.1", "directive": "allow"}],
)
assert al.id == 5
assert al.name == "Custom List"
assert al.satisfy_any is True
assert al.pass_auth is False
@pytest.mark.asyncio
async def test_get_proxy_host_logs(self, httpx_mock, mock_token_response):
"""Test fetching proxy host logs via API."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="GET",
url="http://localhost:81/api/nginx/proxy-hosts/42/logs?type=access&limit=50",
json={"lines": ["line 1", "line 2"]},
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
logs = await client.get_proxy_host_logs(host_id=42, log_type="access", lines=50)
assert logs == {"lines": ["line 1", "line 2"]}
@pytest.mark.asyncio
async def test_get_proxy_host_logs_summary(self, httpx_mock, mock_token_response):
"""Test fetching proxy host logs summary via API."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="GET",
url="http://localhost:81/api/nginx/proxy-hosts/42/logs/summary",
json={"access": 100, "error": 5},
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
summary = await client.get_proxy_host_logs_summary(host_id=42)
assert summary == {"access": 100, "error": 5}
+129
View File
@@ -0,0 +1,129 @@
"""Tests for configuration handling."""
import pytest
from pydantic_settings.exceptions import SettingsError
from npm_mcp.config import DEFAULT_PROXY_SETTINGS, Settings
class TestProxyDefaults:
"""Test NPM_PROXY_DEFAULTS parsing and merging."""
def test_default_proxy_settings(self):
"""Test that default settings are correct."""
settings = Settings(identity="test", secret="test")
defaults = settings.get_proxy_defaults()
assert defaults["forward_scheme"] == "http"
assert defaults["certificate_id"] == 0
assert defaults["ssl_forced"] is True
assert defaults["block_exploits"] is True
assert defaults["allow_websocket_upgrade"] is True
assert defaults["access_list_id"] == 0
assert defaults["advanced_config"] == ""
def test_proxy_defaults_json_parsing(self, monkeypatch):
"""Test parsing JSON string from environment variable."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", '{"certificate_id": 24, "ssl_forced": false}')
settings = Settings()
defaults = settings.get_proxy_defaults()
# Overridden values
assert defaults["certificate_id"] == 24
assert defaults["ssl_forced"] is False
# Default values preserved
assert defaults["forward_scheme"] == "http"
assert defaults["block_exploits"] is True
def test_proxy_defaults_dict_passthrough(self):
"""Test that dict values pass through correctly."""
settings = Settings(
identity="test",
secret="test",
proxy_defaults={"certificate_id": 18, "access_list_id": 5},
)
defaults = settings.get_proxy_defaults()
assert defaults["certificate_id"] == 18
assert defaults["access_list_id"] == 5
def test_proxy_defaults_empty_env_raises(self, monkeypatch):
"""Test that empty string env var raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "")
# pydantic-settings tries to JSON decode empty string and fails
with pytest.raises(SettingsError):
Settings()
def test_proxy_defaults_invalid_json_raises(self, monkeypatch):
"""Test that invalid JSON raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "{not valid json}")
# pydantic-settings tries to JSON decode and fails
with pytest.raises(SettingsError):
Settings()
def test_proxy_defaults_merges_not_replaces(self):
"""Test that user defaults merge with base defaults."""
settings = Settings(
identity="test",
secret="test",
proxy_defaults={"certificate_id": 24},
)
defaults = settings.get_proxy_defaults()
# All keys should be present
assert set(defaults.keys()) == set(DEFAULT_PROXY_SETTINGS.keys())
# User value applied
assert defaults["certificate_id"] == 24
# Other defaults preserved
assert defaults["ssl_forced"] is True
assert defaults["block_exploits"] is True
class TestMultiServerConfig:
"""Test multi-server configuration parsing."""
def test_servers_empty_by_default(self):
"""Test that servers is empty by default."""
s = Settings(identity="test", secret="test")
assert s.servers == []
assert s.default_server is None
def test_servers_json_parsing(self, monkeypatch):
"""Test parsing servers JSON list from environment variable."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv(
"NPM_SERVERS",
'[{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"}, '
'{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"}]'
)
monkeypatch.setenv("NPM_DEFAULT_SERVER", "prod")
s = Settings()
assert len(s.servers) == 2
assert s.servers[0]["name"] == "prod"
assert s.servers[0]["url"] == "http://prod:81/api"
assert s.servers[1]["name"] == "dev"
assert s.default_server == "prod"
def test_servers_invalid_json_raises(self, monkeypatch):
"""Test that invalid servers JSON raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_SERVERS", "{not valid}")
with pytest.raises(SettingsError):
Settings()
+146
View File
@@ -0,0 +1,146 @@
"""Tests for the log reader module."""
import pytest
from npm_mcp.exceptions import NpmLogError
from npm_mcp.logs import (
is_log_dir_configured,
list_available_logs,
read_log_lines,
)
SAMPLE_ACCESS_LOG = (
'[22/May/2025:10:00:01 +0000] - 200 200 - GET https app.example.com'
' "/" [Client 10.0.0.1] [Length 1542] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:02 +0000] - 301 301 - GET http app.example.com'
' "/old-path" [Client 10.0.0.2] [Length 0] "curl/7.88" "-"\n'
'[22/May/2025:10:00:03 +0000] - 404 404 - GET https app.example.com'
' "/missing" [Client 10.0.0.1] [Length 548] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:04 +0000] - 200 200 - POST https app.example.com'
' "/api/data" [Client 10.0.0.3] [Length 256] "python-requests/2.31" "-"\n'
'[22/May/2025:10:00:05 +0000] - 502 502 - GET https app.example.com'
' "/health" [Client 10.0.0.1] [Length 166] "kube-probe/1.28" "-"\n'
)
SAMPLE_ERROR_LOG = (
"2025/05/22 10:00:05 [error] 42#42: *123 connect() failed"
" (111: Connection refused) while connecting to upstream,"
" client: 10.0.0.1, server: app.example.com\n"
)
@pytest.fixture
def log_dir(tmp_path, monkeypatch):
"""Create a temp log directory with sample log files."""
logs = tmp_path / "logs"
logs.mkdir()
(logs / "proxy-host-5_access.log").write_text(SAMPLE_ACCESS_LOG)
(logs / "proxy-host-5_error.log").write_text(SAMPLE_ERROR_LOG)
(logs / "proxy-host-12_access.log").write_text("")
(logs / "fallback_error.log").write_text("global error\n")
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", str(logs))
return logs
@pytest.fixture
def no_log_dir(monkeypatch):
"""Ensure log_dir is unconfigured."""
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "")
class TestReadLogLines:
def test_read_access_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="access")
assert result["host_id"] == 5
assert result["log_type"] == "access"
assert result["file"] == "proxy-host-5_access.log"
assert result["returned_lines"] == 5
assert result["total_lines_in_file"] == 5
assert "app.example.com" in result["lines"][0]
def test_read_error_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="error")
assert result["log_type"] == "error"
assert result["returned_lines"] == 1
assert "Connection refused" in result["lines"][0]
def test_lines_limit(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=2)
assert result["returned_lines"] == 2
assert "10:00:04" in result["lines"][0]
assert "10:00:05" in result["lines"][1]
def test_lines_capped_at_max(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=9999)
assert result["returned_lines"] == 5
def test_search_filter(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="404")
assert result["returned_lines"] == 1
assert result["matched_lines"] == 1
assert result["total_lines_in_file"] is None
assert "/missing" in result["lines"][0]
def test_search_case_insensitive(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="MOZILLA")
assert result["returned_lines"] == 2
def test_search_by_ip(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="10.0.0.1")
assert result["returned_lines"] == 3
def test_nonexistent_host(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=999, log_type="access")
def test_empty_log_file(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=12, log_type="error")
def test_invalid_log_type(self, log_dir):
with pytest.raises(NpmLogError, match="Invalid log type"):
read_log_lines(host_id=5, log_type="combined")
def test_no_log_dir_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
read_log_lines(host_id=5, log_type="access")
def test_nonexistent_log_dir(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent/path")
with pytest.raises(NpmLogError, match="does not exist"):
read_log_lines(host_id=5, log_type="access")
class TestIsLogDirConfigured:
def test_configured(self, log_dir):
assert is_log_dir_configured() is True
def test_not_configured(self, no_log_dir):
assert is_log_dir_configured() is False
def test_configured_but_missing(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent")
assert is_log_dir_configured() is False
class TestListAvailableLogs:
def test_lists_proxy_host_logs_only(self, log_dir):
results = list_available_logs()
files = {r["file"] for r in results}
assert "proxy-host-5_access.log" in files
assert "proxy-host-5_error.log" in files
assert "proxy-host-12_access.log" in files
assert "fallback_error.log" not in files
def test_correct_metadata(self, log_dir):
results = list_available_logs()
access_5 = next(r for r in results if r["file"] == "proxy-host-5_access.log")
assert access_5["host_id"] == 5
assert access_5["log_type"] == "access"
assert access_5["size_bytes"] > 0
def test_not_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
list_available_logs()
+97
View File
@@ -0,0 +1,97 @@
"""Tests for ServerRegistry."""
import pytest
from npm_mcp.config import settings
from npm_mcp.server import ServerRegistry
def test_registry_fallback_to_single_server(monkeypatch):
"""Test that registry falls back to single-server settings when empty."""
monkeypatch.setattr(settings, "api_url", "http://test-url:81/api")
monkeypatch.setattr(settings, "identity", "test-user")
monkeypatch.setattr(settings, "secret", "test-pass")
registry = ServerRegistry(configs=[], default=None)
assert registry.list_names() == ["default"]
assert registry.get_default() == "default"
client = registry.get()
assert client.base_url == "http://test-url:81/api"
assert client._identity == "test-user"
def test_registry_multiple_servers():
"""Test that multiple servers are correctly registered."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"},
]
registry = ServerRegistry(configs=configs, default="prod")
assert set(registry.list_names()) == {"prod", "dev"}
assert registry.get_default() == "prod"
prod_client = registry.get("prod")
assert prod_client.base_url == "http://prod:81/api"
dev_client = registry.get("dev")
assert dev_client.base_url == "http://dev:81/api"
def test_registry_get_default_fallback():
"""Test that get() falls back to default server when name is None/empty."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"},
]
registry = ServerRegistry(configs=configs, default="dev")
# Name is None
client = registry.get(None)
assert client.base_url == "http://dev:81/api"
# Name is empty string
client_empty = registry.get("")
assert client_empty.base_url == "http://dev:81/api"
def test_registry_single_client_no_default_specified():
"""Test that get() succeeds if there is only 1 server, even if no default is specified."""
configs = [
{"name": "only-one", "url": "http://only:81/api", "identity": "o", "secret": "os"}
]
registry = ServerRegistry(configs=configs, default=None)
assert registry.get_default() is None
client = registry.get()
assert client.base_url == "http://only:81/api"
def test_registry_multiple_clients_no_default_raises():
"""Test that get() raises KeyError if multiple servers are defined but no default is set."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
{"name": "dev", "url": "http://dev:81/api", "identity": "d", "secret": "ds"},
]
registry = ServerRegistry(configs=configs, default=None)
with pytest.raises(KeyError, match="Multiple servers configured but no default server"):
registry.get()
def test_registry_invalid_name_raises():
"""Test that get() raises KeyError for non-existent server names."""
configs = [
{"name": "prod", "url": "http://prod:81/api", "identity": "p", "secret": "ps"},
]
registry = ServerRegistry(configs=configs, default="prod")
with pytest.raises(KeyError, match="Server 'non-existent' not found"):
registry.get("non-existent")
+315
View File
@@ -0,0 +1,315 @@
"""Tests for multi-server management and sync tools."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from npm_mcp.models import AccessList, Certificate, HealthStatus, ProxyHost
from npm_mcp.server import (
clone_proxy_host,
get_proxy_host_logs,
list_servers,
sync_access_lists,
sync_certificates,
)
@pytest.fixture
def mock_registry():
"""Mock registry with prod and dev servers."""
reg = MagicMock()
reg.list_names.return_value = ["prod", "dev"]
reg.get_default.return_value = "prod"
return reg
@pytest.mark.asyncio
async def test_list_servers(mock_registry):
"""Test list_servers tool output and health query."""
client_prod = MagicMock()
client_prod.get_status = AsyncMock(
return_value=HealthStatus(status="online", version={"major": "2"})
)
client_dev = MagicMock()
client_dev.get_status = AsyncMock(side_effect=Exception("Connection failed"))
mock_registry.get.side_effect = lambda name: client_prod if name == "prod" else client_dev
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result_json = await list_servers()
result = json.loads(result_json)
assert result["servers"] == ["prod", "dev"]
assert result["default_server"] == "prod"
assert result["health"]["prod"]["status"] == "online"
assert result["health"]["dev"]["status"] == "error"
assert "Connection failed" in result["health"]["dev"]["error"]
@pytest.mark.asyncio
async def test_clone_proxy_host(mock_registry):
"""Test clone_proxy_host tool with cert and access list resolution."""
source_client = MagicMock()
target_client = MagicMock()
mock_registry.get.side_effect = lambda name: source_client if name == "prod" else target_client
# Source host setup
source_host = ProxyHost(
id=12,
created_on="2024-01-01T00:00:00Z",
modified_on="2024-01-01T00:00:00Z",
owner_user_id=1,
domain_names=["test.example.com"],
forward_host="192.168.1.50",
forward_port=8080,
forward_scheme="http",
certificate_id=10,
access_list_id=5,
ssl_forced=True,
hsts_enabled=True,
hsts_subdomains=False,
http2_support=True,
block_exploits=True,
caching_enabled=False,
allow_websocket_upgrade=True,
advanced_config="my advanced config",
meta={"key": "val"},
)
source_client.get_proxy_host = AsyncMock(return_value=source_host)
# Source dependencies setup
source_cert = Certificate(
id=10,
nice_name="wildcard-example",
domain_names=["*.example.com"],
provider="letsencrypt",
)
source_client.get_certificate = AsyncMock(return_value=source_cert)
source_alists = [
AccessList(
id=5,
name="Staging Auth",
created_on="2024-01-01T00:00:00Z",
modified_on="2024-01-01T00:00:00Z",
)
]
source_client.get_access_lists = AsyncMock(return_value=source_alists)
# Target dependency search results
target_certs = [
Certificate(
id=100,
nice_name="wildcard-example",
domain_names=["*.example.com"],
provider="letsencrypt",
)
]
target_client.get_certificates = AsyncMock(return_value=target_certs)
target_alists = [
AccessList(
id=500,
name="Staging Auth",
created_on="2024-01-02T00:00:00Z",
modified_on="2024-01-02T00:00:00Z",
)
]
target_client.get_access_lists = AsyncMock(return_value=target_alists)
# Mock creation on target
cloned_host = ProxyHost(
id=999,
created_on="2024-01-03T00:00:00Z",
modified_on="2024-01-03T00:00:00Z",
owner_user_id=1,
domain_names=["test.example.com"],
forward_host="192.168.1.50",
forward_port=8080,
)
target_client.create_proxy_host = AsyncMock(return_value=cloned_host)
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result = await clone_proxy_host(
source_server="prod",
target_server="dev",
host_id=12,
override_settings={"forward_host": "10.0.0.10"},
)
assert "Successfully cloned" in result
assert "Source Host ID: 12" in result
assert "Target Host ID: 999" in result
assert "Resolved to ID 100" in result
assert "Resolved to ID 500" in result
target_client.create_proxy_host.assert_called_once_with(
domain_names=["test.example.com"],
forward_host="10.0.0.10", # Overridden!
forward_port=8080,
forward_scheme="http",
certificate_id=100, # Resolved!
ssl_forced=True,
hsts_enabled=True,
hsts_subdomains=False,
http2_support=True,
block_exploits=True,
caching_enabled=False,
allow_websocket_upgrade=True,
access_list_id=500, # Resolved!
advanced_config="my advanced config",
meta={"key": "val"},
)
@pytest.mark.asyncio
async def test_sync_access_lists(mock_registry):
"""Test sync_access_lists replicates missing access lists with credentials/IPs."""
source_client = MagicMock()
target_client = MagicMock()
mock_registry.get.side_effect = lambda name: source_client if name == "prod" else target_client
# Source returns raw JSON including items & clients
source_mock_response = MagicMock()
source_mock_response.json.return_value = [
{
"id": 1,
"name": "Staging Auth",
"satisfy_any": False,
"pass_auth": True,
"items": [
{"id": 10, "access_list_id": 1, "username": "u", "password": "p"}
],
"clients": [
{"id": 20, "access_list_id": 1, "address": "1.1.1.1", "directive": "allow"}
],
},
{
"id": 2,
"name": "Already Synced",
"satisfy_any": True,
"pass_auth": False,
}
]
source_client._request = AsyncMock(return_value=source_mock_response)
# Target returns raw JSON showing "Already Synced" exists
target_mock_response = MagicMock()
target_mock_response.json.return_value = [{"id": 99, "name": "Already Synced"}]
target_client._request = AsyncMock(return_value=target_mock_response)
target_client.create_access_list = AsyncMock()
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result = await sync_access_lists(source_server="prod", target_server="dev")
assert "Created: Staging Auth" in result
assert "Matched (exists): 'Already Synced' (already exists)" in result
# Verify items and clients were stripped of database IDs
target_client.create_access_list.assert_called_once_with(
name="Staging Auth",
satisfy_any=False,
pass_auth=True,
items=[{"username": "u", "password": "p"}],
clients=[{"address": "1.1.1.1", "directive": "allow"}],
)
@pytest.mark.asyncio
async def test_sync_certificates(mock_registry):
"""Test sync_certificates provisions Let's Encrypt and skips custom certs."""
source_client = MagicMock()
target_client = MagicMock()
mock_registry.get.side_effect = lambda name: source_client if name == "prod" else target_client
# Source certificates
source_certs = [
Certificate(
id=1,
nice_name="le-cert",
domain_names=["le.example.com"],
provider="letsencrypt",
meta={"letsencrypt_email": "le@test.com", "dns_challenge": True},
),
Certificate(
id=2,
nice_name="custom-cert",
domain_names=["custom.example.com"],
provider="other-provider",
),
Certificate(
id=3,
nice_name="already-on-target",
domain_names=["existing.example.com"],
provider="letsencrypt",
)
]
source_client.get_certificates = AsyncMock(return_value=source_certs)
# Target certificates
target_certs = [
Certificate(
id=10,
nice_name="already-on-target",
domain_names=["existing.example.com"],
provider="letsencrypt",
)
]
target_client.get_certificates = AsyncMock(return_value=target_certs)
target_client.create_certificate = AsyncMock()
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
result = await sync_certificates(source_server="prod", target_server="dev")
assert "Provisioned: 'le.example.com'" in result
assert "Matched (exists): 'already-on-target'" in result
assert "Skipped (manual upload required): 'custom-cert'" in result
target_client.create_certificate.assert_called_once_with(
domain_names=["le.example.com"],
email="le@test.com",
dns_challenge=True,
)
@pytest.mark.asyncio
async def test_get_proxy_host_logs_api(mock_registry):
"""Test get_proxy_host_logs tool queries the API for logs first."""
client = MagicMock()
mock_registry.get.return_value = client
# Mock host details
host = ProxyHost(
id=5,
created_on="2024-01-01T00:00:00Z",
modified_on="2024-01-01T00:00:00Z",
owner_user_id=1,
domain_names=["test.example.com"],
forward_host="192.168.1.50",
forward_port=8080,
)
client.get_proxy_host = AsyncMock(return_value=host)
# Mock API logs endpoint
client.get_proxy_host_logs = AsyncMock(return_value={
"lines": ["Log line A", "Log line B", "Filter me out"]
})
with patch("npm_mcp.server.get_registry", return_value=mock_registry):
# Retrieve logs with search filter
result = await get_proxy_host_logs(
host_id=5, log_type="access", lines=10, search="Log line"
)
assert "test.example.com" in result
assert "(retrieved via API)" in result
assert "Log line A" in result
assert "Log line B" in result
assert "Filter me out" not in result
assert "Showing last 2 lines:" in result
Generated
+1 -1
View File
@@ -313,7 +313,7 @@ wheels = [
[[package]]
name = "npm-mcp"
version = "0.1.0"
version = "0.0.2"
source = { editable = "." }
dependencies = [
{ name = "httpx" },