""" UniFi Controller Client A clean abstraction layer wrapping aiohttp for UniFi API communication. Handles authentication, UniFi OS detection, and write protection. """ import asyncio import json import logging import ssl from typing import Any, Optional import aiohttp logger = logging.getLogger(__name__) # HTTP methods that modify state WRITE_METHODS = frozenset({"POST", "PUT", "DELETE", "PATCH"}) class UnifiClientError(Exception): """Base exception for UniFi client errors.""" pass class UnifiAuthError(UnifiClientError): """Authentication failed.""" pass class UnifiWriteBlockedError(UnifiClientError): """Write operation blocked by configuration.""" pass class UnifiClient: """ Async client for UniFi Network Controller API. Features: - Auto-detects UniFi OS vs standalone controller paths - Cookie-based session authentication - Write protection (configurable) - Proper async context manager support """ def __init__( self, host: str, username: str, password: str, port: int = 443, site: str = "default", verify_ssl: bool = False, allow_writes: bool = False, ): self.host = host self.username = username self.password = password self.port = port self.site = site self.verify_ssl = verify_ssl self.allow_writes = allow_writes self._session: Optional[aiohttp.ClientSession] = None self._is_unifi_os: Optional[bool] = None self._csrf_token: Optional[str] = None self._authenticated = False @property def base_url(self) -> str: return f"https://{self.host}:{self.port}" @property def api_prefix(self) -> str: """Return the correct API prefix based on controller type.""" if self._is_unifi_os: return "/proxy/network" return "" async def __aenter__(self) -> "UnifiClient": await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: await self.close() async def connect(self) -> None: """Initialize session, detect controller type, and authenticate.""" if self._session is not None: return # Create SSL context ssl_context = ssl.create_default_context() if not self.verify_ssl: ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE connector = aiohttp.TCPConnector(ssl=ssl_context) self._session = aiohttp.ClientSession( connector=connector, cookie_jar=aiohttp.CookieJar(unsafe=True), ) # Detect controller type and authenticate await self._detect_controller_type() await self._authenticate() async def close(self) -> None: """Close the HTTP session.""" if self._session: await self._session.close() self._session = None self._authenticated = False async def _detect_controller_type(self) -> None: """ Detect if this is a UniFi OS controller (UDM, Cloud Gateway, etc.) or a standalone Network Controller. UniFi OS uses /proxy/network prefix for Network API calls. """ # Try UniFi OS path first (more common in modern setups) try: async with self._session.get( f"{self.base_url}/proxy/network/api/s/{self.site}/stat/health", timeout=aiohttp.ClientTimeout(total=5), ) as resp: if resp.status in (200, 401, 403): # Path exists, this is UniFi OS self._is_unifi_os = True logger.info("Detected UniFi OS controller (proxy path)") return except Exception: pass # Try direct path (standalone controller) try: async with self._session.get( f"{self.base_url}/api/s/{self.site}/stat/health", timeout=aiohttp.ClientTimeout(total=5), ) as resp: if resp.status in (200, 401, 403): self._is_unifi_os = False logger.info("Detected standalone UniFi controller (direct path)") return except Exception: pass # Default to UniFi OS if detection fails self._is_unifi_os = True logger.warning("Controller type detection failed, defaulting to UniFi OS") async def _authenticate(self) -> None: """Authenticate with the UniFi controller. UniFi OS (UDM, Cloud Gateway) uses /api/auth/login Standalone controllers use /api/login """ if self._is_unifi_os: # UniFi OS uses a different auth endpoint (no api_prefix needed) login_url = f"{self.base_url}/api/auth/login" else: # Standalone controller login_url = f"{self.base_url}/api/login" payload = { "username": self.username, "password": self.password, } try: async with self._session.post( login_url, json=payload, timeout=aiohttp.ClientTimeout(total=10), ) as resp: if resp.status == 200: self._authenticated = True # Extract CSRF token if present (UniFi OS) if "x-csrf-token" in resp.headers: self._csrf_token = resp.headers["x-csrf-token"] logger.info(f"Successfully authenticated to {self.host}") elif resp.status == 401: raise UnifiAuthError("Invalid username or password") else: text = await resp.text() raise UnifiAuthError( f"Authentication failed: {resp.status} - {text}" ) except aiohttp.ClientError as e: raise UnifiClientError(f"Connection error during authentication: {e}") async def ping(self) -> bool: """ Check if the connection to the controller is alive. Returns True if connected, raises exception otherwise. """ if not self._authenticated: raise UnifiClientError("Not authenticated") try: result = await self.request("GET", f"/api/s/{self.site}/stat/health") return True except Exception as e: raise UnifiClientError(f"Controller ping failed: {e}") async def request( self, method: str, endpoint: str, payload: Optional[dict] = None, _reauth_attempt: bool = False, ) -> Any: """ Make an authenticated request to the UniFi API. Args: method: HTTP method (GET, POST, PUT, DELETE, PATCH) endpoint: API endpoint (e.g., /api/s/default/stat/sta) payload: Optional request body for POST/PUT requests Returns: Parsed JSON response data Raises: UnifiWriteBlockedError: If write operation attempted without permission UnifiClientError: For other API errors """ if not self._session: raise UnifiClientError( "Client not connected. Use 'async with' or call connect()" ) # Check write protection method_upper = method.upper() if method_upper in WRITE_METHODS and not self.allow_writes: raise UnifiWriteBlockedError( f"Write operation ({method_upper}) blocked. " "Set UNIFI_ALLOW_WRITES=true to enable write operations." ) # Build full URL with correct prefix url = f"{self.base_url}{self.api_prefix}{endpoint}" # Build headers headers = {} if self._csrf_token: headers["x-csrf-token"] = self._csrf_token try: async with self._session.request( method_upper, url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=30), ) as resp: text = await resp.text() if resp.status == 401: # Session expired, try to re-authenticate (once only) if _reauth_attempt: raise UnifiAuthError( "Re-authentication failed. Check credentials or account status." ) await self._authenticate() return await self.request( method, endpoint, payload, _reauth_attempt=True ) if resp.status >= 400: raise UnifiClientError(f"API error {resp.status}: {text}") try: data = json.loads(text) except json.JSONDecodeError: return {"raw": text} # UniFi API returns {"meta": {"rc": "ok"}, "data": [...]} if isinstance(data, dict): meta = data.get("meta", {}) if meta.get("rc") == "error": raise UnifiClientError( f"API error: {meta.get('msg', 'Unknown error')}" ) if "data" in data: return data["data"] return data except aiohttp.ClientError as e: raise UnifiClientError(f"Request failed: {e}") async def get_clients(self) -> list: """Get all connected clients.""" return await self.request("GET", f"/api/s/{self.site}/stat/sta") async def get_devices(self) -> list: """Get all network devices.""" return await self.request("GET", f"/api/s/{self.site}/stat/device") async def get_system_info(self) -> list: """Get system information.""" return await self.request("GET", f"/api/s/{self.site}/stat/sysinfo") async def get_health(self) -> list: """Get network health status.""" return await self.request("GET", f"/api/s/{self.site}/stat/health")