"""
Plex MCP Server - A lightweight MCP server for Plex Media Server API.

Follows the Hybrid MCP Light pattern:
- 5 specific tools for common operations
- 1 API pass-through for full API coverage
- Documentation resources for AI agent reference
"""

import json
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional

import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route

load_dotenv()

# Configuration (read from the environment, optionally populated from .env)
PLEX_URL = os.getenv("PLEX_URL", "http://localhost:32400")
PLEX_TOKEN = os.getenv("PLEX_TOKEN", "")
PLEX_CLIENT_ID = os.getenv("PLEX_CLIENT_ID", "plex-mcp-server")
PORT = int(os.getenv("PORT", "8000"))

# Paths to the documentation assets shipped next to this script
SCRIPT_DIR = Path(__file__).parent
OPENAPI_PATH = SCRIPT_DIR / "openapi.json"
API_REFERENCE_PATH = SCRIPT_DIR / "docs" / "api_reference.md"


class PlexClient:
    """HTTP client for Plex Media Server API.

    Wraps a lazily created ``httpx.AsyncClient`` that is reused across
    requests and recreated if it has been closed.
    """

    def __init__(self, base_url: str, token: str, client_id: str):
        """Store connection settings; no network activity happens here.

        Args:
            base_url: Base URL of the Plex server; trailing slashes are removed.
            token: Value sent as the ``X-Plex-Token`` header.
            client_id: Value sent as the ``X-Plex-Client-Identifier`` header.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.client_id = client_id
        self._client: Optional[httpx.AsyncClient] = None

    @property
    def headers(self) -> dict:
        """Default headers attached to every request (auth, client id, JSON)."""
        return {
            "X-Plex-Token": self.token,
            "X-Plex-Client-Identifier": self.client_id,
            "Accept": "application/json",
        }

    async def get_client(self) -> httpx.AsyncClient:
        """Return the shared async client, creating it on first use or after close."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=30.0,
            )
        return self._client

    async def close(self):
        """Close the underlying HTTP client if one is currently open."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()

    async def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        body: Optional[dict] = None,
    ) -> dict:
        """Execute an API request to Plex.

        Args:
            method: HTTP method name; upper-cased before sending.
            endpoint: Request path, resolved against ``base_url``.
            params: Optional query parameters.
            body: Optional JSON request body; an empty/falsy body is not sent.

        Returns:
            The decoded JSON response. For a response with no content, a
            ``{"status": "ok", "statusCode": ...}`` dict. On any failure, a
            dict with ``"error": True`` and a ``"message"`` (plus
            ``"statusCode"`` for HTTP error statuses). This method does not
            raise; callers inspect the ``"error"`` key instead.
        """
        client = await self.get_client()
        try:
            response = await client.request(
                method=method.upper(),
                url=endpoint,
                params=params,
                json=body or None,
            )
            response.raise_for_status()
            # Handle empty responses
            if not response.content:
                return {"status": "ok", "statusCode": response.status_code}
            return response.json()
        except httpx.HTTPStatusError as e:
            return {
                "error": True,
                "statusCode": e.response.status_code,
                "message": str(e),
            }
        except Exception as e:
            # Deliberate catch-all at the tool boundary: connection errors,
            # timeouts and non-JSON bodies are reported as an error payload.
            return {"error": True, "message": str(e)}

    async def health_check(self) -> bool:
        """Check if connection to Plex is working.

        Returns:
            True when ``GET /identity`` yields a result without an ``"error"``
            key, False otherwise.
        """
        try:
            result = await self.request("GET", "/identity")
            return "error" not in result
        except Exception:
            return False


# Initialize client and MCP server
plex_client = PlexClient(PLEX_URL, PLEX_TOKEN, PLEX_CLIENT_ID)

mcp = FastMCP(
    "Plex MCP Server",
    description="MCP server for interacting with Plex Media Server",
)


# =============================================================================
# Specific Tools (5 high-value operations)
# =============================================================================


@mcp.tool()
async def get_libraries() -> str:
    """List all Plex library sections (Movies, TV Shows, Music, etc.).

    Returns a list of all libraries with their IDs, names, types, and item counts.
    Use the section ID from this response with other tools like search_library
    or refresh_library.
    """
    result = await plex_client.request("GET", "/library/sections/all")
    return json.dumps(result, indent=2)


@mcp.tool()
async def search_library(
    query: str, limit: int = 10, section_id: Optional[int] = None
) -> str:
    """Search for media across all libraries or within a specific library.

    Args:
        query: Search term (title, artist, etc.)
        limit: Maximum results per media type (default: 10)
        section_id: Optional library section ID to restrict search

    Returns search results grouped by media type (movies, shows, episodes, etc.)
    """
    params = {"query": query, "limit": limit}
    if section_id is not None:
        params["sectionId"] = section_id

    result = await plex_client.request("GET", "/hubs/search", params=params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def get_metadata(rating_key: str, include_children: bool = False) -> str:
    """Get detailed metadata for a specific media item.

    Args:
        rating_key: The unique identifier (ratingKey) of the media item
        include_children: If True, include child items (seasons for shows,
            episodes for seasons)

    Returns detailed metadata including title, summary, ratings, cast, and more.
    """
    endpoint = f"/library/metadata/{rating_key}"
    params = {}
    if include_children:
        params["includeChildren"] = 1

    result = await plex_client.request("GET", endpoint, params=params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def get_recently_added(section_id: Optional[int] = None, limit: int = 20) -> str:
    """Get recently added media items.

    Args:
        section_id: Optional library section ID to filter results
        limit: Maximum number of items to return (default: 20)

    Returns recently added items across all libraries or for a specific library.
    """
    if section_id is not None:
        # Get recently added for specific library via hubs
        endpoint = f"/hubs/sections/{section_id}"
    else:
        # Get global recently added
        endpoint = "/hubs"

    params = {"X-Plex-Container-Size": limit}
    result = await plex_client.request("GET", endpoint, params=params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def refresh_library(
    section_id: int,
    force: bool = False,
    path: Optional[str] = None,
) -> str:
    """Trigger a library scan to detect new or changed media files.

    Args:
        section_id: The library section ID to refresh (get from get_libraries)
        force: If True, force metadata refresh even if files appear unchanged
        path: Optional path to restrict the scan to a specific directory

    Returns confirmation of the refresh request.
    """
    endpoint = f"/library/sections/{section_id}/refresh"
    params = {}
    if force:
        params["force"] = 1
    if path:
        params["path"] = path

    result = await plex_client.request("POST", endpoint, params=params)
    return json.dumps(result, indent=2)


# =============================================================================
# API Pass-through Tool
# =============================================================================


@mcp.tool()
async def plex_api_call(
    endpoint: str,
    method: str = "GET",
    params: str = "{}",
    body: str = "{}",
) -> str:
    """Execute any Plex API call directly.

    This is the escape hatch for accessing any Plex API endpoint not covered
    by the specific tools. Refer to the 'plex://api-reference' resource or
    use search_api_docs() to find available endpoints.

    Args:
        endpoint: API path (e.g., '/playlists', '/:scrobble', '/library/metadata/123')
        method: HTTP method (GET, POST, PUT, DELETE)
        params: JSON string of query parameters (e.g., '{"query": "test", "limit": 10}')
        body: JSON string of request body for POST/PUT requests

    Returns the API response as JSON.

    Examples:
        - Mark item watched: plex_api_call('/:scrobble', params='{"key": "12345", "identifier": "com.plexapp.plugins.library"}')
        - Get playlists: plex_api_call('/playlists')
        - Rate item: plex_api_call('/:rate', 'PUT', params='{"key": "12345", "identifier": "com.plexapp.plugins.library", "rating": 8}')
    """
    try:
        # Empty strings and the "{}" default both mean "nothing to send".
        parsed_params = json.loads(params) if params and params != "{}" else None
        parsed_body = json.loads(body) if body and body != "{}" else None
    except json.JSONDecodeError as e:
        return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})

    result = await plex_client.request(
        method, endpoint, params=parsed_params, body=parsed_body
    )
    return json.dumps(result, indent=2)


# =============================================================================
# Documentation Tools & Resources
# =============================================================================


def _describe_parameters(operation: dict) -> list:
    """Return name/in/required/description summaries for an operation's parameters."""
    return [
        {
            "name": param.get("name"),
            "in": param.get("in"),
            "required": param.get("required", False),
            "description": param.get("description", ""),
        }
        for param in operation.get("parameters") or []
        if isinstance(param, dict)
    ]


