All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
- Fix infinite recursion on re-auth by adding attempt counter - Add endpoint validation to unifi_api_call (block path traversal, require /api/ prefix) - Clean up redundant SSL context creation - Add safe port parsing with fallback and warning log
314 lines
10 KiB
Python
314 lines
10 KiB
Python
"""
|
|
UniFi Controller Client
|
|
|
|
A clean abstraction layer wrapping aiohttp for UniFi API communication.
|
|
Handles authentication, UniFi OS detection, and write protection.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import ssl
|
|
from typing import Any, Optional
|
|
|
|
import aiohttp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# HTTP methods that modify state
|
|
WRITE_METHODS = frozenset({"POST", "PUT", "DELETE", "PATCH"})
|
|
|
|
|
|
class UnifiClientError(Exception):
|
|
"""Base exception for UniFi client errors."""
|
|
|
|
pass
|
|
|
|
|
|
class UnifiAuthError(UnifiClientError):
|
|
"""Authentication failed."""
|
|
|
|
pass
|
|
|
|
|
|
class UnifiWriteBlockedError(UnifiClientError):
|
|
"""Write operation blocked by configuration."""
|
|
|
|
pass
|
|
|
|
|
|
class UnifiClient:
|
|
"""
|
|
Async client for UniFi Network Controller API.
|
|
|
|
Features:
|
|
- Auto-detects UniFi OS vs standalone controller paths
|
|
- Cookie-based session authentication
|
|
- Write protection (configurable)
|
|
- Proper async context manager support
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
host: str,
|
|
username: str,
|
|
password: str,
|
|
port: int = 443,
|
|
site: str = "default",
|
|
verify_ssl: bool = False,
|
|
allow_writes: bool = False,
|
|
):
|
|
self.host = host
|
|
self.username = username
|
|
self.password = password
|
|
self.port = port
|
|
self.site = site
|
|
self.verify_ssl = verify_ssl
|
|
self.allow_writes = allow_writes
|
|
|
|
self._session: Optional[aiohttp.ClientSession] = None
|
|
self._is_unifi_os: Optional[bool] = None
|
|
self._csrf_token: Optional[str] = None
|
|
self._authenticated = False
|
|
|
|
@property
|
|
def base_url(self) -> str:
|
|
return f"https://{self.host}:{self.port}"
|
|
|
|
@property
|
|
def api_prefix(self) -> str:
|
|
"""Return the correct API prefix based on controller type."""
|
|
if self._is_unifi_os:
|
|
return "/proxy/network"
|
|
return ""
|
|
|
|
async def __aenter__(self) -> "UnifiClient":
|
|
await self.connect()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
|
|
await self.close()
|
|
|
|
async def connect(self) -> None:
|
|
"""Initialize session, detect controller type, and authenticate."""
|
|
if self._session is not None:
|
|
return
|
|
|
|
# Create SSL context
|
|
ssl_context = ssl.create_default_context()
|
|
if not self.verify_ssl:
|
|
ssl_context.check_hostname = False
|
|
ssl_context.verify_mode = ssl.CERT_NONE
|
|
|
|
connector = aiohttp.TCPConnector(ssl=ssl_context)
|
|
self._session = aiohttp.ClientSession(
|
|
connector=connector,
|
|
cookie_jar=aiohttp.CookieJar(unsafe=True),
|
|
)
|
|
|
|
# Detect controller type and authenticate
|
|
await self._detect_controller_type()
|
|
await self._authenticate()
|
|
|
|
async def close(self) -> None:
|
|
"""Close the HTTP session."""
|
|
if self._session:
|
|
await self._session.close()
|
|
self._session = None
|
|
self._authenticated = False
|
|
|
|
async def _detect_controller_type(self) -> None:
|
|
"""
|
|
Detect if this is a UniFi OS controller (UDM, Cloud Gateway, etc.)
|
|
or a standalone Network Controller.
|
|
|
|
UniFi OS uses /proxy/network prefix for Network API calls.
|
|
"""
|
|
# Try UniFi OS path first (more common in modern setups)
|
|
try:
|
|
async with self._session.get(
|
|
f"{self.base_url}/proxy/network/api/s/{self.site}/stat/health",
|
|
timeout=aiohttp.ClientTimeout(total=5),
|
|
) as resp:
|
|
if resp.status in (200, 401, 403):
|
|
# Path exists, this is UniFi OS
|
|
self._is_unifi_os = True
|
|
logger.info("Detected UniFi OS controller (proxy path)")
|
|
return
|
|
except Exception:
|
|
pass
|
|
|
|
# Try direct path (standalone controller)
|
|
try:
|
|
async with self._session.get(
|
|
f"{self.base_url}/api/s/{self.site}/stat/health",
|
|
timeout=aiohttp.ClientTimeout(total=5),
|
|
) as resp:
|
|
if resp.status in (200, 401, 403):
|
|
self._is_unifi_os = False
|
|
logger.info("Detected standalone UniFi controller (direct path)")
|
|
return
|
|
except Exception:
|
|
pass
|
|
|
|
# Default to UniFi OS if detection fails
|
|
self._is_unifi_os = True
|
|
logger.warning("Controller type detection failed, defaulting to UniFi OS")
|
|
|
|
async def _authenticate(self) -> None:
|
|
"""Authenticate with the UniFi controller.
|
|
|
|
UniFi OS (UDM, Cloud Gateway) uses /api/auth/login
|
|
Standalone controllers use /api/login
|
|
"""
|
|
if self._is_unifi_os:
|
|
# UniFi OS uses a different auth endpoint (no api_prefix needed)
|
|
login_url = f"{self.base_url}/api/auth/login"
|
|
else:
|
|
# Standalone controller
|
|
login_url = f"{self.base_url}/api/login"
|
|
|
|
payload = {
|
|
"username": self.username,
|
|
"password": self.password,
|
|
}
|
|
|
|
try:
|
|
async with self._session.post(
|
|
login_url,
|
|
json=payload,
|
|
timeout=aiohttp.ClientTimeout(total=10),
|
|
) as resp:
|
|
if resp.status == 200:
|
|
self._authenticated = True
|
|
# Extract CSRF token if present (UniFi OS)
|
|
if "x-csrf-token" in resp.headers:
|
|
self._csrf_token = resp.headers["x-csrf-token"]
|
|
logger.info(f"Successfully authenticated to {self.host}")
|
|
elif resp.status == 401:
|
|
raise UnifiAuthError("Invalid username or password")
|
|
else:
|
|
text = await resp.text()
|
|
raise UnifiAuthError(
|
|
f"Authentication failed: {resp.status} - {text}"
|
|
)
|
|
except aiohttp.ClientError as e:
|
|
raise UnifiClientError(f"Connection error during authentication: {e}")
|
|
|
|
async def ping(self) -> bool:
|
|
"""
|
|
Check if the connection to the controller is alive.
|
|
Returns True if connected, raises exception otherwise.
|
|
"""
|
|
if not self._authenticated:
|
|
raise UnifiClientError("Not authenticated")
|
|
|
|
try:
|
|
result = await self.request("GET", f"/api/s/{self.site}/stat/health")
|
|
return True
|
|
except Exception as e:
|
|
raise UnifiClientError(f"Controller ping failed: {e}")
|
|
|
|
async def request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
payload: Optional[dict] = None,
|
|
_reauth_attempt: bool = False,
|
|
) -> Any:
|
|
"""
|
|
Make an authenticated request to the UniFi API.
|
|
|
|
Args:
|
|
method: HTTP method (GET, POST, PUT, DELETE, PATCH)
|
|
endpoint: API endpoint (e.g., /api/s/default/stat/sta)
|
|
payload: Optional request body for POST/PUT requests
|
|
|
|
Returns:
|
|
Parsed JSON response data
|
|
|
|
Raises:
|
|
UnifiWriteBlockedError: If write operation attempted without permission
|
|
UnifiClientError: For other API errors
|
|
"""
|
|
if not self._session:
|
|
raise UnifiClientError(
|
|
"Client not connected. Use 'async with' or call connect()"
|
|
)
|
|
|
|
# Check write protection
|
|
method_upper = method.upper()
|
|
if method_upper in WRITE_METHODS and not self.allow_writes:
|
|
raise UnifiWriteBlockedError(
|
|
f"Write operation ({method_upper}) blocked. "
|
|
"Set UNIFI_ALLOW_WRITES=true to enable write operations."
|
|
)
|
|
|
|
# Build full URL with correct prefix
|
|
url = f"{self.base_url}{self.api_prefix}{endpoint}"
|
|
|
|
# Build headers
|
|
headers = {}
|
|
if self._csrf_token:
|
|
headers["x-csrf-token"] = self._csrf_token
|
|
|
|
try:
|
|
async with self._session.request(
|
|
method_upper,
|
|
url,
|
|
json=payload,
|
|
headers=headers,
|
|
timeout=aiohttp.ClientTimeout(total=30),
|
|
) as resp:
|
|
text = await resp.text()
|
|
|
|
if resp.status == 401:
|
|
# Session expired, try to re-authenticate (once only)
|
|
if _reauth_attempt:
|
|
raise UnifiAuthError(
|
|
"Re-authentication failed. Check credentials or account status."
|
|
)
|
|
await self._authenticate()
|
|
return await self.request(
|
|
method, endpoint, payload, _reauth_attempt=True
|
|
)
|
|
|
|
if resp.status >= 400:
|
|
raise UnifiClientError(f"API error {resp.status}: {text}")
|
|
|
|
try:
|
|
data = json.loads(text)
|
|
except json.JSONDecodeError:
|
|
return {"raw": text}
|
|
|
|
# UniFi API returns {"meta": {"rc": "ok"}, "data": [...]}
|
|
if isinstance(data, dict):
|
|
meta = data.get("meta", {})
|
|
if meta.get("rc") == "error":
|
|
raise UnifiClientError(
|
|
f"API error: {meta.get('msg', 'Unknown error')}"
|
|
)
|
|
if "data" in data:
|
|
return data["data"]
|
|
|
|
return data
|
|
|
|
except aiohttp.ClientError as e:
|
|
raise UnifiClientError(f"Request failed: {e}")
|
|
|
|
async def get_clients(self) -> list:
|
|
"""Get all connected clients."""
|
|
return await self.request("GET", f"/api/s/{self.site}/stat/sta")
|
|
|
|
async def get_devices(self) -> list:
|
|
"""Get all network devices."""
|
|
return await self.request("GET", f"/api/s/{self.site}/stat/device")
|
|
|
|
async def get_system_info(self) -> list:
|
|
"""Get system information."""
|
|
return await self.request("GET", f"/api/s/{self.site}/stat/sysinfo")
|
|
|
|
async def get_health(self) -> list:
|
|
"""Get network health status."""
|
|
return await self.request("GET", f"/api/s/{self.site}/stat/health")
|