Address code review findings
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
- Fix infinite recursion on re-auth by adding attempt counter - Add endpoint validation to unifi_api_call (block path traversal, require /api/ prefix) - Clean up redundant SSL context creation - Add safe port parsing with fallback and warning log
This commit is contained in:
30
server.py
30
server.py
@@ -32,12 +32,22 @@ logging.basicConfig(
|
|||||||
)
|
)
|
||||||
logger = logging.getLogger("unifi-mcp-light")
|
logger = logging.getLogger("unifi-mcp-light")
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_port(value: str, default: int = 443) -> int:
|
||||||
|
"""Safely parse port from environment variable."""
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
logger.warning(f"Invalid UNIFI_PORT '{value}', using default {default}")
|
||||||
|
return default
|
||||||
|
|
||||||
|
|
||||||
# Configuration from environment
|
# Configuration from environment
|
||||||
CONFIG = {
|
CONFIG = {
|
||||||
"host": os.getenv("UNIFI_HOST", ""),
|
"host": os.getenv("UNIFI_HOST", ""),
|
||||||
"username": os.getenv("UNIFI_USERNAME", ""),
|
"username": os.getenv("UNIFI_USERNAME", ""),
|
||||||
"password": os.getenv("UNIFI_PASSWORD", ""),
|
"password": os.getenv("UNIFI_PASSWORD", ""),
|
||||||
"port": int(os.getenv("UNIFI_PORT", "443")),
|
"port": _parse_port(os.getenv("UNIFI_PORT", "443")),
|
||||||
"site": os.getenv("UNIFI_SITE", "default"),
|
"site": os.getenv("UNIFI_SITE", "default"),
|
||||||
"verify_ssl": os.getenv("UNIFI_VERIFY_SSL", "false").lower() == "true",
|
"verify_ssl": os.getenv("UNIFI_VERIFY_SSL", "false").lower() == "true",
|
||||||
"allow_writes": os.getenv("UNIFI_ALLOW_WRITES", "false").lower() == "true",
|
"allow_writes": os.getenv("UNIFI_ALLOW_WRITES", "false").lower() == "true",
|
||||||
@@ -324,15 +334,27 @@ async def unifi_api_call(
|
|||||||
params: "{}"
|
params: "{}"
|
||||||
|
|
||||||
# Block a client (requires UNIFI_ALLOW_WRITES=true)
|
# Block a client (requires UNIFI_ALLOW_WRITES=true)
|
||||||
endpoint: "/api/s/default/cmd/stamgr"
|
endpoint: \"/api/s/default/cmd/stamgr\"
|
||||||
method: "POST"
|
method: \"POST\"
|
||||||
params: '{"cmd": "block-sta", "mac": "aa:bb:cc:dd:ee:ff"}'
|
params: '{\"cmd\": \"block-sta\", \"mac\": \"aa:bb:cc:dd:ee:ff\"}'
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
# Validate endpoint to prevent path traversal attacks
|
||||||
|
if ".." in endpoint:
|
||||||
|
return json.dumps({"error": "Invalid endpoint: path traversal not allowed"})
|
||||||
|
if not endpoint.startswith("/api/"):
|
||||||
|
return json.dumps(
|
||||||
|
{
|
||||||
|
"error": "Invalid endpoint: must start with /api/",
|
||||||
|
"hint": "Example: /api/s/default/stat/sta",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
# Parse params JSON
|
# Parse params JSON
|
||||||
try:
|
try:
|
||||||
payload = json.loads(params) if params and params != "{}" else None
|
payload = json.loads(params) if params and params != "{}" else None
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
|
logger.warning(f"Invalid params JSON in unifi_api_call: {e}")
|
||||||
return json.dumps({"error": f"Invalid params JSON: {e}"})
|
return json.dumps({"error": f"Invalid params JSON: {e}"})
|
||||||
|
|
||||||
client = await get_client()
|
client = await get_client()
|
||||||
|
|||||||
@@ -95,10 +95,8 @@ class UnifiClient:
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Create SSL context
|
# Create SSL context
|
||||||
if self.verify_ssl:
|
ssl_context = ssl.create_default_context()
|
||||||
ssl_context = ssl.create_default_context()
|
if not self.verify_ssl:
|
||||||
else:
|
|
||||||
ssl_context = ssl.create_default_context()
|
|
||||||
ssl_context.check_hostname = False
|
ssl_context.check_hostname = False
|
||||||
ssl_context.verify_mode = ssl.CERT_NONE
|
ssl_context.verify_mode = ssl.CERT_NONE
|
||||||
|
|
||||||
@@ -216,6 +214,7 @@ class UnifiClient:
|
|||||||
method: str,
|
method: str,
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
payload: Optional[dict] = None,
|
payload: Optional[dict] = None,
|
||||||
|
_reauth_attempt: bool = False,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""
|
"""
|
||||||
Make an authenticated request to the UniFi API.
|
Make an authenticated request to the UniFi API.
|
||||||
@@ -264,9 +263,15 @@ class UnifiClient:
|
|||||||
text = await resp.text()
|
text = await resp.text()
|
||||||
|
|
||||||
if resp.status == 401:
|
if resp.status == 401:
|
||||||
# Session expired, try to re-authenticate
|
# Session expired, try to re-authenticate (once only)
|
||||||
|
if _reauth_attempt:
|
||||||
|
raise UnifiAuthError(
|
||||||
|
"Re-authentication failed. Check credentials or account status."
|
||||||
|
)
|
||||||
await self._authenticate()
|
await self._authenticate()
|
||||||
return await self.request(method, endpoint, payload)
|
return await self.request(
|
||||||
|
method, endpoint, payload, _reauth_attempt=True
|
||||||
|
)
|
||||||
|
|
||||||
if resp.status >= 400:
|
if resp.status >= 400:
|
||||||
raise UnifiClientError(f"API error {resp.status}: {text}")
|
raise UnifiClientError(f"API error {resp.status}: {text}")
|
||||||
|
|||||||
Reference in New Issue
Block a user