def _find_matching_endpoints(spec: dict, query: str, limit: int) -> list:
    """Collect up to ``limit`` operations whose text contains ``query`` (case-insensitive)."""
    if limit <= 0:
        return []

    needle = query.lower()
    paths = spec.get("paths") if isinstance(spec, dict) else None
    matches = []
    for path, path_item in (paths or {}).items():
        if not isinstance(path_item, dict):
            continue
        for method, details in path_item.items():
            # Skip OpenAPI extensions, and path-level entries that are not
            # operations (e.g. a shared "parameters" list or a "summary"
            # string), which have no .get() and would otherwise crash.
            if method.startswith("x-") or not isinstance(details, dict):
                continue

            # "or" guards against keys that are present but explicitly null.
            summary = details.get("summary") or ""
            description = details.get("description") or ""
            tags = details.get("tags") or []

            # Search in path, summary, description, and tags
            searchable = " ".join(
                [path, summary, description, " ".join(map(str, tags))]
            ).lower()
            if needle not in searchable:
                continue

            matches.append(
                {
                    "path": path,
                    "method": method.upper(),
                    "summary": summary,
                    "description": description,
                    "tags": tags,
                    "parameters": _describe_parameters(details),
                }
            )
            if len(matches) >= limit:
                return matches
    return matches


