""" Plex MCP Server - A lightweight MCP server for Plex Media Server API. Follows the Hybrid MCP Light pattern: - 5 specific tools for common operations - 1 API pass-through for full API coverage - Documentation resources for AI agent reference """ import json import logging import os import sys from contextlib import asynccontextmanager from pathlib import Path from typing import Optional import httpx from dotenv import load_dotenv from fastmcp import FastMCP from starlette.applications import Starlette from starlette.responses import JSONResponse from starlette.routing import Mount, Route load_dotenv() # Configure logging LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logging.basicConfig( level=getattr(logging, LOG_LEVEL, logging.INFO), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", stream=sys.stdout, ) logger = logging.getLogger("plex-mcp") # Configuration PLEX_URL = os.getenv("PLEX_URL", "http://localhost:32400") PLEX_TOKEN = os.getenv("PLEX_TOKEN", "") PLEX_CLIENT_ID = os.getenv("PLEX_CLIENT_ID", "plex-mcp-server") PORT = int(os.getenv("PORT", "8000")) def validate_config() -> None: """Validate required configuration on startup.""" errors = [] if not PLEX_TOKEN: errors.append("PLEX_TOKEN environment variable is required") if not PLEX_URL: errors.append("PLEX_URL environment variable is required") if errors: for error in errors: logger.error(f"Configuration error: {error}") raise ValueError(f"Configuration validation failed: {'; '.join(errors)}") # Paths SCRIPT_DIR = Path(__file__).parent OPENAPI_PATH = SCRIPT_DIR / "openapi.json" API_REFERENCE_PATH = SCRIPT_DIR / "docs" / "api_reference.md" class PlexClient: """HTTP client for Plex Media Server API.""" def __init__(self, base_url: str, token: str, client_id: str): self.base_url = base_url.rstrip("/") self.token = token self.client_id = client_id self._client: Optional[httpx.AsyncClient] = None logger.debug(f"PlexClient initialized for {self.base_url}") @property def headers(self) -> dict: return { "X-Plex-Token": self.token, "X-Plex-Client-Identifier": self.client_id, "Accept": "application/json", } async def get_client(self) -> httpx.AsyncClient: if self._client is None or self._client.is_closed: logger.debug("Creating new HTTP client") self._client = httpx.AsyncClient( base_url=self.base_url, headers=self.headers, timeout=30.0, ) return self._client async def close(self): if self._client and not self._client.is_closed: logger.debug("Closing HTTP client") await self._client.aclose() async def request( self, method: str, endpoint: str, params: Optional[dict] = None, body: Optional[dict] = None, ) -> dict: """Execute an API request to Plex.""" client = await self.get_client() logger.info(f"Plex API request: {method.upper()} {endpoint}") if params: logger.debug(f"Request params: {params}") try: response = await client.request( method=method.upper(), url=endpoint, params=params, json=body if body else None, ) response.raise_for_status() logger.debug(f"Response status: {response.status_code}") # Handle empty responses if not response.content: return {"status": "ok", "statusCode": response.status_code} return response.json() except httpx.HTTPStatusError as e: logger.warning( f"Plex API HTTP error: {e.response.status_code} for {method.upper()} {endpoint}" ) return { "error": True, "statusCode": e.response.status_code, "message": str(e), } except Exception as e: logger.error(f"Plex API error: {e}", exc_info=True) return {"error": True, "message": str(e)} async def health_check(self) -> bool: """Check if connection to Plex is working.""" try: result = await self.request("GET", "/identity") is_healthy = "error" not in result logger.debug(f"Health check result: {is_healthy}") return is_healthy except Exception as e: logger.warning(f"Health check failed: {e}") return False # Initialize client and MCP server plex_client = PlexClient(PLEX_URL, PLEX_TOKEN, PLEX_CLIENT_ID) mcp = FastMCP( name="Plex MCP Server", instructions="MCP server for interacting with Plex Media Server", ) # ============================================================================= # Specific Tools (5 high-value operations) # ============================================================================= @mcp.tool() async def get_libraries() -> str: """List all Plex library sections (Movies, TV Shows, Music, etc.). Returns a list of all libraries with their IDs, names, types, and item counts. Use the section ID from this response with other tools like search_library or refresh_library. """ result = await plex_client.request("GET", "/library/sections/all") return json.dumps(result, indent=2) @mcp.tool() async def search_library( query: str, limit: int = 10, section_id: Optional[int] = None ) -> str: """Search for media across all libraries or within a specific library. Args: query: Search term (title, artist, etc.) limit: Maximum results per media type (default: 10) section_id: Optional library section ID to restrict search Returns search results grouped by media type (movies, shows, episodes, etc.) """ params = {"query": query, "limit": limit} if section_id is not None: params["sectionId"] = section_id result = await plex_client.request("GET", "/hubs/search", params=params) return json.dumps(result, indent=2) @mcp.tool() async def get_metadata(rating_key: str, include_children: bool = False) -> str: """Get detailed metadata for a specific media item. Args: rating_key: The unique identifier (ratingKey) of the media item include_children: If True, include child items (seasons for shows, episodes for seasons) Returns detailed metadata including title, summary, ratings, cast, and more. """ endpoint = f"/library/metadata/{rating_key}" params = {} if include_children: params["includeChildren"] = 1 result = await plex_client.request("GET", endpoint, params=params) return json.dumps(result, indent=2) @mcp.tool() async def get_recently_added(section_id: Optional[int] = None, limit: int = 20) -> str: """Get recently added media items. Args: section_id: Optional library section ID to filter results limit: Maximum number of items to return (default: 20) Returns recently added items across all libraries or for a specific library. """ if section_id is not None: # Get recently added for specific library via hubs endpoint = f"/hubs/sections/{section_id}" else: # Get global recently added endpoint = "/hubs" params = {"X-Plex-Container-Size": limit} result = await plex_client.request("GET", endpoint, params=params) return json.dumps(result, indent=2) @mcp.tool() async def refresh_library( section_id: int, force: bool = False, path: Optional[str] = None, ) -> str: """Trigger a library scan to detect new or changed media files. Args: section_id: The library section ID to refresh (get from get_libraries) force: If True, force metadata refresh even if files appear unchanged path: Optional path to restrict the scan to a specific directory Returns confirmation of the refresh request. """ endpoint = f"/library/sections/{section_id}/refresh" params = {} if force: params["force"] = 1 if path: params["path"] = path result = await plex_client.request("POST", endpoint, params=params) return json.dumps(result, indent=2) # ============================================================================= # API Pass-through Tool # ============================================================================= @mcp.tool() async def plex_api_call( endpoint: str, method: str = "GET", params: str = "{}", body: str = "{}", ) -> str: """Execute any Plex API call directly. This is the escape hatch for accessing any Plex API endpoint not covered by the specific tools. Refer to the 'plex://api-reference' resource or use search_api_docs() to find available endpoints. Args: endpoint: API path (e.g., '/playlists', '/:scrobble', '/library/metadata/123') method: HTTP method (GET, POST, PUT, DELETE) params: JSON string of query parameters (e.g., '{"query": "test", "limit": 10}') body: JSON string of request body for POST/PUT requests Returns the API response as JSON. Examples: - Mark item watched: plex_api_call('/:scrobble', params='{"key": "12345", "identifier": "com.plexapp.plugins.library"}') - Get playlists: plex_api_call('/playlists') - Rate item: plex_api_call('/:rate', 'PUT', params='{"key": "12345", "identifier": "com.plexapp.plugins.library", "rating": 8}') """ try: parsed_params = json.loads(params) if params and params != "{}" else None parsed_body = json.loads(body) if body and body != "{}" else None except json.JSONDecodeError as e: return json.dumps({"error": True, "message": f"Invalid JSON: {e}"}) result = await plex_client.request( method, endpoint, params=parsed_params, body=parsed_body ) return json.dumps(result, indent=2) # ============================================================================= # Documentation Tools & Resources # ============================================================================= @mcp.tool() async def search_api_docs(query: str, limit: int = 20) -> str: """Search the Plex OpenAPI specification for endpoints matching a query. Args: query: Search term to find in endpoint paths, summaries, or descriptions limit: Maximum number of results to return (default: 20) Returns matching endpoints with their methods, summaries, and parameters. """ if not OPENAPI_PATH.exists(): return json.dumps({"error": True, "message": "OpenAPI spec not found"}) try: with open(OPENAPI_PATH) as f: spec = json.load(f) except Exception as e: return json.dumps( {"error": True, "message": f"Failed to load OpenAPI spec: {e}"} ) query_lower = query.lower() results = [] for path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method.startswith("x-"): # Skip OpenAPI extensions continue # Search in path, summary, description, and tags searchable = " ".join( [ path, details.get("summary", ""), details.get("description", ""), " ".join(details.get("tags", [])), ] ).lower() if query_lower in searchable: # Extract parameter info params = [] for param in details.get("parameters", []): params.append( { "name": param.get("name"), "in": param.get("in"), "required": param.get("required", False), "description": param.get("description", ""), } ) results.append( { "path": path, "method": method.upper(), "summary": details.get("summary", ""), "description": details.get("description", ""), "tags": details.get("tags", []), "parameters": params, } ) if len(results) >= limit: break if len(results) >= limit: break return json.dumps( { "query": query, "count": len(results), "results": results, }, indent=2, ) @mcp.resource("plex://api-reference") async def get_api_reference() -> str: """Get the curated Plex API quick reference documentation. This resource provides a human-readable guide to the most commonly used Plex API endpoints, organized by category. Use this to understand how to use the plex_api_call tool for operations not covered by specific tools. """ if not API_REFERENCE_PATH.exists(): return "API reference documentation not found." return API_REFERENCE_PATH.read_text() # ============================================================================= # Health Check & Application Setup # ============================================================================= async def health_check(request): """Health check endpoint for Docker/orchestration.""" is_healthy = await plex_client.health_check() status = "ok" if is_healthy else "degraded" logger.debug(f"Health check endpoint called: status={status}") return JSONResponse( {"status": status, "plex_connected": is_healthy}, status_code=200 if is_healthy else 503, ) def create_app() -> Starlette: """Create the Starlette application wrapping the MCP server.""" mcp_app = mcp.http_app() # Combine our custom lifespan with MCP's lifespan @asynccontextmanager async def combined_lifespan(app): # Our startup logic logger.info("Plex MCP Server starting up") # Nest MCP's lifespan inside ours async with mcp_app.lifespan(app): yield # Our shutdown logic logger.info("Plex MCP Server shutting down") await plex_client.close() return Starlette( routes=[ Route("/health", health_check), Mount("/", app=mcp_app), ], lifespan=combined_lifespan, ) app = create_app() if __name__ == "__main__": import uvicorn # Validate configuration before starting validate_config() logger.info(f"Starting Plex MCP Server on port {PORT}") logger.info(f"Plex URL: {PLEX_URL}") logger.info(f"Health check: http://localhost:{PORT}/health") uvicorn.run(app, host="0.0.0.0", port=PORT)