6 Commits

Author SHA1 Message Date
5e89176806 feat: Add get_proxy_host_logs tool for reading nginx proxy host logs
Reads nginx access/error logs directly from a mounted NPM log directory,
enabling agents to debug proxy issues without SSH access. Requires mounting
NPM's /data/logs volume and setting NPM_LOG_DIR.

Also includes a feature request PRD for proposing a native log API upstream.
2026-05-22 14:27:37 +00:00
c3227c3a5f fix: Disable Docker push on PR builds and update README
Fork PRs don't have write access to GHCR. Use conditional push
that only pushes on non-PR events (push to main, tags).

Also update README with new tools from v0.0.3:
- update_proxy_host
- create_certificate
2026-02-12 04:06:58 +00:00
32f57b1a9e fix: Disable Docker push on PR builds
Fork PRs don't have write access to GHCR. Use conditional push
that only pushes on non-PR events (push to main, tags).
2026-02-12 03:51:18 +00:00
Jordan Réjaud
f81bf796a6 Fix Pydantic backward compatibility and add update/certificate tools (#3)
- Make Owner and Certificate model fields optional with defaults to fix
  parsing errors when NPM API returns null/missing nested objects
- Add update_proxy_host tool for modifying existing proxy host configs
- Add create_certificate tool for provisioning Let's Encrypt SSL certs
- Add corresponding client methods with full parameter support
2026-02-11 21:47:54 -06:00
Ben
15b2876d7b chore: Update version to 0.0.2 2025-12-24 21:23:15 +00:00
b3nw
52eb484432 feat: Add proxy host creation and access list tools (#1)
Add MCP tools for creating proxy hosts and listing access lists, with configurable defaults via environment variable.

New tools:
- create_proxy_host: Create new proxy hosts with SSL, access control, and websocket support
- list_access_lists: List available access lists for use in proxy host creation

New features:
- NPM_PROXY_DEFAULTS env var for configurable default values (certificate_id, access_list_id, ssl settings, etc.)
2025-12-24 15:20:45 -06:00
15 changed files with 1311 additions and 26 deletions

View File

@@ -26,7 +26,6 @@ jobs:
uses: docker/setup-buildx-action@v3 uses: docker/setup-buildx-action@v3
- name: Log in to Container Registry - name: Log in to Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@v3 uses: docker/login-action@v3
with: with:
registry: ${{ env.REGISTRY }} registry: ${{ env.REGISTRY }}

127
README.md
View File

@@ -49,8 +49,42 @@ NPM_SECRET=yourpassword
# Optional: Server settings # Optional: Server settings
NPM_MCP_PORT=8000 NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # or "http" NPM_MCP_TRANSPORT=stdio # or "http"
# Optional: Default values for create_proxy_host (JSON)
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
``` ```
### Environment Variables
| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `NPM_API_URL` | Yes | `http://localhost:81/api` | NPM API endpoint |
| `NPM_IDENTITY` | Yes | - | NPM user email |
| `NPM_SECRET` | Yes | - | NPM user password |
| `NPM_MCP_HOST` | No | `0.0.0.0` | MCP server bind address |
| `NPM_MCP_PORT` | No | `8000` | MCP server port |
| `NPM_MCP_TRANSPORT` | No | `stdio` | Transport mode (`stdio` or `http`) |
| `NPM_LOG_DIR` | No | - | Path to mounted NPM log directory (enables `get_proxy_host_logs`) |
| `NPM_PROXY_DEFAULTS` | No | `{}` | JSON defaults for `create_proxy_host` |
### NPM_PROXY_DEFAULTS Keys
Configure default values for proxy host creation:
```bash
NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true, "block_exploits": true}'
```
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `forward_scheme` | string | `"http"` | Backend protocol (`http` or `https`) |
| `certificate_id` | int | `0` | SSL certificate ID (use `list_certificates` to find) |
| `ssl_forced` | bool | `true` | Force HTTPS redirect |
| `block_exploits` | bool | `true` | Enable common exploit blocking |
| `allow_websocket_upgrade` | bool | `true` | Allow WebSocket connections |
| `access_list_id` | int | `0` | Access list ID (use `list_access_lists` to find) |
| `advanced_config` | string | `""` | Custom nginx configuration block |
## Usage ## Usage
### Stdio Mode (for Claude Desktop, etc.) ### Stdio Mode (for Claude Desktop, etc.)
@@ -93,8 +127,101 @@ Add to your `claude_desktop_config.json`:
|------|-------------| |------|-------------|
| `list_proxy_hosts` | List all proxy hosts | | `list_proxy_hosts` | List all proxy hosts |
| `get_proxy_host_details` | Get full config for a specific host | | `get_proxy_host_details` | Get full config for a specific host |
| `get_proxy_host_logs` | Retrieve nginx access/error logs for a proxy host (requires log mount) |
| `get_system_health` | Check NPM version and status | | `get_system_health` | Check NPM version and status |
| `search_audit_logs` | Query audit log entries | | `search_audit_logs` | Query audit log entries |
| `list_certificates` | List SSL certificates |
| `list_access_lists` | List access lists for authentication/IP restrictions |
| `create_proxy_host` | Create a new proxy host |
| `update_proxy_host` | Update an existing proxy host (v0.0.3+) |
| `create_certificate` | Provision a new Let's Encrypt SSL certificate (v0.0.3+) |
## Log Access Setup
The `get_proxy_host_logs` tool reads nginx log files directly from disk. Since NPM has no API for log retrieval, you need to mount NPM's log directory into the MCP container.
NPM writes per-host logs to `/data/logs/` inside its container:
- `proxy-host-{id}_access.log` — HTTP request log (client IP, status, path, user agent)
- `proxy-host-{id}_error.log` — nginx error log (upstream failures, config issues)
### Docker Compose (same stack)
If NPM and the MCP server share a compose stack with a named volume:
```yaml
services:
nginx-proxy-manager:
image: jc21/nginx-proxy-manager:latest
volumes:
- npm_data:/data
npm-mcp:
image: ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
environment:
- NPM_API_URL=http://nginx-proxy-manager:81/api
- NPM_IDENTITY=admin@example.com
- NPM_SECRET=yourpassword
- NPM_LOG_DIR=/data/npm-logs
volumes:
# Mount NPM's /data volume — logs are in /data/logs/ inside it
- npm_data:/data/npm-logs:ro
depends_on:
- nginx-proxy-manager
volumes:
npm_data:
```
> **Note:** NPM stores logs under `/data/logs/` inside its data volume. When you
> mount the full `/data` volume to `/data/npm-logs`, the MCP server looks for logs at
> `/data/npm-logs/logs/`. Set `NPM_LOG_DIR` to match your mount path plus `/logs`.
If you mounted the full data volume:
```bash
NPM_LOG_DIR=/data/npm-logs/logs
```
### Bind Mount (separate stacks)
If NPM uses a bind mount (e.g., `./npm-data:/data`), mount the logs subdirectory directly:
```yaml
npm-mcp:
volumes:
- /path/to/npm-data/logs:/data/npm-logs:ro
environment:
- NPM_LOG_DIR=/data/npm-logs
```
### Docker Run
```bash
docker run -d \
--name npm-mcp \
-p 8000:8000 \
-v npm_data:/data/npm-logs:ro \
-e NPM_API_URL=http://your-npm:81/api \
-e NPM_IDENTITY=admin@example.com \
-e NPM_SECRET=yourpassword \
-e NPM_LOG_DIR=/data/npm-logs/logs \
-e NPM_MCP_TRANSPORT=http \
ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
```
### Local Development (non-Docker)
Point `NPM_LOG_DIR` at wherever NPM's logs are on your filesystem:
```bash
NPM_LOG_DIR=/path/to/npm/data/logs npm-mcp
```
### Verifying the Mount
After starting, call `get_system_health` — if the log directory is mounted and accessible
the tool will confirm it. You can also call `get_proxy_host_logs` with any host ID to
verify logs are readable.
## Development ## Development

View File

@@ -16,6 +16,11 @@ services:
- NPM_MCP_TRANSPORT=http - NPM_MCP_TRANSPORT=http
- NPM_MCP_HOST=0.0.0.0 - NPM_MCP_HOST=0.0.0.0
- NPM_MCP_PORT=8000 - NPM_MCP_PORT=8000
# Optional: Log access (requires volume mount below)
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# # Mount NPM's log directory for get_proxy_host_logs tool (read-only)
# - npm_data:/data/npm-logs:ro # named volume — see "Log Access" in README
# Uncomment to use .env file instead of inline environment # Uncomment to use .env file instead of inline environment
# env_file: # env_file:
# - .env # - .env
@@ -26,7 +31,7 @@ services:
retries: 3 retries: 3
start_period: 10s start_period: 10s
# Example: Running alongside NPM in same compose stack # Example: Running alongside NPM in same compose stack (with log access)
# services: # services:
# nginx-proxy-manager: # nginx-proxy-manager:
# image: jc21/nginx-proxy-manager:latest # image: jc21/nginx-proxy-manager:latest
@@ -44,6 +49,9 @@ services:
# - NPM_API_URL=http://nginx-proxy-manager:81/api # - NPM_API_URL=http://nginx-proxy-manager:81/api
# - NPM_IDENTITY=admin@example.com # - NPM_IDENTITY=admin@example.com
# - NPM_SECRET=changeme # - NPM_SECRET=changeme
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# - npm_data:/data/npm-logs:ro
# depends_on: # depends_on:
# - nginx-proxy-manager # - nginx-proxy-manager
# #

View File

@@ -0,0 +1,176 @@
# Feature Request: Per-Host Log Retrieval API
**Project:** [NginxProxyManager/nginx-proxy-manager](https://github.com/NginxProxyManager/nginx-proxy-manager)
**Type:** Feature Request
**Status:** Draft PRD
## Problem Statement
Nginx Proxy Manager writes per-host access and error logs to predictable paths on disk
(`/data/logs/proxy-host-{id}_access.log`, `/data/logs/proxy-host-{id}_error.log`), but
provides no API to read them. The only log-related API is the audit log (`GET /api/audit-log`),
which tracks admin configuration changes — not HTTP traffic.
This means operators and automation tools have no programmatic way to:
- Retrieve recent access log entries for a specific proxy host
- Check error logs when debugging upstream connectivity issues
- Monitor traffic patterns or detect anomalies through the existing API surface
The only workarounds today are direct filesystem access (requiring volume mounts or
`docker exec`) or external log aggregation pipelines, both of which add significant
operational complexity for a task that should be simple.
## Proposed Solution
Add REST API endpoints to retrieve nginx access and error logs for individual proxy hosts.
### New Endpoints
#### `GET /api/nginx/proxy-hosts/{id}/logs`
Retrieve log entries for a specific proxy host.
**Query Parameters:**
| Parameter | Type | Default | Description |
|------------|--------|------------|-------------------------------------------------------|
| `type` | string | `"access"` | Log type: `"access"` or `"error"` |
| `lines` | int | `100` | Number of most recent lines to return (max: `1000`) |
| `search` | string | - | Filter lines containing this substring |
| `since` | string | - | ISO 8601 timestamp — only return lines after this time|
**Response (200):**
```json
{
"host_id": 5,
"log_type": "access",
"file": "proxy-host-5_access.log",
"total_lines": 4821,
"returned_lines": 100,
"lines": [
"[01/Jun/2025:14:22:31 +0000] HIT 200 200 - GET https app.example.com \"/api/data\" [Client 10.0.0.1] [Length 1542] [Gzip -] [Sent-to 192.168.1.50] \"Mozilla/5.0\" \"https://app.example.com/\"",
"..."
]
}
```
**Error Responses:**
| Status | Condition |
|--------|-------------------------------------|
| 404 | Proxy host not found |
| 404 | Log file does not exist |
| 403 | User lacks permission for this host |
#### `GET /api/nginx/proxy-hosts/{id}/logs/summary`
Return a statistical summary of recent traffic for a proxy host.
**Response (200):**
```json
{
"host_id": 5,
"period": "last_1000_lines",
"status_codes": {"200": 812, "301": 45, "404": 23, "500": 3},
"top_paths": ["/api/data", "/", "/login"],
"top_clients": ["10.0.0.1", "10.0.0.5"],
"cache_hit_rate": 0.42,
"access_log_size_bytes": 524288,
"error_log_size_bytes": 8192
}
```
### Permissions
| Permission | Description |
|--------------------|--------------------------------------------|
| `proxy-hosts:logs` | Read logs for proxy hosts the user can view |
Admin users can read logs for any host. Non-admin users can only read logs
for hosts they own, consistent with existing proxy host permissions.
### Backend Implementation Notes
The implementation is straightforward because log paths are already deterministic
and hardcoded in nginx templates:
```javascript
// backend/templates/proxy_host.conf
access_log /data/logs/proxy-host-{{ id }}_access.log proxy;
error_log /data/logs/proxy-host-{{ id }}_error.log warn;
```
A minimal implementation would:
1. Add a new route in `backend/routes/` (e.g., `proxy-host-logs.js`)
2. Verify the proxy host exists and the user has access
3. Read the last N lines from the log file using a reverse-reader (or `tail`-like approach)
4. Optionally filter lines by substring or timestamp
5. Return as JSON
**Reference files for implementation:**
- `backend/routes/nginx/proxy_hosts.js` — existing proxy host routes and permission model
- `backend/templates/proxy_host.conf` — log path template confirming the naming convention
- `backend/lib/access/` — permission definition files
- `docker/rootfs/etc/logrotate.d/nginx-proxy-manager` — log rotation config (rotated logs
have `.1`, `.2.gz` suffixes)
### Log Rotation Consideration
NPM rotates logs weekly (access: 4 rotations, error: 10 rotations). The API should
read only the current (unrotated) log file. Rotated archives (`.1`, `.2.gz`) could be
supported in a future iteration but are not required for the initial implementation.
## Motivation
### Use Cases
1. **MCP/AI Agent Integration** — MCP servers wrapping the NPM API (like
[nginx-proxy-manager-mcp](https://github.com/b3nw/nginx-proxy-manager-mcp)) can
expose log retrieval to AI assistants for debugging proxy issues conversationally.
2. **Quick Debugging** — When a reverse proxy returns errors, operators need to quickly
check the access and error logs for that specific host. Today this requires SSH/exec
access to the container.
3. **Monitoring Dashboards** — Custom dashboards can poll the log endpoint for
traffic summaries without deploying a full log aggregation stack.
4. **Automation** — CI/CD pipelines and health-check scripts can verify that traffic
is flowing correctly to newly deployed services behind NPM.
### Why Not External Log Aggregation?
External solutions (Loki, ELK, Fluentd) are powerful but heavy. Many NPM users run
single-host homelab setups where a full log pipeline is disproportionate to the need.
A built-in API covers 80% of use cases with zero additional infrastructure.
## Alternatives Considered
| Approach | Pros | Cons |
|---------------------------|-----------------------------|-------------------------------------------------|
| **API endpoint (proposed)** | Native, zero extra infra | Requires upstream PR |
| Volume mount + file read | Works today, no NPM changes | Tight coupling, no access control, not portable |
| Docker exec | Works today | Requires Docker socket, security risk |
| Sidecar log server | Decoupled | Extra container, extra config, extra maintenance |
## Scope
### In Scope (v1)
- `GET /api/nginx/proxy-hosts/{id}/logs` with `type`, `lines`, `search` params
- Permission checks consistent with existing proxy host access model
- Current (unrotated) log file only
### Out of Scope (Future)
- Log streaming via WebSocket or SSE
- Rotated log archive access (`.gz` files)
- Log summary/analytics endpoint
- Redirection host, dead host, and stream logs (same pattern, easy to add later)
- Log download as file attachment
- Log retention configuration via API

View File

@@ -7,3 +7,13 @@ NPM_SECRET=changeme
NPM_MCP_HOST=0.0.0.0 NPM_MCP_HOST=0.0.0.0
NPM_MCP_PORT=8000 NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # stdio or http NPM_MCP_TRANSPORT=stdio # stdio or http
# Log Access (optional)
# Mount NPM's /data/logs directory and set this to the mount path.
# Enables the get_proxy_host_logs tool for reading nginx access/error logs.
# NPM_LOG_DIR=/data/npm-logs
# Proxy Host Creation Defaults (JSON)
# Set default values for create_proxy_host tool parameters
# Example with wildcard cert: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
# NPM_PROXY_DEFAULTS='{"certificate_id": 0, "ssl_forced": true, "block_exploits": true, "allow_websocket_upgrade": true}'

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "npm-mcp" name = "npm-mcp"
version = "0.1.0" version = "0.0.2"
description = "MCP server for Nginx Proxy Manager" description = "MCP server for Nginx Proxy Manager"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"

View File

@@ -13,6 +13,7 @@ from .exceptions import (
NpmNotFoundError, NpmNotFoundError,
) )
from .models import ( from .models import (
AccessList,
AuditLogEntry, AuditLogEntry,
Certificate, Certificate,
HealthStatus, HealthStatus,
@@ -92,9 +93,7 @@ class NpmClient:
raise NpmAuthenticationError("Invalid credentials") raise NpmAuthenticationError("Invalid credentials")
if response.status_code != 200: if response.status_code != 200:
raise NpmApiError( raise NpmApiError(f"Login failed: {response.text}", status_code=response.status_code)
f"Login failed: {response.text}", status_code=response.status_code
)
data = response.json() data = response.json()
token_response = TokenResponse(**data) token_response = TokenResponse(**data)
@@ -172,9 +171,7 @@ class NpmClient:
raise NpmNotFoundError(f"Resource not found: {endpoint}") raise NpmNotFoundError(f"Resource not found: {endpoint}")
if response.status_code >= 400: if response.status_code >= 400:
raise NpmApiError( raise NpmApiError(f"API error: {response.text}", status_code=response.status_code)
f"API error: {response.text}", status_code=response.status_code
)
return response return response
@@ -259,3 +256,139 @@ class NpmClient:
) )
data = response.json() data = response.json()
return [AuditLogEntry(**entry) for entry in data] return [AuditLogEntry(**entry) for entry in data]
async def get_access_lists(self) -> list[AccessList]:
"""Get all access lists."""
response = await self._request("GET", "/nginx/access-lists")
data = response.json()
return [AccessList(**item) for item in data]
async def create_proxy_host(
self,
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str = "http",
certificate_id: int | None = None,
ssl_forced: bool = True,
hsts_enabled: bool = True,
hsts_subdomains: bool = False,
http2_support: bool = True,
block_exploits: bool = True,
caching_enabled: bool = False,
allow_websocket_upgrade: bool = True,
access_list_id: int = 0,
advanced_config: str = "",
meta: dict | None = None,
) -> ProxyHost:
"""Create a new proxy host.
Args:
domain_names: List of domain names for this host
forward_host: Backend host to forward to
forward_port: Backend port to forward to
forward_scheme: http or https
certificate_id: SSL certificate ID (0 for none, use list_certificates to find)
ssl_forced: Force SSL/HTTPS
hsts_enabled: Enable HSTS
hsts_subdomains: Include subdomains in HSTS
http2_support: Enable HTTP/2
block_exploits: Enable exploit blocking
caching_enabled: Enable caching
allow_websocket_upgrade: Allow WebSocket upgrades
access_list_id: Access list ID (0 for none, use list_access_lists to find)
advanced_config: Custom nginx configuration
meta: Additional metadata
Returns:
Created ProxyHost object
"""
payload = {
"domain_names": domain_names,
"forward_host": forward_host,
"forward_port": forward_port,
"forward_scheme": forward_scheme,
"certificate_id": certificate_id or 0,
"ssl_forced": ssl_forced,
"hsts_enabled": hsts_enabled,
"hsts_subdomains": hsts_subdomains,
"http2_support": http2_support,
"block_exploits": block_exploits,
"caching_enabled": caching_enabled,
"allow_websocket_upgrade": allow_websocket_upgrade,
"access_list_id": access_list_id,
"advanced_config": advanced_config,
"meta": meta or {},
}
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
return ProxyHost(**response.json())
async def update_proxy_host(
self,
host_id: int,
**kwargs,
) -> ProxyHost:
"""Update an existing proxy host.
Args:
host_id: The proxy host ID to update
**kwargs: Fields to update (same as create_proxy_host)
Returns:
Updated ProxyHost object
"""
# Get existing host to merge with updates
existing = await self.get_proxy_host(host_id)
payload = {
"domain_names": existing.domain_names,
"forward_host": existing.forward_host,
"forward_port": existing.forward_port,
"forward_scheme": existing.forward_scheme,
"certificate_id": existing.certificate_id or 0,
"ssl_forced": existing.ssl_forced,
"hsts_enabled": existing.hsts_enabled,
"hsts_subdomains": existing.hsts_subdomains,
"http2_support": existing.http2_support,
"block_exploits": existing.block_exploits,
"caching_enabled": existing.caching_enabled,
"allow_websocket_upgrade": existing.allow_websocket_upgrade,
"access_list_id": existing.access_list_id,
"advanced_config": existing.advanced_config,
"meta": existing.meta,
}
payload.update({k: v for k, v in kwargs.items() if v is not None})
response = await self._request("PUT", f"/nginx/proxy-hosts/{host_id}", json=payload)
return ProxyHost(**response.json())
async def create_certificate(
self,
domain_names: list[str],
email: str,
provider: str = "letsencrypt",
dns_challenge: bool = False,
) -> Certificate:
"""Create/provision a new SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
provider: Certificate provider (default: "letsencrypt")
dns_challenge: Use DNS challenge instead of HTTP (default: False)
Returns:
Created Certificate object
"""
payload = {
"domain_names": domain_names,
"meta": {
"letsencrypt_email": email,
"letsencrypt_agree": True,
"dns_challenge": dns_challenge,
},
"provider": provider,
}
response = await self._request("POST", "/nginx/certificates", json=payload)
return Certificate(**response.json())

View File

@@ -1,7 +1,26 @@
"""Configuration management using pydantic-settings.""" """Configuration management using pydantic-settings."""
from typing import Any
from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
# Default values for proxy host creation
DEFAULT_PROXY_SETTINGS: dict[str, Any] = {
"forward_scheme": "http",
"certificate_id": 0,
"ssl_forced": True,
"hsts_enabled": True,
"hsts_subdomains": False,
"http2_support": True,
"caching_enabled": False,
"block_exploits": True,
"allow_websocket_upgrade": True,
"access_list_id": 0,
"advanced_config": "",
"meta": {},
}
class Settings(BaseSettings): class Settings(BaseSettings):
"""Application settings loaded from environment variables.""" """Application settings loaded from environment variables."""
@@ -23,5 +42,33 @@ class Settings(BaseSettings):
mcp_port: int = 8000 mcp_port: int = 8000
mcp_transport: str = "stdio" # "stdio" or "http" mcp_transport: str = "stdio" # "stdio" or "http"
# Path to NPM log directory (mount NPM's /data/logs here)
log_dir: str = ""
# Proxy host creation defaults (JSON string)
# Example: '{"certificate_id": 24, "ssl_forced": true}'
proxy_defaults: dict[str, Any] = {}
@field_validator("proxy_defaults", mode="before")
@classmethod
def parse_proxy_defaults(cls, v: Any) -> dict[str, Any]:
"""Parse JSON string to dict, or pass through if already dict."""
if isinstance(v, dict):
return v
if isinstance(v, str) and v.strip():
import json
try:
return json.loads(v)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in NPM_PROXY_DEFAULTS: {e}") from e
return {}
def get_proxy_defaults(self) -> dict[str, Any]:
"""Get merged proxy defaults (base defaults + user overrides)."""
merged = DEFAULT_PROXY_SETTINGS.copy()
merged.update(self.proxy_defaults)
return merged
settings = Settings() settings = Settings()

View File

@@ -31,3 +31,9 @@ class NpmApiError(NpmClientError):
def __init__(self, message: str, status_code: int | None = None): def __init__(self, message: str, status_code: int | None = None):
super().__init__(message) super().__init__(message)
self.status_code = status_code self.status_code = status_code
class NpmLogError(NpmClientError):
"""Raised when log file operations fail."""
pass

118
src/npm_mcp/logs.py Normal file
View File

@@ -0,0 +1,118 @@
"""Log file reader for Nginx Proxy Manager proxy host logs."""
from __future__ import annotations
import re
from pathlib import Path
from .config import settings
from .exceptions import NpmLogError
LOG_FILE_PATTERN = re.compile(r"^proxy-host-(\d+)_(access|error)\.log$")
MAX_LINES = 500
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB safety cap
def _get_log_dir() -> Path:
"""Resolve and validate the configured log directory."""
log_dir = settings.log_dir
if not log_dir:
raise NpmLogError(
"NPM_LOG_DIR is not configured. Mount NPM's /data/logs volume "
"and set NPM_LOG_DIR to the mount path. See README for details."
)
path = Path(log_dir)
if not path.is_dir():
raise NpmLogError(f"Log directory does not exist: {log_dir}")
return path
def _log_file_path(host_id: int, log_type: str) -> Path:
if log_type not in ("access", "error"):
raise NpmLogError(f"Invalid log type: {log_type!r} (must be 'access' or 'error')")
log_dir = _get_log_dir()
return log_dir / f"proxy-host-{host_id}_{log_type}.log"
def read_log_lines(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> dict:
"""Read the last N lines from a proxy host log file.
Args:
host_id: NPM proxy host ID.
log_type: "access" or "error".
lines: Number of most recent lines to return (capped at MAX_LINES).
search: Optional substring filter applied to each line.
Returns:
Dict with host_id, log_type, file name, line count, and the lines themselves.
"""
lines = max(1, min(lines, MAX_LINES))
log_path = _log_file_path(host_id, log_type)
if not log_path.is_file():
raise NpmLogError(
f"Log file not found: {log_path.name}. "
"The proxy host may not have received any traffic yet, "
"or the log directory mount is incorrect."
)
file_size = log_path.stat().st_size
if file_size > MAX_FILE_SIZE:
raise NpmLogError(
f"Log file is too large ({file_size / 1024 / 1024:.1f} MB). "
"Consider using an external log aggregation tool."
)
all_lines = log_path.read_text(errors="replace").splitlines()
total_lines = len(all_lines)
if search:
search_lower = search.lower()
all_lines = [line for line in all_lines if search_lower in line.lower()]
tail = all_lines[-lines:]
return {
"host_id": host_id,
"log_type": log_type,
"file": log_path.name,
"total_lines_in_file": total_lines if not search else None,
"matched_lines": len(all_lines) if search else None,
"returned_lines": len(tail),
"lines": tail,
}
def is_log_dir_configured() -> bool:
"""Check whether the log directory is configured and accessible."""
if not settings.log_dir:
return False
return Path(settings.log_dir).is_dir()
def list_available_logs() -> list[dict]:
"""List all proxy-host log files present in the log directory.
Returns:
List of dicts with host_id, log_type, file name, and size.
"""
log_dir = _get_log_dir()
results = []
for entry in sorted(log_dir.iterdir()):
match = LOG_FILE_PATTERN.match(entry.name)
if match and entry.is_file():
results.append(
{
"host_id": int(match.group(1)),
"log_type": match.group(2),
"file": entry.name,
"size_bytes": entry.stat().st_size,
}
)
return results

View File

@@ -23,27 +23,39 @@ class UserMeta(BaseModel):
class Owner(BaseModel): class Owner(BaseModel):
"""Proxy host owner information.""" """Proxy host owner information."""
id: int | None = None
created_on: datetime | None = None
modified_on: datetime | None = None
is_disabled: bool = False
email: str | None = None
name: str = ""
nickname: str = ""
avatar: str = ""
roles: list[str] = Field(default_factory=list)
class AccessList(BaseModel):
"""Access list for authentication/IP restrictions."""
id: int id: int
created_on: datetime created_on: datetime
modified_on: datetime modified_on: datetime
is_disabled: bool owner_user_id: int = 0
email: str
name: str name: str
nickname: str satisfy_any: bool = False
avatar: str pass_auth: bool = False
roles: list[str]
class Certificate(BaseModel): class Certificate(BaseModel):
"""SSL Certificate information.""" """SSL Certificate information."""
id: int id: int | None = None
created_on: datetime created_on: datetime | None = None
modified_on: datetime modified_on: datetime | None = None
owner_user_id: int owner_user_id: int | None = None
provider: str provider: str = ""
nice_name: str nice_name: str = ""
domain_names: list[str] domain_names: list[str] = Field(default_factory=list)
expires_on: datetime | None = None expires_on: datetime | None = None
meta: dict[str, Any] = Field(default_factory=dict) meta: dict[str, Any] = Field(default_factory=dict)
@@ -81,7 +93,7 @@ class ProxyHost(BaseModel):
advanced_config: str = "" advanced_config: str = ""
enabled: bool = True enabled: bool = True
meta: dict[str, Any] = Field(default_factory=dict) meta: dict[str, Any] = Field(default_factory=dict)
locations: list[ProxyHostLocation] = Field(default_factory=list) locations: list[ProxyHostLocation] | None = None
# Optional expanded relations # Optional expanded relations
owner: Owner | None = None owner: Owner | None = None
certificate: Certificate | None = None certificate: Certificate | None = None

View File

@@ -9,7 +9,8 @@ from mcp.server.fastmcp import FastMCP
from .client import NpmClient from .client import NpmClient
from .config import settings from .config import settings
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError, NpmLogError
from .logs import is_log_dir_configured, list_available_logs, read_log_lines
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -56,6 +57,8 @@ def _format_error(e: Exception) -> str:
return f"Authentication failed: {e}" return f"Authentication failed: {e}"
elif isinstance(e, NpmConnectionError): elif isinstance(e, NpmConnectionError):
return f"Connection error: {e}" return f"Connection error: {e}"
elif isinstance(e, NpmLogError):
return f"Log error: {e}"
elif isinstance(e, NpmApiError): elif isinstance(e, NpmApiError):
return f"API error: {e}" return f"API error: {e}"
return f"Error: {e}" return f"Error: {e}"
@@ -189,6 +192,14 @@ async def get_system_health() -> str:
except NpmAuthenticationError: except NpmAuthenticationError:
result.append("Authenticated: ❌ (check credentials)") result.append("Authenticated: ❌ (check credentials)")
if is_log_dir_configured():
logs = list_available_logs()
result.append(f"Log directory: ✅ ({len(logs)} log files found)")
else:
result.append(
"Log directory: ❌ (not configured — set NPM_LOG_DIR to enable get_proxy_host_logs)"
)
return "\n".join(result) return "\n".join(result)
except Exception as e: except Exception as e:
@@ -253,11 +264,299 @@ async def list_certificates() -> str:
expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})" expiry = f" (expires: {cert.expires_on.strftime('%Y-%m-%d')})"
result.append( result.append(
f"[{cert.id}] {cert.nice_name} ({cert.provider})\n" f"[{cert.id}] {cert.nice_name} ({cert.provider})\n Domains: {domains}{expiry}"
f" Domains: {domains}{expiry}"
) )
return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result) return f"Found {len(certs)} certificate(s):\n\n" + "\n\n".join(result)
except Exception as e: except Exception as e:
return _format_error(e) return _format_error(e)
@mcp.tool()
async def list_access_lists() -> str:
"""List all access lists configured in Nginx Proxy Manager.
Returns a summary of all access lists including their IDs and names.
Use these IDs when creating proxy hosts that require access control.
"""
try:
client = get_client()
access_lists = await client.get_access_lists()
if not access_lists:
return "No access lists configured."
result = []
for al in access_lists:
result.append(f"[{al.id}] {al.name}")
return f"Found {len(access_lists)} access list(s):\n\n" + "\n".join(result)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_proxy_host(
domain_names: list[str],
forward_host: str,
forward_port: int,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
) -> str:
"""Create a new proxy host in Nginx Proxy Manager.
Args:
domain_names: List of domain names (e.g., ["app.ext.ben.io"])
forward_host: Backend host/IP to forward to (e.g., "192.168.1.100" or "container-name")
forward_port: Backend port to forward to (e.g., 8080)
forward_scheme: Backend protocol - "http" or "https" (default from config)
certificate_id: SSL certificate ID. Use list_certificates to find available certs.
Use 0 for no SSL, or the ID of a wildcard cert. (default from config)
ssl_forced: Force HTTPS redirect (default from config)
block_exploits: Enable common exploit blocking (default from config)
allow_websocket_upgrade: Allow WebSocket connections (default from config)
access_list_id: Access list ID for authentication. Use list_access_lists to find.
Use 0 for no access restrictions. (default from config)
advanced_config: Custom nginx configuration block (default from config)
Returns:
Details of the created proxy host including the new host ID.
Note:
Default values can be configured via NPM_PROXY_DEFAULTS environment variable.
Example: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
Example:
create_proxy_host(
domain_names=["myapp.ext.ben.io"],
forward_host="10.0.0.50",
forward_port=3000,
certificate_id=24, # *.ext.ben.io wildcard
)
"""
try:
# Get defaults from config, then override with provided values
defaults = settings.get_proxy_defaults()
client = get_client()
host = await client.create_proxy_host(
domain_names=domain_names,
forward_host=forward_host,
forward_port=forward_port,
forward_scheme=forward_scheme
if forward_scheme is not None
else defaults["forward_scheme"],
certificate_id=certificate_id
if certificate_id is not None
else defaults["certificate_id"],
ssl_forced=ssl_forced if ssl_forced is not None else defaults["ssl_forced"],
hsts_enabled=defaults.get("hsts_enabled", True),
hsts_subdomains=defaults.get("hsts_subdomains", False),
http2_support=defaults.get("http2_support", True),
block_exploits=block_exploits
if block_exploits is not None
else defaults["block_exploits"],
caching_enabled=defaults.get("caching_enabled", False),
allow_websocket_upgrade=allow_websocket_upgrade
if allow_websocket_upgrade is not None
else defaults["allow_websocket_upgrade"],
access_list_id=access_list_id
if access_list_id is not None
else defaults["access_list_id"],
advanced_config=advanced_config
if advanced_config is not None
else defaults["advanced_config"],
meta=defaults.get("meta", {}),
)
domains = ", ".join(host.domain_names)
return (
f"Successfully created proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def update_proxy_host(
host_id: int,
forward_host: str | None = None,
forward_port: int | None = None,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
) -> str:
"""Update an existing proxy host in Nginx Proxy Manager.
Only provided fields will be updated; all others remain unchanged.
Args:
host_id: The ID of the proxy host to update
forward_host: Backend host/IP to forward to
forward_port: Backend port to forward to
forward_scheme: Backend protocol - "http" or "https"
certificate_id: SSL certificate ID (use list_certificates to find, 0 for none)
ssl_forced: Force HTTPS redirect
block_exploits: Enable common exploit blocking
allow_websocket_upgrade: Allow WebSocket connections
access_list_id: Access list ID (0 for no restrictions)
advanced_config: Custom nginx configuration block
Returns:
Details of the updated proxy host.
"""
try:
client = get_client()
kwargs = {}
if forward_host is not None:
kwargs["forward_host"] = forward_host
if forward_port is not None:
kwargs["forward_port"] = forward_port
if forward_scheme is not None:
kwargs["forward_scheme"] = forward_scheme
if certificate_id is not None:
kwargs["certificate_id"] = certificate_id
if ssl_forced is not None:
kwargs["ssl_forced"] = ssl_forced
if block_exploits is not None:
kwargs["block_exploits"] = block_exploits
if allow_websocket_upgrade is not None:
kwargs["allow_websocket_upgrade"] = allow_websocket_upgrade
if access_list_id is not None:
kwargs["access_list_id"] = access_list_id
if advanced_config is not None:
kwargs["advanced_config"] = advanced_config
host = await client.update_proxy_host(host_id, **kwargs)
domains = ", ".join(host.domain_names)
return (
f"Successfully updated proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}\n"
f"Certificate ID: {host.certificate_id}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def get_proxy_host_logs(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> str:
"""Retrieve recent nginx log entries for a specific proxy host.
Reads the raw nginx access or error log file for the given host.
Requires the NPM log directory to be mounted (see NPM_LOG_DIR config).
Args:
host_id: The ID of the proxy host (use list_proxy_hosts to find IDs)
log_type: Log type - "access" for HTTP traffic or "error"
for nginx errors (default: "access")
lines: Number of most recent lines to return
(default: 100, max: 500)
search: Optional filter string - only lines containing this
text are returned (case-insensitive)
Returns:
The most recent log lines for the proxy host, with metadata.
Examples:
- get_proxy_host_logs(5) — last 100 access log lines for host 5
- get_proxy_host_logs(5, log_type="error") — recent error log
- get_proxy_host_logs(5, lines=50, search="404") — last 50 lines containing "404"
- get_proxy_host_logs(5, search="10.0.0.1") — filter by client IP
"""
try:
client = get_client()
host = await client.get_proxy_host(host_id)
domains = ", ".join(host.domain_names)
result = read_log_lines(
host_id=host_id,
log_type=log_type,
lines=lines,
search=search,
)
header_parts = [
f"Proxy host [{host_id}] {domains}{log_type} log",
f"File: {result['file']}",
]
if result["total_lines_in_file"] is not None:
header_parts.append(f"Total lines in file: {result['total_lines_in_file']}")
if result["matched_lines"] is not None:
header_parts.append(f"Lines matching '{search}': {result['matched_lines']}")
header_parts.append(f"Showing last {result['returned_lines']} lines:")
header = "\n".join(header_parts)
if not result["lines"]:
return f"{header}\n\n(no log entries found)"
log_output = "\n".join(result["lines"])
return f"{header}\n\n{log_output}"
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_certificate(
domain_names: list[str],
email: str,
dns_challenge: bool = False,
) -> str:
"""Provision a new Let's Encrypt SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
dns_challenge: Use DNS challenge instead of HTTP (default: False)
Returns:
Details of the created certificate including its ID.
Use the returned ID with create_proxy_host or update_proxy_host.
"""
try:
client = get_client()
cert = await client.create_certificate(
domain_names=domain_names,
email=email,
dns_challenge=dns_challenge,
)
domains = ", ".join(cert.domain_names)
expiry = cert.expires_on.strftime("%Y-%m-%d") if cert.expires_on else "N/A"
return (
f"Successfully created certificate!\n\n"
f"ID: {cert.id}\n"
f"Provider: {cert.provider}\n"
f"Domains: {domains}\n"
f"Expires: {expiry}"
)
except Exception as e:
return _format_error(e)

View File

@@ -35,6 +35,53 @@ def mock_proxy_hosts():
] ]
@pytest.fixture
def mock_access_lists():
"""Mock access lists response."""
return [
{
"id": 1,
"created_on": "2024-01-01T00:00:00Z",
"modified_on": "2024-01-01T00:00:00Z",
"owner_user_id": 1,
"name": "Admin Only",
"satisfy_any": False,
"pass_auth": True,
},
{
"id": 2,
"created_on": "2024-01-02T00:00:00Z",
"modified_on": "2024-01-02T00:00:00Z",
"owner_user_id": 1,
"name": "Internal Network",
"satisfy_any": True,
"pass_auth": False,
},
]
@pytest.fixture
def mock_created_proxy_host():
"""Mock response for created proxy host."""
return {
"id": 42,
"created_on": "2024-01-15T10:00:00Z",
"modified_on": "2024-01-15T10:00:00Z",
"owner_user_id": 1,
"domain_names": ["newapp.example.com"],
"forward_host": "10.0.0.50",
"forward_port": 3000,
"forward_scheme": "http",
"enabled": True,
"ssl_forced": True,
"certificate_id": 24,
"block_exploits": True,
"allow_websocket_upgrade": True,
"access_list_id": 0,
"advanced_config": "",
}
class TestNpmClientAuth: class TestNpmClientAuth:
"""Test authentication logic.""" """Test authentication logic."""
@@ -114,3 +161,69 @@ class TestNpmClientEndpoints:
assert hosts[0].id == 1 assert hosts[0].id == 1
assert hosts[0].domain_names == ["example.com"] assert hosts[0].domain_names == ["example.com"]
assert hosts[0].forward_host == "192.168.1.100" assert hosts[0].forward_host == "192.168.1.100"
@pytest.mark.asyncio
async def test_get_access_lists(self, httpx_mock, mock_token_response, mock_access_lists):
"""Test fetching access lists."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="GET",
url="http://localhost:81/api/nginx/access-lists",
json=mock_access_lists,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
access_lists = await client.get_access_lists()
assert len(access_lists) == 2
assert access_lists[0].id == 1
assert access_lists[0].name == "Admin Only"
assert access_lists[0].pass_auth is True
assert access_lists[1].id == 2
assert access_lists[1].name == "Internal Network"
assert access_lists[1].satisfy_any is True
@pytest.mark.asyncio
async def test_create_proxy_host(
self, httpx_mock, mock_token_response, mock_created_proxy_host
):
"""Test creating a proxy host."""
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/tokens",
json=mock_token_response,
)
httpx_mock.add_response(
method="POST",
url="http://localhost:81/api/nginx/proxy-hosts",
json=mock_created_proxy_host,
status_code=201,
)
async with NpmClient(
base_url="http://localhost:81/api",
identity="test@test.com",
secret="password",
) as client:
host = await client.create_proxy_host(
domain_names=["newapp.example.com"],
forward_host="10.0.0.50",
forward_port=3000,
certificate_id=24,
ssl_forced=True,
)
assert host.id == 42
assert host.domain_names == ["newapp.example.com"]
assert host.forward_host == "10.0.0.50"
assert host.forward_port == 3000
assert host.ssl_forced is True
assert host.certificate_id == 24

91
tests/test_config.py Normal file
View File

@@ -0,0 +1,91 @@
"""Tests for configuration handling."""
import pytest
from pydantic_settings.exceptions import SettingsError
from npm_mcp.config import DEFAULT_PROXY_SETTINGS, Settings
class TestProxyDefaults:
"""Test NPM_PROXY_DEFAULTS parsing and merging."""
def test_default_proxy_settings(self):
"""Test that default settings are correct."""
settings = Settings(identity="test", secret="test")
defaults = settings.get_proxy_defaults()
assert defaults["forward_scheme"] == "http"
assert defaults["certificate_id"] == 0
assert defaults["ssl_forced"] is True
assert defaults["block_exploits"] is True
assert defaults["allow_websocket_upgrade"] is True
assert defaults["access_list_id"] == 0
assert defaults["advanced_config"] == ""
def test_proxy_defaults_json_parsing(self, monkeypatch):
"""Test parsing JSON string from environment variable."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", '{"certificate_id": 24, "ssl_forced": false}')
settings = Settings()
defaults = settings.get_proxy_defaults()
# Overridden values
assert defaults["certificate_id"] == 24
assert defaults["ssl_forced"] is False
# Default values preserved
assert defaults["forward_scheme"] == "http"
assert defaults["block_exploits"] is True
def test_proxy_defaults_dict_passthrough(self):
"""Test that dict values pass through correctly."""
settings = Settings(
identity="test",
secret="test",
proxy_defaults={"certificate_id": 18, "access_list_id": 5},
)
defaults = settings.get_proxy_defaults()
assert defaults["certificate_id"] == 18
assert defaults["access_list_id"] == 5
def test_proxy_defaults_empty_env_raises(self, monkeypatch):
"""Test that empty string env var raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "")
# pydantic-settings tries to JSON decode empty string and fails
with pytest.raises(SettingsError):
Settings()
def test_proxy_defaults_invalid_json_raises(self, monkeypatch):
"""Test that invalid JSON raises SettingsError."""
monkeypatch.setenv("NPM_IDENTITY", "test")
monkeypatch.setenv("NPM_SECRET", "test")
monkeypatch.setenv("NPM_PROXY_DEFAULTS", "{not valid json}")
# pydantic-settings tries to JSON decode and fails
with pytest.raises(SettingsError):
Settings()
def test_proxy_defaults_merges_not_replaces(self):
"""Test that user defaults merge with base defaults."""
settings = Settings(
identity="test",
secret="test",
proxy_defaults={"certificate_id": 24},
)
defaults = settings.get_proxy_defaults()
# All keys should be present
assert set(defaults.keys()) == set(DEFAULT_PROXY_SETTINGS.keys())
# User value applied
assert defaults["certificate_id"] == 24
# Other defaults preserved
assert defaults["ssl_forced"] is True
assert defaults["block_exploits"] is True

146
tests/test_logs.py Normal file
View File

@@ -0,0 +1,146 @@
"""Tests for the log reader module."""
import pytest
from npm_mcp.exceptions import NpmLogError
from npm_mcp.logs import (
is_log_dir_configured,
list_available_logs,
read_log_lines,
)
SAMPLE_ACCESS_LOG = (
'[22/May/2025:10:00:01 +0000] - 200 200 - GET https app.example.com'
' "/" [Client 10.0.0.1] [Length 1542] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:02 +0000] - 301 301 - GET http app.example.com'
' "/old-path" [Client 10.0.0.2] [Length 0] "curl/7.88" "-"\n'
'[22/May/2025:10:00:03 +0000] - 404 404 - GET https app.example.com'
' "/missing" [Client 10.0.0.1] [Length 548] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:04 +0000] - 200 200 - POST https app.example.com'
' "/api/data" [Client 10.0.0.3] [Length 256] "python-requests/2.31" "-"\n'
'[22/May/2025:10:00:05 +0000] - 502 502 - GET https app.example.com'
' "/health" [Client 10.0.0.1] [Length 166] "kube-probe/1.28" "-"\n'
)
SAMPLE_ERROR_LOG = (
"2025/05/22 10:00:05 [error] 42#42: *123 connect() failed"
" (111: Connection refused) while connecting to upstream,"
" client: 10.0.0.1, server: app.example.com\n"
)
@pytest.fixture
def log_dir(tmp_path, monkeypatch):
"""Create a temp log directory with sample log files."""
logs = tmp_path / "logs"
logs.mkdir()
(logs / "proxy-host-5_access.log").write_text(SAMPLE_ACCESS_LOG)
(logs / "proxy-host-5_error.log").write_text(SAMPLE_ERROR_LOG)
(logs / "proxy-host-12_access.log").write_text("")
(logs / "fallback_error.log").write_text("global error\n")
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", str(logs))
return logs
@pytest.fixture
def no_log_dir(monkeypatch):
"""Ensure log_dir is unconfigured."""
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "")
class TestReadLogLines:
def test_read_access_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="access")
assert result["host_id"] == 5
assert result["log_type"] == "access"
assert result["file"] == "proxy-host-5_access.log"
assert result["returned_lines"] == 5
assert result["total_lines_in_file"] == 5
assert "app.example.com" in result["lines"][0]
def test_read_error_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="error")
assert result["log_type"] == "error"
assert result["returned_lines"] == 1
assert "Connection refused" in result["lines"][0]
def test_lines_limit(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=2)
assert result["returned_lines"] == 2
assert "10:00:04" in result["lines"][0]
assert "10:00:05" in result["lines"][1]
def test_lines_capped_at_max(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=9999)
assert result["returned_lines"] == 5
def test_search_filter(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="404")
assert result["returned_lines"] == 1
assert result["matched_lines"] == 1
assert result["total_lines_in_file"] is None
assert "/missing" in result["lines"][0]
def test_search_case_insensitive(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="MOZILLA")
assert result["returned_lines"] == 2
def test_search_by_ip(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="10.0.0.1")
assert result["returned_lines"] == 3
def test_nonexistent_host(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=999, log_type="access")
def test_empty_log_file(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=12, log_type="error")
def test_invalid_log_type(self, log_dir):
with pytest.raises(NpmLogError, match="Invalid log type"):
read_log_lines(host_id=5, log_type="combined")
def test_no_log_dir_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
read_log_lines(host_id=5, log_type="access")
def test_nonexistent_log_dir(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent/path")
with pytest.raises(NpmLogError, match="does not exist"):
read_log_lines(host_id=5, log_type="access")
class TestIsLogDirConfigured:
def test_configured(self, log_dir):
assert is_log_dir_configured() is True
def test_not_configured(self, no_log_dir):
assert is_log_dir_configured() is False
def test_configured_but_missing(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent")
assert is_log_dir_configured() is False
class TestListAvailableLogs:
def test_lists_proxy_host_logs_only(self, log_dir):
results = list_available_logs()
files = {r["file"] for r in results}
assert "proxy-host-5_access.log" in files
assert "proxy-host-5_error.log" in files
assert "proxy-host-12_access.log" in files
assert "fallback_error.log" not in files
def test_correct_metadata(self, log_dir):
results = list_available_logs()
access_5 = next(r for r in results if r["file"] == "proxy-host-5_access.log")
assert access_5["host_id"] == 5
assert access_5["log_type"] == "access"
assert access_5["size_bytes"] > 0
def test_not_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
list_available_logs()