Initial implementation of UniFi MCP Light server
Some checks failed
Build and Push Docker Image / build (push) Failing after 12s
Some checks failed
Build and Push Docker Image / build (push) Failing after 12s
Implements Hybrid MCP Light pattern with: - 4 specific tools: list_clients, list_devices, get_system_info, get_network_health - Meta tools: tool_index, api_call (raw API pass-through) - API documentation resource at unifi://api-reference - Starlette wrapper with /health endpoint - Write protection (UNIFI_ALLOW_WRITES env var) - UniFi OS auto-detection (proxy vs direct paths) - Docker multi-stage build - Gitea CI workflow Closes #1, #2, #3, #4, #5, #7
This commit is contained in:
299
unifi_client.py
Normal file
299
unifi_client.py
Normal file
@@ -0,0 +1,299 @@
|
||||
"""
|
||||
UniFi Controller Client
|
||||
|
||||
A clean abstraction layer wrapping aiohttp for UniFi API communication.
|
||||
Handles authentication, UniFi OS detection, and write protection.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import ssl
|
||||
from typing import Any, Optional
|
||||
|
||||
import aiohttp
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# HTTP methods that modify state
|
||||
WRITE_METHODS = frozenset({"POST", "PUT", "DELETE", "PATCH"})
|
||||
|
||||
|
||||
class UnifiClientError(Exception):
|
||||
"""Base exception for UniFi client errors."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class UnifiAuthError(UnifiClientError):
|
||||
"""Authentication failed."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class UnifiWriteBlockedError(UnifiClientError):
|
||||
"""Write operation blocked by configuration."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class UnifiClient:
|
||||
"""
|
||||
Async client for UniFi Network Controller API.
|
||||
|
||||
Features:
|
||||
- Auto-detects UniFi OS vs standalone controller paths
|
||||
- Cookie-based session authentication
|
||||
- Write protection (configurable)
|
||||
- Proper async context manager support
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int = 443,
|
||||
site: str = "default",
|
||||
verify_ssl: bool = False,
|
||||
allow_writes: bool = False,
|
||||
):
|
||||
self.host = host
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.port = port
|
||||
self.site = site
|
||||
self.verify_ssl = verify_ssl
|
||||
self.allow_writes = allow_writes
|
||||
|
||||
self._session: Optional[aiohttp.ClientSession] = None
|
||||
self._is_unifi_os: Optional[bool] = None
|
||||
self._csrf_token: Optional[str] = None
|
||||
self._authenticated = False
|
||||
|
||||
@property
|
||||
def base_url(self) -> str:
|
||||
return f"https://{self.host}:{self.port}"
|
||||
|
||||
@property
|
||||
def api_prefix(self) -> str:
|
||||
"""Return the correct API prefix based on controller type."""
|
||||
if self._is_unifi_os:
|
||||
return "/proxy/network"
|
||||
return ""
|
||||
|
||||
async def __aenter__(self) -> "UnifiClient":
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
|
||||
await self.close()
|
||||
|
||||
async def connect(self) -> None:
|
||||
"""Initialize session, detect controller type, and authenticate."""
|
||||
if self._session is not None:
|
||||
return
|
||||
|
||||
# Create SSL context
|
||||
if self.verify_ssl:
|
||||
ssl_context = ssl.create_default_context()
|
||||
else:
|
||||
ssl_context = ssl.create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
connector = aiohttp.TCPConnector(ssl=ssl_context)
|
||||
self._session = aiohttp.ClientSession(
|
||||
connector=connector,
|
||||
cookie_jar=aiohttp.CookieJar(unsafe=True),
|
||||
)
|
||||
|
||||
# Detect controller type and authenticate
|
||||
await self._detect_controller_type()
|
||||
await self._authenticate()
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the HTTP session."""
|
||||
if self._session:
|
||||
await self._session.close()
|
||||
self._session = None
|
||||
self._authenticated = False
|
||||
|
||||
async def _detect_controller_type(self) -> None:
|
||||
"""
|
||||
Detect if this is a UniFi OS controller (UDM, Cloud Gateway, etc.)
|
||||
or a standalone Network Controller.
|
||||
|
||||
UniFi OS uses /proxy/network prefix for Network API calls.
|
||||
"""
|
||||
# Try UniFi OS path first (more common in modern setups)
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self.base_url}/proxy/network/api/s/{self.site}/stat/health",
|
||||
timeout=aiohttp.ClientTimeout(total=5),
|
||||
) as resp:
|
||||
if resp.status in (200, 401, 403):
|
||||
# Path exists, this is UniFi OS
|
||||
self._is_unifi_os = True
|
||||
logger.info("Detected UniFi OS controller (proxy path)")
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Try direct path (standalone controller)
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self.base_url}/api/s/{self.site}/stat/health",
|
||||
timeout=aiohttp.ClientTimeout(total=5),
|
||||
) as resp:
|
||||
if resp.status in (200, 401, 403):
|
||||
self._is_unifi_os = False
|
||||
logger.info("Detected standalone UniFi controller (direct path)")
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Default to UniFi OS if detection fails
|
||||
self._is_unifi_os = True
|
||||
logger.warning("Controller type detection failed, defaulting to UniFi OS")
|
||||
|
||||
async def _authenticate(self) -> None:
|
||||
"""Authenticate with the UniFi controller."""
|
||||
login_url = f"{self.base_url}{self.api_prefix}/api/login"
|
||||
|
||||
payload = {
|
||||
"username": self.username,
|
||||
"password": self.password,
|
||||
}
|
||||
|
||||
try:
|
||||
async with self._session.post(
|
||||
login_url,
|
||||
json=payload,
|
||||
timeout=aiohttp.ClientTimeout(total=10),
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
self._authenticated = True
|
||||
# Extract CSRF token if present (UniFi OS)
|
||||
if "x-csrf-token" in resp.headers:
|
||||
self._csrf_token = resp.headers["x-csrf-token"]
|
||||
logger.info(f"Successfully authenticated to {self.host}")
|
||||
elif resp.status == 401:
|
||||
raise UnifiAuthError("Invalid username or password")
|
||||
else:
|
||||
text = await resp.text()
|
||||
raise UnifiAuthError(
|
||||
f"Authentication failed: {resp.status} - {text}"
|
||||
)
|
||||
except aiohttp.ClientError as e:
|
||||
raise UnifiClientError(f"Connection error during authentication: {e}")
|
||||
|
||||
async def ping(self) -> bool:
|
||||
"""
|
||||
Check if the connection to the controller is alive.
|
||||
Returns True if connected, raises exception otherwise.
|
||||
"""
|
||||
if not self._authenticated:
|
||||
raise UnifiClientError("Not authenticated")
|
||||
|
||||
try:
|
||||
result = await self.request("GET", f"/api/s/{self.site}/stat/health")
|
||||
return True
|
||||
except Exception as e:
|
||||
raise UnifiClientError(f"Controller ping failed: {e}")
|
||||
|
||||
async def request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
payload: Optional[dict] = None,
|
||||
) -> Any:
|
||||
"""
|
||||
Make an authenticated request to the UniFi API.
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, PUT, DELETE, PATCH)
|
||||
endpoint: API endpoint (e.g., /api/s/default/stat/sta)
|
||||
payload: Optional request body for POST/PUT requests
|
||||
|
||||
Returns:
|
||||
Parsed JSON response data
|
||||
|
||||
Raises:
|
||||
UnifiWriteBlockedError: If write operation attempted without permission
|
||||
UnifiClientError: For other API errors
|
||||
"""
|
||||
if not self._session:
|
||||
raise UnifiClientError(
|
||||
"Client not connected. Use 'async with' or call connect()"
|
||||
)
|
||||
|
||||
# Check write protection
|
||||
method_upper = method.upper()
|
||||
if method_upper in WRITE_METHODS and not self.allow_writes:
|
||||
raise UnifiWriteBlockedError(
|
||||
f"Write operation ({method_upper}) blocked. "
|
||||
"Set UNIFI_ALLOW_WRITES=true to enable write operations."
|
||||
)
|
||||
|
||||
# Build full URL with correct prefix
|
||||
url = f"{self.base_url}{self.api_prefix}{endpoint}"
|
||||
|
||||
# Build headers
|
||||
headers = {}
|
||||
if self._csrf_token:
|
||||
headers["x-csrf-token"] = self._csrf_token
|
||||
|
||||
try:
|
||||
async with self._session.request(
|
||||
method_upper,
|
||||
url,
|
||||
json=payload,
|
||||
headers=headers,
|
||||
timeout=aiohttp.ClientTimeout(total=30),
|
||||
) as resp:
|
||||
text = await resp.text()
|
||||
|
||||
if resp.status == 401:
|
||||
# Session expired, try to re-authenticate
|
||||
await self._authenticate()
|
||||
return await self.request(method, endpoint, payload)
|
||||
|
||||
if resp.status >= 400:
|
||||
raise UnifiClientError(f"API error {resp.status}: {text}")
|
||||
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
return {"raw": text}
|
||||
|
||||
# UniFi API returns {"meta": {"rc": "ok"}, "data": [...]}
|
||||
if isinstance(data, dict):
|
||||
meta = data.get("meta", {})
|
||||
if meta.get("rc") == "error":
|
||||
raise UnifiClientError(
|
||||
f"API error: {meta.get('msg', 'Unknown error')}"
|
||||
)
|
||||
if "data" in data:
|
||||
return data["data"]
|
||||
|
||||
return data
|
||||
|
||||
except aiohttp.ClientError as e:
|
||||
raise UnifiClientError(f"Request failed: {e}")
|
||||
|
||||
async def get_clients(self) -> list:
|
||||
"""Get all connected clients."""
|
||||
return await self.request("GET", f"/api/s/{self.site}/stat/sta")
|
||||
|
||||
async def get_devices(self) -> list:
|
||||
"""Get all network devices."""
|
||||
return await self.request("GET", f"/api/s/{self.site}/stat/device")
|
||||
|
||||
async def get_system_info(self) -> list:
|
||||
"""Get system information."""
|
||||
return await self.request("GET", f"/api/s/{self.site}/stat/sysinfo")
|
||||
|
||||
async def get_health(self) -> list:
|
||||
"""Get network health status."""
|
||||
return await self.request("GET", f"/api/s/{self.site}/stat/health")
|
||||
Reference in New Issue
Block a user