4 Commits
v0.0.2 ... main

Author SHA1 Message Date
5e89176806 feat: Add get_proxy_host_logs tool for reading nginx proxy host logs
Reads nginx access/error logs directly from a mounted NPM log directory,
enabling agents to debug proxy issues without SSH access. Requires mounting
NPM's /data/logs volume and setting NPM_LOG_DIR.

Also includes a feature request PRD for proposing a native log API upstream.
2026-05-22 14:27:37 +00:00
c3227c3a5f fix: Disable Docker push on PR builds and update README
Fork PRs don't have write access to GHCR. Use conditional push
that only pushes on non-PR events (push to main, tags).

Also update README with new tools from v0.0.3:
- update_proxy_host
- create_certificate
2026-02-12 04:06:58 +00:00
32f57b1a9e fix: Disable Docker push on PR builds
Fork PRs don't have write access to GHCR. Use conditional push
that only pushes on non-PR events (push to main, tags).
2026-02-12 03:51:18 +00:00
Jordan Réjaud
f81bf796a6 Fix Pydantic backward compatibility and add update/certificate tools (#3)
- Make Owner and Certificate model fields optional with defaults to fix
  parsing errors when NPM API returns null/missing nested objects
- Add update_proxy_host tool for modifying existing proxy host configs
- Add create_certificate tool for provisioning Let's Encrypt SSL certs
- Add corresponding client methods with full parameter support
2026-02-11 21:47:54 -06:00
12 changed files with 825 additions and 19 deletions

View File

@@ -48,7 +48,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
push: true
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha

View File

@@ -64,6 +64,7 @@ NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'
| `NPM_MCP_HOST` | No | `0.0.0.0` | MCP server bind address |
| `NPM_MCP_PORT` | No | `8000` | MCP server port |
| `NPM_MCP_TRANSPORT` | No | `stdio` | Transport mode (`stdio` or `http`) |
| `NPM_LOG_DIR` | No | - | Path to mounted NPM log directory (enables `get_proxy_host_logs`) |
| `NPM_PROXY_DEFAULTS` | No | `{}` | JSON defaults for `create_proxy_host` |
### NPM_PROXY_DEFAULTS Keys
@@ -126,11 +127,101 @@ Add to your `claude_desktop_config.json`:
|------|-------------|
| `list_proxy_hosts` | List all proxy hosts |
| `get_proxy_host_details` | Get full config for a specific host |
| `get_proxy_host_logs` | Retrieve nginx access/error logs for a proxy host (requires log mount) |
| `get_system_health` | Check NPM version and status |
| `search_audit_logs` | Query audit log entries |
| `list_certificates` | List SSL certificates |
| `list_access_lists` | List access lists for authentication/IP restrictions |
| `create_proxy_host` | Create a new proxy host |
| `update_proxy_host` | Update an existing proxy host (v0.0.3+) |
| `create_certificate` | Provision a new Let's Encrypt SSL certificate (v0.0.3+) |
## Log Access Setup
The `get_proxy_host_logs` tool reads nginx log files directly from disk. Since NPM has no API for log retrieval, you need to mount NPM's log directory into the MCP container.
NPM writes per-host logs to `/data/logs/` inside its container:
- `proxy-host-{id}_access.log` — HTTP request log (client IP, status, path, user agent)
- `proxy-host-{id}_error.log` — nginx error log (upstream failures, config issues)
### Docker Compose (same stack)
If NPM and the MCP server share a compose stack with a named volume:
```yaml
services:
nginx-proxy-manager:
image: jc21/nginx-proxy-manager:latest
volumes:
- npm_data:/data
npm-mcp:
image: ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
environment:
- NPM_API_URL=http://nginx-proxy-manager:81/api
- NPM_IDENTITY=admin@example.com
- NPM_SECRET=yourpassword
- NPM_LOG_DIR=/data/npm-logs
volumes:
# Mount NPM's /data volume — logs are in /data/logs/ inside it
- npm_data:/data/npm-logs:ro
depends_on:
- nginx-proxy-manager
volumes:
npm_data:
```
> **Note:** NPM stores logs under `/data/logs/` inside its data volume. When you
> mount the full `/data` volume to `/data/npm-logs`, the MCP server looks for logs at
> `/data/npm-logs/logs/`. Set `NPM_LOG_DIR` to match your mount path plus `/logs`.
If you mounted the full data volume:
```bash
NPM_LOG_DIR=/data/npm-logs/logs
```
### Bind Mount (separate stacks)
If NPM uses a bind mount (e.g., `./npm-data:/data`), mount the logs subdirectory directly:
```yaml
npm-mcp:
volumes:
- /path/to/npm-data/logs:/data/npm-logs:ro
environment:
- NPM_LOG_DIR=/data/npm-logs
```
### Docker Run
```bash
docker run -d \
--name npm-mcp \
-p 8000:8000 \
-v npm_data:/data/npm-logs:ro \
-e NPM_API_URL=http://your-npm:81/api \
-e NPM_IDENTITY=admin@example.com \
-e NPM_SECRET=yourpassword \
-e NPM_LOG_DIR=/data/npm-logs/logs \
-e NPM_MCP_TRANSPORT=http \
ghcr.io/b3nw/nginx-proxy-manager-mcp:latest
```
### Local Development (non-Docker)
Point `NPM_LOG_DIR` at wherever NPM's logs are on your filesystem:
```bash
NPM_LOG_DIR=/path/to/npm/data/logs npm-mcp
```
### Verifying the Mount
After starting, call `get_system_health` — if the log directory is mounted and accessible
the tool will confirm it. You can also call `get_proxy_host_logs` with any host ID to
verify logs are readable.
## Development

View File

@@ -16,6 +16,11 @@ services:
- NPM_MCP_TRANSPORT=http
- NPM_MCP_HOST=0.0.0.0
- NPM_MCP_PORT=8000
# Optional: Log access (requires volume mount below)
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# # Mount NPM's log directory for get_proxy_host_logs tool (read-only)
# - npm_data:/data/npm-logs:ro # named volume — see "Log Access" in README
# Uncomment to use .env file instead of inline environment
# env_file:
# - .env
@@ -26,7 +31,7 @@ services:
retries: 3
start_period: 10s
# Example: Running alongside NPM in same compose stack
# Example: Running alongside NPM in same compose stack (with log access)
# services:
# nginx-proxy-manager:
# image: jc21/nginx-proxy-manager:latest
@@ -44,6 +49,9 @@ services:
# - NPM_API_URL=http://nginx-proxy-manager:81/api
# - NPM_IDENTITY=admin@example.com
# - NPM_SECRET=changeme
# - NPM_LOG_DIR=/data/npm-logs
# volumes:
# - npm_data:/data/npm-logs:ro
# depends_on:
# - nginx-proxy-manager
#

View File

@@ -0,0 +1,176 @@
# Feature Request: Per-Host Log Retrieval API
**Project:** [NginxProxyManager/nginx-proxy-manager](https://github.com/NginxProxyManager/nginx-proxy-manager)
**Type:** Feature Request
**Status:** Draft PRD
## Problem Statement
Nginx Proxy Manager writes per-host access and error logs to predictable paths on disk
(`/data/logs/proxy-host-{id}_access.log`, `/data/logs/proxy-host-{id}_error.log`), but
provides no API to read them. The only log-related API is the audit log (`GET /api/audit-log`),
which tracks admin configuration changes — not HTTP traffic.
This means operators and automation tools have no programmatic way to:
- Retrieve recent access log entries for a specific proxy host
- Check error logs when debugging upstream connectivity issues
- Monitor traffic patterns or detect anomalies through the existing API surface
The only workarounds today are direct filesystem access (requiring volume mounts or
`docker exec`) or external log aggregation pipelines, both of which add significant
operational complexity for a task that should be simple.
## Proposed Solution
Add REST API endpoints to retrieve nginx access and error logs for individual proxy hosts.
### New Endpoints
#### `GET /api/nginx/proxy-hosts/{id}/logs`
Retrieve log entries for a specific proxy host.
**Query Parameters:**
| Parameter | Type | Default | Description |
|------------|--------|------------|-------------------------------------------------------|
| `type` | string | `"access"` | Log type: `"access"` or `"error"` |
| `lines` | int | `100` | Number of most recent lines to return (max: `1000`) |
| `search` | string | - | Filter lines containing this substring |
| `since` | string | - | ISO 8601 timestamp — only return lines after this time|
**Response (200):**
```json
{
"host_id": 5,
"log_type": "access",
"file": "proxy-host-5_access.log",
"total_lines": 4821,
"returned_lines": 100,
"lines": [
"[01/Jun/2025:14:22:31 +0000] HIT 200 200 - GET https app.example.com \"/api/data\" [Client 10.0.0.1] [Length 1542] [Gzip -] [Sent-to 192.168.1.50] \"Mozilla/5.0\" \"https://app.example.com/\"",
"..."
]
}
```
**Error Responses:**
| Status | Condition |
|--------|-------------------------------------|
| 404 | Proxy host not found |
| 404 | Log file does not exist |
| 403 | User lacks permission for this host |
#### `GET /api/nginx/proxy-hosts/{id}/logs/summary`
Return a statistical summary of recent traffic for a proxy host.
**Response (200):**
```json
{
"host_id": 5,
"period": "last_1000_lines",
"status_codes": {"200": 812, "301": 45, "404": 23, "500": 3},
"top_paths": ["/api/data", "/", "/login"],
"top_clients": ["10.0.0.1", "10.0.0.5"],
"cache_hit_rate": 0.42,
"access_log_size_bytes": 524288,
"error_log_size_bytes": 8192
}
```
### Permissions
| Permission | Description |
|--------------------|--------------------------------------------|
| `proxy-hosts:logs` | Read logs for proxy hosts the user can view |
Admin users can read logs for any host. Non-admin users can only read logs
for hosts they own, consistent with existing proxy host permissions.
### Backend Implementation Notes
The implementation is straightforward because log paths are already deterministic
and hardcoded in nginx templates:
```javascript
// backend/templates/proxy_host.conf
access_log /data/logs/proxy-host-{{ id }}_access.log proxy;
error_log /data/logs/proxy-host-{{ id }}_error.log warn;
```
A minimal implementation would:
1. Add a new route in `backend/routes/` (e.g., `proxy-host-logs.js`)
2. Verify the proxy host exists and the user has access
3. Read the last N lines from the log file using a reverse-reader (or `tail`-like approach)
4. Optionally filter lines by substring or timestamp
5. Return as JSON
**Reference files for implementation:**
- `backend/routes/nginx/proxy_hosts.js` — existing proxy host routes and permission model
- `backend/templates/proxy_host.conf` — log path template confirming the naming convention
- `backend/lib/access/` — permission definition files
- `docker/rootfs/etc/logrotate.d/nginx-proxy-manager` — log rotation config (rotated logs
have `.1`, `.2.gz` suffixes)
### Log Rotation Consideration
NPM rotates logs weekly (access: 4 rotations, error: 10 rotations). The API should
read only the current (unrotated) log file. Rotated archives (`.1`, `.2.gz`) could be
supported in a future iteration but are not required for the initial implementation.
## Motivation
### Use Cases
1. **MCP/AI Agent Integration** — MCP servers wrapping the NPM API (like
[nginx-proxy-manager-mcp](https://github.com/b3nw/nginx-proxy-manager-mcp)) can
expose log retrieval to AI assistants for debugging proxy issues conversationally.
2. **Quick Debugging** — When a reverse proxy returns errors, operators need to quickly
check the access and error logs for that specific host. Today this requires SSH/exec
access to the container.
3. **Monitoring Dashboards** — Custom dashboards can poll the log endpoint for
traffic summaries without deploying a full log aggregation stack.
4. **Automation** — CI/CD pipelines and health-check scripts can verify that traffic
is flowing correctly to newly deployed services behind NPM.
### Why Not External Log Aggregation?
External solutions (Loki, ELK, Fluentd) are powerful but heavy. Many NPM users run
single-host homelab setups where a full log pipeline is disproportionate to the need.
A built-in API covers 80% of use cases with zero additional infrastructure.
## Alternatives Considered
| Approach | Pros | Cons |
|---------------------------|-----------------------------|-------------------------------------------------|
| **API endpoint (proposed)** | Native, zero extra infra | Requires upstream PR |
| Volume mount + file read | Works today, no NPM changes | Tight coupling, no access control, not portable |
| Docker exec | Works today | Requires Docker socket, security risk |
| Sidecar log server | Decoupled | Extra container, extra config, extra maintenance |
## Scope
### In Scope (v1)
- `GET /api/nginx/proxy-hosts/{id}/logs` with `type`, `lines`, `search` params
- Permission checks consistent with existing proxy host access model
- Current (unrotated) log file only
### Out of Scope (Future)
- Log streaming via WebSocket or SSE
- Rotated log archive access (`.gz` files)
- Log summary/analytics endpoint
- Redirection host, dead host, and stream logs (same pattern, easy to add later)
- Log download as file attachment
- Log retention configuration via API

View File

@@ -8,6 +8,11 @@ NPM_MCP_HOST=0.0.0.0
NPM_MCP_PORT=8000
NPM_MCP_TRANSPORT=stdio # stdio or http
# Log Access (optional)
# Mount NPM's /data/logs directory and set this to the mount path.
# Enables the get_proxy_host_logs tool for reading nginx access/error logs.
# NPM_LOG_DIR=/data/npm-logs
# Proxy Host Creation Defaults (JSON)
# Set default values for create_proxy_host tool parameters
# Example with wildcard cert: NPM_PROXY_DEFAULTS='{"certificate_id": 24, "ssl_forced": true}'

View File

@@ -323,3 +323,72 @@ class NpmClient:
response = await self._request("POST", "/nginx/proxy-hosts", json=payload)
return ProxyHost(**response.json())
async def update_proxy_host(
self,
host_id: int,
**kwargs,
) -> ProxyHost:
"""Update an existing proxy host.
Args:
host_id: The proxy host ID to update
**kwargs: Fields to update (same as create_proxy_host)
Returns:
Updated ProxyHost object
"""
# Get existing host to merge with updates
existing = await self.get_proxy_host(host_id)
payload = {
"domain_names": existing.domain_names,
"forward_host": existing.forward_host,
"forward_port": existing.forward_port,
"forward_scheme": existing.forward_scheme,
"certificate_id": existing.certificate_id or 0,
"ssl_forced": existing.ssl_forced,
"hsts_enabled": existing.hsts_enabled,
"hsts_subdomains": existing.hsts_subdomains,
"http2_support": existing.http2_support,
"block_exploits": existing.block_exploits,
"caching_enabled": existing.caching_enabled,
"allow_websocket_upgrade": existing.allow_websocket_upgrade,
"access_list_id": existing.access_list_id,
"advanced_config": existing.advanced_config,
"meta": existing.meta,
}
payload.update({k: v for k, v in kwargs.items() if v is not None})
response = await self._request("PUT", f"/nginx/proxy-hosts/{host_id}", json=payload)
return ProxyHost(**response.json())
async def create_certificate(
self,
domain_names: list[str],
email: str,
provider: str = "letsencrypt",
dns_challenge: bool = False,
) -> Certificate:
"""Create/provision a new SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
provider: Certificate provider (default: "letsencrypt")
dns_challenge: Use DNS challenge instead of HTTP (default: False)
Returns:
Created Certificate object
"""
payload = {
"domain_names": domain_names,
"meta": {
"letsencrypt_email": email,
"letsencrypt_agree": True,
"dns_challenge": dns_challenge,
},
"provider": provider,
}
response = await self._request("POST", "/nginx/certificates", json=payload)
return Certificate(**response.json())

View File

@@ -42,6 +42,9 @@ class Settings(BaseSettings):
mcp_port: int = 8000
mcp_transport: str = "stdio" # "stdio" or "http"
# Path to NPM log directory (mount NPM's /data/logs here)
log_dir: str = ""
# Proxy host creation defaults (JSON string)
# Example: '{"certificate_id": 24, "ssl_forced": true}'
proxy_defaults: dict[str, Any] = {}

View File

@@ -31,3 +31,9 @@ class NpmApiError(NpmClientError):
def __init__(self, message: str, status_code: int | None = None):
super().__init__(message)
self.status_code = status_code
class NpmLogError(NpmClientError):
"""Raised when log file operations fail."""
pass

118
src/npm_mcp/logs.py Normal file
View File

@@ -0,0 +1,118 @@
"""Log file reader for Nginx Proxy Manager proxy host logs."""
from __future__ import annotations
import re
from pathlib import Path
from .config import settings
from .exceptions import NpmLogError
LOG_FILE_PATTERN = re.compile(r"^proxy-host-(\d+)_(access|error)\.log$")
MAX_LINES = 500
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB safety cap
def _get_log_dir() -> Path:
"""Resolve and validate the configured log directory."""
log_dir = settings.log_dir
if not log_dir:
raise NpmLogError(
"NPM_LOG_DIR is not configured. Mount NPM's /data/logs volume "
"and set NPM_LOG_DIR to the mount path. See README for details."
)
path = Path(log_dir)
if not path.is_dir():
raise NpmLogError(f"Log directory does not exist: {log_dir}")
return path
def _log_file_path(host_id: int, log_type: str) -> Path:
if log_type not in ("access", "error"):
raise NpmLogError(f"Invalid log type: {log_type!r} (must be 'access' or 'error')")
log_dir = _get_log_dir()
return log_dir / f"proxy-host-{host_id}_{log_type}.log"
def read_log_lines(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> dict:
"""Read the last N lines from a proxy host log file.
Args:
host_id: NPM proxy host ID.
log_type: "access" or "error".
lines: Number of most recent lines to return (capped at MAX_LINES).
search: Optional substring filter applied to each line.
Returns:
Dict with host_id, log_type, file name, line count, and the lines themselves.
"""
lines = max(1, min(lines, MAX_LINES))
log_path = _log_file_path(host_id, log_type)
if not log_path.is_file():
raise NpmLogError(
f"Log file not found: {log_path.name}. "
"The proxy host may not have received any traffic yet, "
"or the log directory mount is incorrect."
)
file_size = log_path.stat().st_size
if file_size > MAX_FILE_SIZE:
raise NpmLogError(
f"Log file is too large ({file_size / 1024 / 1024:.1f} MB). "
"Consider using an external log aggregation tool."
)
all_lines = log_path.read_text(errors="replace").splitlines()
total_lines = len(all_lines)
if search:
search_lower = search.lower()
all_lines = [line for line in all_lines if search_lower in line.lower()]
tail = all_lines[-lines:]
return {
"host_id": host_id,
"log_type": log_type,
"file": log_path.name,
"total_lines_in_file": total_lines if not search else None,
"matched_lines": len(all_lines) if search else None,
"returned_lines": len(tail),
"lines": tail,
}
def is_log_dir_configured() -> bool:
"""Check whether the log directory is configured and accessible."""
if not settings.log_dir:
return False
return Path(settings.log_dir).is_dir()
def list_available_logs() -> list[dict]:
"""List all proxy-host log files present in the log directory.
Returns:
List of dicts with host_id, log_type, file name, and size.
"""
log_dir = _get_log_dir()
results = []
for entry in sorted(log_dir.iterdir()):
match = LOG_FILE_PATTERN.match(entry.name)
if match and entry.is_file():
results.append(
{
"host_id": int(match.group(1)),
"log_type": match.group(2),
"file": entry.name,
"size_bytes": entry.stat().st_size,
}
)
return results

View File

@@ -23,15 +23,15 @@ class UserMeta(BaseModel):
class Owner(BaseModel):
"""Proxy host owner information."""
id: int
created_on: datetime
modified_on: datetime
is_disabled: bool
email: str
name: str
nickname: str
avatar: str
roles: list[str]
id: int | None = None
created_on: datetime | None = None
modified_on: datetime | None = None
is_disabled: bool = False
email: str | None = None
name: str = ""
nickname: str = ""
avatar: str = ""
roles: list[str] = Field(default_factory=list)
class AccessList(BaseModel):
@@ -49,13 +49,13 @@ class AccessList(BaseModel):
class Certificate(BaseModel):
"""SSL Certificate information."""
id: int
created_on: datetime
modified_on: datetime
owner_user_id: int
provider: str
nice_name: str
domain_names: list[str]
id: int | None = None
created_on: datetime | None = None
modified_on: datetime | None = None
owner_user_id: int | None = None
provider: str = ""
nice_name: str = ""
domain_names: list[str] = Field(default_factory=list)
expires_on: datetime | None = None
meta: dict[str, Any] = Field(default_factory=dict)

View File

@@ -9,7 +9,8 @@ from mcp.server.fastmcp import FastMCP
from .client import NpmClient
from .config import settings
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError, NpmLogError
from .logs import is_log_dir_configured, list_available_logs, read_log_lines
logger = logging.getLogger(__name__)
@@ -56,6 +57,8 @@ def _format_error(e: Exception) -> str:
return f"Authentication failed: {e}"
elif isinstance(e, NpmConnectionError):
return f"Connection error: {e}"
elif isinstance(e, NpmLogError):
return f"Log error: {e}"
elif isinstance(e, NpmApiError):
return f"API error: {e}"
return f"Error: {e}"
@@ -189,6 +192,14 @@ async def get_system_health() -> str:
except NpmAuthenticationError:
result.append("Authenticated: ❌ (check credentials)")
if is_log_dir_configured():
logs = list_available_logs()
result.append(f"Log directory: ✅ ({len(logs)} log files found)")
else:
result.append(
"Log directory: ❌ (not configured — set NPM_LOG_DIR to enable get_proxy_host_logs)"
)
return "\n".join(result)
except Exception as e:
@@ -376,3 +387,176 @@ async def create_proxy_host(
except Exception as e:
return _format_error(e)
@mcp.tool()
async def update_proxy_host(
host_id: int,
forward_host: str | None = None,
forward_port: int | None = None,
forward_scheme: str | None = None,
certificate_id: int | None = None,
ssl_forced: bool | None = None,
block_exploits: bool | None = None,
allow_websocket_upgrade: bool | None = None,
access_list_id: int | None = None,
advanced_config: str | None = None,
) -> str:
"""Update an existing proxy host in Nginx Proxy Manager.
Only provided fields will be updated; all others remain unchanged.
Args:
host_id: The ID of the proxy host to update
forward_host: Backend host/IP to forward to
forward_port: Backend port to forward to
forward_scheme: Backend protocol - "http" or "https"
certificate_id: SSL certificate ID (use list_certificates to find, 0 for none)
ssl_forced: Force HTTPS redirect
block_exploits: Enable common exploit blocking
allow_websocket_upgrade: Allow WebSocket connections
access_list_id: Access list ID (0 for no restrictions)
advanced_config: Custom nginx configuration block
Returns:
Details of the updated proxy host.
"""
try:
client = get_client()
kwargs = {}
if forward_host is not None:
kwargs["forward_host"] = forward_host
if forward_port is not None:
kwargs["forward_port"] = forward_port
if forward_scheme is not None:
kwargs["forward_scheme"] = forward_scheme
if certificate_id is not None:
kwargs["certificate_id"] = certificate_id
if ssl_forced is not None:
kwargs["ssl_forced"] = ssl_forced
if block_exploits is not None:
kwargs["block_exploits"] = block_exploits
if allow_websocket_upgrade is not None:
kwargs["allow_websocket_upgrade"] = allow_websocket_upgrade
if access_list_id is not None:
kwargs["access_list_id"] = access_list_id
if advanced_config is not None:
kwargs["advanced_config"] = advanced_config
host = await client.update_proxy_host(host_id, **kwargs)
domains = ", ".join(host.domain_names)
return (
f"Successfully updated proxy host!\n\n"
f"ID: {host.id}\n"
f"Domains: {domains}\n"
f"Forward: {host.forward_scheme}://{host.forward_host}:{host.forward_port}\n"
f"SSL: {'Enabled' if host.ssl_forced else 'Disabled'}\n"
f"Certificate ID: {host.certificate_id}"
)
except Exception as e:
return _format_error(e)
@mcp.tool()
async def get_proxy_host_logs(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> str:
"""Retrieve recent nginx log entries for a specific proxy host.
Reads the raw nginx access or error log file for the given host.
Requires the NPM log directory to be mounted (see NPM_LOG_DIR config).
Args:
host_id: The ID of the proxy host (use list_proxy_hosts to find IDs)
log_type: Log type - "access" for HTTP traffic or "error"
for nginx errors (default: "access")
lines: Number of most recent lines to return
(default: 100, max: 500)
search: Optional filter string - only lines containing this
text are returned (case-insensitive)
Returns:
The most recent log lines for the proxy host, with metadata.
Examples:
- get_proxy_host_logs(5) — last 100 access log lines for host 5
- get_proxy_host_logs(5, log_type="error") — recent error log
- get_proxy_host_logs(5, lines=50, search="404") — last 50 lines containing "404"
- get_proxy_host_logs(5, search="10.0.0.1") — filter by client IP
"""
try:
client = get_client()
host = await client.get_proxy_host(host_id)
domains = ", ".join(host.domain_names)
result = read_log_lines(
host_id=host_id,
log_type=log_type,
lines=lines,
search=search,
)
header_parts = [
f"Proxy host [{host_id}] {domains}{log_type} log",
f"File: {result['file']}",
]
if result["total_lines_in_file"] is not None:
header_parts.append(f"Total lines in file: {result['total_lines_in_file']}")
if result["matched_lines"] is not None:
header_parts.append(f"Lines matching '{search}': {result['matched_lines']}")
header_parts.append(f"Showing last {result['returned_lines']} lines:")
header = "\n".join(header_parts)
if not result["lines"]:
return f"{header}\n\n(no log entries found)"
log_output = "\n".join(result["lines"])
return f"{header}\n\n{log_output}"
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_certificate(
domain_names: list[str],
email: str,
dns_challenge: bool = False,
) -> str:
"""Provision a new Let's Encrypt SSL certificate.
Args:
domain_names: List of domain names for the certificate
email: Email address for Let's Encrypt notifications
dns_challenge: Use DNS challenge instead of HTTP (default: False)
Returns:
Details of the created certificate including its ID.
Use the returned ID with create_proxy_host or update_proxy_host.
"""
try:
client = get_client()
cert = await client.create_certificate(
domain_names=domain_names,
email=email,
dns_challenge=dns_challenge,
)
domains = ", ".join(cert.domain_names)
expiry = cert.expires_on.strftime("%Y-%m-%d") if cert.expires_on else "N/A"
return (
f"Successfully created certificate!\n\n"
f"ID: {cert.id}\n"
f"Provider: {cert.provider}\n"
f"Domains: {domains}\n"
f"Expires: {expiry}"
)
except Exception as e:
return _format_error(e)

146
tests/test_logs.py Normal file
View File

@@ -0,0 +1,146 @@
"""Tests for the log reader module."""
import pytest
from npm_mcp.exceptions import NpmLogError
from npm_mcp.logs import (
is_log_dir_configured,
list_available_logs,
read_log_lines,
)
SAMPLE_ACCESS_LOG = (
'[22/May/2025:10:00:01 +0000] - 200 200 - GET https app.example.com'
' "/" [Client 10.0.0.1] [Length 1542] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:02 +0000] - 301 301 - GET http app.example.com'
' "/old-path" [Client 10.0.0.2] [Length 0] "curl/7.88" "-"\n'
'[22/May/2025:10:00:03 +0000] - 404 404 - GET https app.example.com'
' "/missing" [Client 10.0.0.1] [Length 548] "Mozilla/5.0" "-"\n'
'[22/May/2025:10:00:04 +0000] - 200 200 - POST https app.example.com'
' "/api/data" [Client 10.0.0.3] [Length 256] "python-requests/2.31" "-"\n'
'[22/May/2025:10:00:05 +0000] - 502 502 - GET https app.example.com'
' "/health" [Client 10.0.0.1] [Length 166] "kube-probe/1.28" "-"\n'
)
SAMPLE_ERROR_LOG = (
"2025/05/22 10:00:05 [error] 42#42: *123 connect() failed"
" (111: Connection refused) while connecting to upstream,"
" client: 10.0.0.1, server: app.example.com\n"
)
@pytest.fixture
def log_dir(tmp_path, monkeypatch):
"""Create a temp log directory with sample log files."""
logs = tmp_path / "logs"
logs.mkdir()
(logs / "proxy-host-5_access.log").write_text(SAMPLE_ACCESS_LOG)
(logs / "proxy-host-5_error.log").write_text(SAMPLE_ERROR_LOG)
(logs / "proxy-host-12_access.log").write_text("")
(logs / "fallback_error.log").write_text("global error\n")
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", str(logs))
return logs
@pytest.fixture
def no_log_dir(monkeypatch):
"""Ensure log_dir is unconfigured."""
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "")
class TestReadLogLines:
def test_read_access_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="access")
assert result["host_id"] == 5
assert result["log_type"] == "access"
assert result["file"] == "proxy-host-5_access.log"
assert result["returned_lines"] == 5
assert result["total_lines_in_file"] == 5
assert "app.example.com" in result["lines"][0]
def test_read_error_log(self, log_dir):
result = read_log_lines(host_id=5, log_type="error")
assert result["log_type"] == "error"
assert result["returned_lines"] == 1
assert "Connection refused" in result["lines"][0]
def test_lines_limit(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=2)
assert result["returned_lines"] == 2
assert "10:00:04" in result["lines"][0]
assert "10:00:05" in result["lines"][1]
def test_lines_capped_at_max(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", lines=9999)
assert result["returned_lines"] == 5
def test_search_filter(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="404")
assert result["returned_lines"] == 1
assert result["matched_lines"] == 1
assert result["total_lines_in_file"] is None
assert "/missing" in result["lines"][0]
def test_search_case_insensitive(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="MOZILLA")
assert result["returned_lines"] == 2
def test_search_by_ip(self, log_dir):
result = read_log_lines(host_id=5, log_type="access", search="10.0.0.1")
assert result["returned_lines"] == 3
def test_nonexistent_host(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=999, log_type="access")
def test_empty_log_file(self, log_dir):
with pytest.raises(NpmLogError, match="Log file not found"):
read_log_lines(host_id=12, log_type="error")
def test_invalid_log_type(self, log_dir):
with pytest.raises(NpmLogError, match="Invalid log type"):
read_log_lines(host_id=5, log_type="combined")
def test_no_log_dir_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
read_log_lines(host_id=5, log_type="access")
def test_nonexistent_log_dir(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent/path")
with pytest.raises(NpmLogError, match="does not exist"):
read_log_lines(host_id=5, log_type="access")
class TestIsLogDirConfigured:
def test_configured(self, log_dir):
assert is_log_dir_configured() is True
def test_not_configured(self, no_log_dir):
assert is_log_dir_configured() is False
def test_configured_but_missing(self, monkeypatch):
monkeypatch.setattr("npm_mcp.logs.settings.log_dir", "/nonexistent")
assert is_log_dir_configured() is False
class TestListAvailableLogs:
def test_lists_proxy_host_logs_only(self, log_dir):
results = list_available_logs()
files = {r["file"] for r in results}
assert "proxy-host-5_access.log" in files
assert "proxy-host-5_error.log" in files
assert "proxy-host-12_access.log" in files
assert "fallback_error.log" not in files
def test_correct_metadata(self, log_dir):
results = list_available_logs()
access_5 = next(r for r in results if r["file"] == "proxy-host-5_access.log")
assert access_5["host_id"] == 5
assert access_5["log_type"] == "access"
assert access_5["size_bytes"] > 0
def test_not_configured(self, no_log_dir):
with pytest.raises(NpmLogError, match="NPM_LOG_DIR is not configured"):
list_available_logs()