feat: Add get_proxy_host_logs tool for reading nginx proxy host logs

Reads nginx access/error logs directly from a mounted NPM log directory,
enabling agents to debug proxy issues without SSH access. Requires mounting
NPM's /data/logs volume and setting NPM_LOG_DIR.

Also includes a feature request PRD for proposing a native log API upstream.
This commit is contained in:
2026-05-22 14:27:37 +00:00
parent c3227c3a5f
commit 5e89176806
9 changed files with 628 additions and 2 deletions
+3
View File
@@ -42,6 +42,9 @@ class Settings(BaseSettings):
mcp_port: int = 8000
mcp_transport: str = "stdio" # "stdio" or "http"
# Path to NPM log directory (mount NPM's /data/logs here)
log_dir: str = ""
# Proxy host creation defaults (JSON string)
# Example: '{"certificate_id": 24, "ssl_forced": true}'
proxy_defaults: dict[str, Any] = {}
+6
View File
@@ -31,3 +31,9 @@ class NpmApiError(NpmClientError):
def __init__(self, message: str, status_code: int | None = None):
super().__init__(message)
self.status_code = status_code
class NpmLogError(NpmClientError):
"""Raised when log file operations fail."""
pass
+118
View File
@@ -0,0 +1,118 @@
"""Log file reader for Nginx Proxy Manager proxy host logs."""
from __future__ import annotations
import re
from pathlib import Path
from .config import settings
from .exceptions import NpmLogError
LOG_FILE_PATTERN = re.compile(r"^proxy-host-(\d+)_(access|error)\.log$")
MAX_LINES = 500
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB safety cap
def _get_log_dir() -> Path:
"""Resolve and validate the configured log directory."""
log_dir = settings.log_dir
if not log_dir:
raise NpmLogError(
"NPM_LOG_DIR is not configured. Mount NPM's /data/logs volume "
"and set NPM_LOG_DIR to the mount path. See README for details."
)
path = Path(log_dir)
if not path.is_dir():
raise NpmLogError(f"Log directory does not exist: {log_dir}")
return path
def _log_file_path(host_id: int, log_type: str) -> Path:
if log_type not in ("access", "error"):
raise NpmLogError(f"Invalid log type: {log_type!r} (must be 'access' or 'error')")
log_dir = _get_log_dir()
return log_dir / f"proxy-host-{host_id}_{log_type}.log"
def read_log_lines(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> dict:
"""Read the last N lines from a proxy host log file.
Args:
host_id: NPM proxy host ID.
log_type: "access" or "error".
lines: Number of most recent lines to return (capped at MAX_LINES).
search: Optional substring filter applied to each line.
Returns:
Dict with host_id, log_type, file name, line count, and the lines themselves.
"""
lines = max(1, min(lines, MAX_LINES))
log_path = _log_file_path(host_id, log_type)
if not log_path.is_file():
raise NpmLogError(
f"Log file not found: {log_path.name}. "
"The proxy host may not have received any traffic yet, "
"or the log directory mount is incorrect."
)
file_size = log_path.stat().st_size
if file_size > MAX_FILE_SIZE:
raise NpmLogError(
f"Log file is too large ({file_size / 1024 / 1024:.1f} MB). "
"Consider using an external log aggregation tool."
)
all_lines = log_path.read_text(errors="replace").splitlines()
total_lines = len(all_lines)
if search:
search_lower = search.lower()
all_lines = [line for line in all_lines if search_lower in line.lower()]
tail = all_lines[-lines:]
return {
"host_id": host_id,
"log_type": log_type,
"file": log_path.name,
"total_lines_in_file": total_lines if not search else None,
"matched_lines": len(all_lines) if search else None,
"returned_lines": len(tail),
"lines": tail,
}
def is_log_dir_configured() -> bool:
"""Check whether the log directory is configured and accessible."""
if not settings.log_dir:
return False
return Path(settings.log_dir).is_dir()
def list_available_logs() -> list[dict]:
"""List all proxy-host log files present in the log directory.
Returns:
List of dicts with host_id, log_type, file name, and size.
"""
log_dir = _get_log_dir()
results = []
for entry in sorted(log_dir.iterdir()):
match = LOG_FILE_PATTERN.match(entry.name)
if match and entry.is_file():
results.append(
{
"host_id": int(match.group(1)),
"log_type": match.group(2),
"file": entry.name,
"size_bytes": entry.stat().st_size,
}
)
return results
+76 -1
View File
@@ -9,7 +9,8 @@ from mcp.server.fastmcp import FastMCP
from .client import NpmClient
from .config import settings
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError
from .exceptions import NpmApiError, NpmAuthenticationError, NpmConnectionError, NpmLogError
from .logs import is_log_dir_configured, list_available_logs, read_log_lines
logger = logging.getLogger(__name__)
@@ -56,6 +57,8 @@ def _format_error(e: Exception) -> str:
return f"Authentication failed: {e}"
elif isinstance(e, NpmConnectionError):
return f"Connection error: {e}"
elif isinstance(e, NpmLogError):
return f"Log error: {e}"
elif isinstance(e, NpmApiError):
return f"API error: {e}"
return f"Error: {e}"
@@ -189,6 +192,14 @@ async def get_system_health() -> str:
except NpmAuthenticationError:
result.append("Authenticated: ❌ (check credentials)")
if is_log_dir_configured():
logs = list_available_logs()
result.append(f"Log directory: ✅ ({len(logs)} log files found)")
else:
result.append(
"Log directory: ❌ (not configured — set NPM_LOG_DIR to enable get_proxy_host_logs)"
)
return "\n".join(result)
except Exception as e:
@@ -448,6 +459,70 @@ async def update_proxy_host(
return _format_error(e)
@mcp.tool()
async def get_proxy_host_logs(
host_id: int,
log_type: str = "access",
lines: int = 100,
search: str | None = None,
) -> str:
"""Retrieve recent nginx log entries for a specific proxy host.
Reads the raw nginx access or error log file for the given host.
Requires the NPM log directory to be mounted (see NPM_LOG_DIR config).
Args:
host_id: The ID of the proxy host (use list_proxy_hosts to find IDs)
log_type: Log type - "access" for HTTP traffic or "error"
for nginx errors (default: "access")
lines: Number of most recent lines to return
(default: 100, max: 500)
search: Optional filter string - only lines containing this
text are returned (case-insensitive)
Returns:
The most recent log lines for the proxy host, with metadata.
Examples:
- get_proxy_host_logs(5) — last 100 access log lines for host 5
- get_proxy_host_logs(5, log_type="error") — recent error log
- get_proxy_host_logs(5, lines=50, search="404") — last 50 lines containing "404"
- get_proxy_host_logs(5, search="10.0.0.1") — filter by client IP
"""
try:
client = get_client()
host = await client.get_proxy_host(host_id)
domains = ", ".join(host.domain_names)
result = read_log_lines(
host_id=host_id,
log_type=log_type,
lines=lines,
search=search,
)
header_parts = [
f"Proxy host [{host_id}] {domains}{log_type} log",
f"File: {result['file']}",
]
if result["total_lines_in_file"] is not None:
header_parts.append(f"Total lines in file: {result['total_lines_in_file']}")
if result["matched_lines"] is not None:
header_parts.append(f"Lines matching '{search}': {result['matched_lines']}")
header_parts.append(f"Showing last {result['returned_lines']} lines:")
header = "\n".join(header_parts)
if not result["lines"]:
return f"{header}\n\n(no log entries found)"
log_output = "\n".join(result["lines"])
return f"{header}\n\n{log_output}"
except Exception as e:
return _format_error(e)
@mcp.tool()
async def create_certificate(
domain_names: list[str],