@mcp.tool()
async def search_api_docs(query: str, limit: int = 20) -> str:
    """Search the Plex OpenAPI specification for endpoints matching a query.

    Args:
        query: Search term to find in endpoint paths, summaries, or descriptions
        limit: Maximum number of results to return (default: 20)

    Returns matching endpoints with their methods, summaries, and parameters.
    """
    if not OPENAPI_PATH.exists():
        return json.dumps({"error": True, "message": "OpenAPI spec not found"})

    try:
        # Explicit encoding so the spec loads the same on every platform.
        with open(OPENAPI_PATH, encoding="utf-8") as f:
            spec = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        return json.dumps(
            {"error": True, "message": f"Failed to load OpenAPI spec: {e}"}
        )

    results = _find_matching_endpoints(spec, query, limit)
    return json.dumps(
        {
            "query": query,
            "count": len(results),
            "results": results,
        },
        indent=2,
    )


@mcp.resource("plex://api-reference")
async def get_api_reference() -> str:
    """Get the curated Plex API quick reference documentation.

    This resource provides a human-readable guide to the most commonly used
    Plex API endpoints, organized by category. Use this to understand how to
    use the plex_api_call tool for operations not covered by specific tools.
    """
    if not API_REFERENCE_PATH.exists():
        return "API reference documentation not found."
    # Explicit encoding: the default is platform-dependent.
    return API_REFERENCE_PATH.read_text(encoding="utf-8")


# =============================================================================
# Health Check & Application Setup
# =============================================================================


async def health_check(request):
    """Health check endpoint for Docker/orchestration.

    Responds 200 with status "ok" when Plex is reachable, otherwise 503 with
    status "degraded".
    """
    is_healthy = await plex_client.health_check()
    status = "ok" if is_healthy else "degraded"
    return JSONResponse(
        {"status": status, "plex_connected": is_healthy},
        status_code=200 if is_healthy else 503,
    )


def create_app() -> Starlette:
    """Create the Starlette application wrapping the MCP server.

    Exposes ``/health`` and mounts the FastMCP HTTP app at ``/``.
    """
    mcp_app = mcp.http_app()

    @asynccontextmanager
    async def lifespan(app):
        # Startup: a mounted sub-application's lifespan is not run by the
        # parent, so enter the FastMCP app's lifespan here; per the FastMCP
        # ASGI integration docs this is what starts its session manager.
        async with mcp_app.lifespan(app):
            try:
                yield
            finally:
                # Shutdown: always release the shared HTTP client.
                await plex_client.close()

    return Starlette(
        routes=[
            Route("/health", health_check),
            Mount("/", app=mcp_app),
        ],
        lifespan=lifespan,
    )


app = create_app()


if __name__ == "__main__":
    import uvicorn

    print(f"Starting Plex MCP Server on port {PORT}")
    print(f"Plex URL: {PLEX_URL}")
    print(f"Health check: http://localhost:{PORT}/health")
    uvicorn.run(app, host="0.0.0.0", port=PORT)