All checks were successful
Build and Push Plex MCP Docker Image / build (push) Successful in 8s
FastMCP requires its lifespan context to be passed to the parent ASGI app to initialize the StreamableHTTPSessionManager task group. This fix nests mcp_app.lifespan() inside our custom lifespan to ensure proper initialization while maintaining our custom startup/shutdown logic. Resolves: RuntimeError: Task group is not initialized
456 lines
15 KiB
Python
456 lines
15 KiB
Python
"""
|
|
Plex MCP Server - A lightweight MCP server for Plex Media Server API.
|
|
|
|
Follows the Hybrid MCP Light pattern:
|
|
- 5 specific tools for common operations
|
|
- 1 API pass-through for full API coverage
|
|
- Documentation resources for AI agent reference
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
from fastmcp import FastMCP
|
|
from starlette.applications import Starlette
|
|
from starlette.responses import JSONResponse
|
|
from starlette.routing import Mount, Route
|
|
|
|
# Load variables from a .env file (when one is found) before reading them.
load_dotenv()

# Configure logging
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    # Names that are not attributes of the logging module fall back to INFO.
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
logger = logging.getLogger("plex-mcp")

# Configuration
PLEX_URL = os.environ.get("PLEX_URL", "http://localhost:32400")
PLEX_TOKEN = os.environ.get("PLEX_TOKEN", "")
PLEX_CLIENT_ID = os.environ.get("PLEX_CLIENT_ID", "plex-mcp-server")
PORT = int(os.environ.get("PORT", "8000"))
|
|
|
|
|
|
def validate_config() -> None:
    """Check that the mandatory Plex settings are present.

    Every missing setting is logged as an error, then a single
    ``ValueError`` listing all of them is raised.

    Raises:
        ValueError: If ``PLEX_TOKEN`` or ``PLEX_URL`` is empty.
    """
    required_settings = (
        (PLEX_TOKEN, "PLEX_TOKEN environment variable is required"),
        (PLEX_URL, "PLEX_URL environment variable is required"),
    )
    problems = [message for value, message in required_settings if not value]
    if not problems:
        return

    for problem in problems:
        logger.error(f"Configuration error: {problem}")
    raise ValueError(f"Configuration validation failed: {'; '.join(problems)}")
|
|
|
|
|
|
# Paths (all resolved relative to the directory containing this file)
SCRIPT_DIR = Path(__file__).parent
OPENAPI_PATH = SCRIPT_DIR.joinpath("openapi.json")
API_REFERENCE_PATH = SCRIPT_DIR.joinpath("docs", "api_reference.md")
|
|
|
|
|
|
class PlexClient:
    """Asynchronous HTTP wrapper around the Plex Media Server API.

    One ``httpx.AsyncClient`` is created lazily on first use and reused
    until it has been closed.
    """

    def __init__(self, base_url: str, token: str, client_id: str):
        """Store the connection settings; no HTTP client is created yet.

        Args:
            base_url: Root URL of the Plex server; trailing slashes are removed.
            token: Value sent in the ``X-Plex-Token`` header.
            client_id: Value sent in the ``X-Plex-Client-Identifier`` header.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.client_id = client_id
        self._client: Optional[httpx.AsyncClient] = None
        logger.debug(f"PlexClient initialized for {self.base_url}")

    @property
    def headers(self) -> dict:
        """Headers installed on the HTTP client when it is created."""
        default_headers = {
            "X-Plex-Token": self.token,
            "X-Plex-Client-Identifier": self.client_id,
            "Accept": "application/json",
        }
        return default_headers

    async def get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating one if it is missing or closed."""
        existing = self._client
        if existing is not None and not existing.is_closed:
            return existing

        logger.debug("Creating new HTTP client")
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self.headers,
            timeout=30.0,
        )
        return self._client

    async def close(self):
        """Close the HTTP client if one is currently open."""
        if not self._client or self._client.is_closed:
            return
        logger.debug("Closing HTTP client")
        await self._client.aclose()

    async def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        body: Optional[dict] = None,
    ) -> dict:
        """Send one request to Plex and return the decoded JSON response.

        Failures while sending or decoding are not raised; they are logged
        and reported as a dict with ``"error": True``.

        Args:
            method: HTTP method name; upper-cased before sending.
            endpoint: URL passed to the HTTP client.
            params: Optional query parameters.
            body: Optional JSON body; a falsy value sends no body.

        Returns:
            The parsed JSON response, ``{"status": "ok", "statusCode": ...}``
            for an empty response, or an error dict.
        """
        client = await self.get_client()
        verb = method.upper()
        logger.info(f"Plex API request: {verb} {endpoint}")
        if params:
            logger.debug(f"Request params: {params}")

        try:
            response = await client.request(
                method=verb,
                url=endpoint,
                params=params,
                json=body or None,
            )
            response.raise_for_status()
            logger.debug(f"Response status: {response.status_code}")

            # An empty payload cannot be JSON-decoded, so report success explicitly.
            if response.content:
                return response.json()
            return {"status": "ok", "statusCode": response.status_code}
        except httpx.HTTPStatusError as exc:
            status_code = exc.response.status_code
            logger.warning(
                f"Plex API HTTP error: {status_code} for {verb} {endpoint}"
            )
            return {
                "error": True,
                "statusCode": status_code,
                "message": str(exc),
            }
        except Exception as exc:
            logger.error(f"Plex API error: {exc}", exc_info=True)
            return {"error": True, "message": str(exc)}

    async def health_check(self) -> bool:
        """Return True when a GET of ``/identity`` yields no ``"error"`` key."""
        try:
            identity = await self.request("GET", "/identity")
            reachable = "error" not in identity
            logger.debug(f"Health check result: {reachable}")
            return reachable
        except Exception as exc:
            logger.warning(f"Health check failed: {exc}")
            return False
|
|
|
|
|
|
# Initialize client and MCP server
plex_client = PlexClient(
    base_url=PLEX_URL,
    token=PLEX_TOKEN,
    client_id=PLEX_CLIENT_ID,
)
mcp = FastMCP(
    instructions="MCP server for interacting with Plex Media Server",
    name="Plex MCP Server",
)
|
|
|
|
|
|
# =============================================================================
|
|
# Specific Tools (5 high-value operations)
|
|
# =============================================================================
|
|
|
|
|
|
@mcp.tool()
async def get_libraries() -> str:
    """List all Plex library sections (Movies, TV Shows, Music, etc.).

    Returns a list of all libraries with their IDs, names, types, and item counts.
    Use the section ID from this response with other tools like search_library or refresh_library.
    """
    # Fetch every section and return the payload as pretty-printed JSON.
    sections = await plex_client.request("GET", "/library/sections/all")
    return json.dumps(sections, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def search_library(
    query: str, limit: int = 10, section_id: Optional[int] = None
) -> str:
    """Search for media across all libraries or within a specific library.

    Args:
        query: Search term (title, artist, etc.)
        limit: Maximum results per media type (default: 10)
        section_id: Optional library section ID to restrict search

    Returns search results grouped by media type (movies, shows, episodes, etc.)
    """
    # sectionId is only sent when the caller restricted the search to one library.
    section_filter = {} if section_id is None else {"sectionId": section_id}
    search_params = {"query": query, "limit": limit, **section_filter}

    hits = await plex_client.request("GET", "/hubs/search", params=search_params)
    return json.dumps(hits, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def get_metadata(rating_key: str, include_children: bool = False) -> str:
    """Get detailed metadata for a specific media item.

    Args:
        rating_key: The unique identifier (ratingKey) of the media item
        include_children: If True, include child items (seasons for shows, episodes for seasons)

    Returns detailed metadata including title, summary, ratings, cast, and more.
    """
    # The flag is sent as includeChildren=1 and omitted entirely otherwise.
    query_params = {"includeChildren": 1} if include_children else {}

    item = await plex_client.request(
        "GET", f"/library/metadata/{rating_key}", params=query_params
    )
    return json.dumps(item, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def get_recently_added(section_id: Optional[int] = None, limit: int = 20) -> str:
    """Get recently added media items.

    Args:
        section_id: Optional library section ID to filter results
        limit: Maximum number of items to return (default: 20)

    Returns recently added items across all libraries or for a specific library.
    """
    # A section ID selects that library's hubs; without one the global hubs are used.
    # NOTE(review): these hub endpoints may return more than the "recently added"
    # hub - confirm against the Plex API before relying on the payload shape.
    endpoint = "/hubs" if section_id is None else f"/hubs/sections/{section_id}"

    recent = await plex_client.request(
        "GET", endpoint, params={"X-Plex-Container-Size": limit}
    )
    return json.dumps(recent, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def refresh_library(
    section_id: int,
    force: bool = False,
    path: Optional[str] = None,
) -> str:
    """Trigger a library scan to detect new or changed media files.

    Args:
        section_id: The library section ID to refresh (get from get_libraries)
        force: If True, force metadata refresh even if files appear unchanged
        path: Optional path to restrict the scan to a specific directory

    Returns confirmation of the refresh request.
    """
    # Only options the caller actually set are forwarded as query parameters.
    scan_options = {}
    if force:
        scan_options.update(force=1)
    if path:
        scan_options.update(path=path)

    outcome = await plex_client.request(
        "POST", f"/library/sections/{section_id}/refresh", params=scan_options
    )
    return json.dumps(outcome, indent=2)
|
|
|
|
|
|
# =============================================================================
|
|
# API Pass-through Tool
|
|
# =============================================================================
|
|
|
|
|
|
@mcp.tool()
async def plex_api_call(
    endpoint: str,
    method: str = "GET",
    params: str = "{}",
    body: str = "{}",
) -> str:
    """Execute any Plex API call directly.

    This is the escape hatch for accessing any Plex API endpoint not covered
    by the specific tools. Refer to the 'plex://api-reference' resource or
    use search_api_docs() to find available endpoints.

    Args:
        endpoint: API path (e.g., '/playlists', '/:scrobble', '/library/metadata/123')
        method: HTTP method (GET, POST, PUT, DELETE)
        params: JSON string of query parameters (e.g., '{"query": "test", "limit": 10}')
        body: JSON string of request body for POST/PUT requests

    Returns the API response as JSON.

    Examples:
        - Mark item watched: plex_api_call('/:scrobble', params='{"key": "12345", "identifier": "com.plexapp.plugins.library"}')
        - Get playlists: plex_api_call('/playlists')
        - Rate item: plex_api_call('/:rate', 'PUT', params='{"key": "12345", "identifier": "com.plexapp.plugins.library", "rating": 8}')
    """
    # Function-scope import so this block does not depend on a file-level change.
    from urllib.parse import urlsplit

    # Security: the shared HTTP client sends X-Plex-Token on every request, and
    # an absolute ("https://host/...") or scheme-relative ("//host/...") URL
    # would override its base_url. Only server-relative paths may be forwarded,
    # otherwise the token could be delivered to an arbitrary host.
    requested = endpoint.strip()
    try:
        target = urlsplit(requested)
        is_relative = not (target.scheme or target.netloc)
    except ValueError:
        # urlsplit rejects malformed authorities (e.g. a broken IPv6 literal).
        is_relative = False
    if not is_relative:
        return json.dumps(
            {
                "error": True,
                "message": (
                    "Invalid endpoint: must be a path relative to the Plex "
                    "server (e.g. '/playlists'), not an absolute URL"
                ),
            }
        )

    # Exactly one leading slash keeps the value unambiguously a path; the
    # client resolves "playlists" and "/playlists" to the same URL anyway.
    safe_endpoint = "/" + requested.lstrip("/")

    try:
        # "{}" and empty strings both mean "nothing to send".
        parsed_params = json.loads(params) if params and params != "{}" else None
        parsed_body = json.loads(body) if body and body != "{}" else None
    except json.JSONDecodeError as e:
        return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})

    result = await plex_client.request(
        method, safe_endpoint, params=parsed_params, body=parsed_body
    )
    return json.dumps(result, indent=2)
|
|
|
|
|
|
# =============================================================================
|
|
# Documentation Tools & Resources
|
|
# =============================================================================
|
|
|
|
|
|
# HTTP verbs that can appear as operation keys inside an OpenAPI path item.
_OPENAPI_HTTP_METHODS = frozenset(
    {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
)


def _find_matching_operations(spec: dict, query: str, limit: int) -> list:
    """Return up to *limit* operations in *spec* whose text contains *query*.

    The case-insensitive match runs over each operation's path, summary,
    description and tags. A *limit* below 1 yields an empty list.
    """
    needle = query.lower()
    matches: list = []

    paths = spec.get("paths") or {}
    if not isinstance(paths, dict):
        return matches

    for path, path_item in paths.items():
        if not isinstance(path_item, dict):
            continue
        for method, details in path_item.items():
            if len(matches) >= limit:
                return matches

            # Path items also carry non-operation keys ("parameters", "summary",
            # "servers", "$ref", "x-*" extensions) whose values need not be
            # dicts; only real HTTP operations are searchable.
            if method.lower() not in _OPENAPI_HTTP_METHODS:
                continue
            if not isinstance(details, dict):
                continue

            # "or" also covers fields present with an explicit null value.
            summary = details.get("summary") or ""
            description = details.get("description") or ""
            tags = details.get("tags") or []

            searchable = " ".join(
                [str(path), str(summary), str(description), " ".join(map(str, tags))]
            ).lower()
            if needle not in searchable:
                continue

            parameters = [
                {
                    "name": param.get("name"),
                    "in": param.get("in"),
                    "required": param.get("required", False),
                    "description": param.get("description", ""),
                }
                for param in details.get("parameters") or []
                if isinstance(param, dict)
            ]
            matches.append(
                {
                    "path": path,
                    "method": method.upper(),
                    "summary": summary,
                    "description": description,
                    "tags": tags,
                    "parameters": parameters,
                }
            )

    return matches


@mcp.tool()
async def search_api_docs(query: str, limit: int = 20) -> str:
    """Search the Plex OpenAPI specification for endpoints matching a query.

    Args:
        query: Search term to find in endpoint paths, summaries, or descriptions
        limit: Maximum number of results to return (default: 20)

    Returns matching endpoints with their methods, summaries, and parameters.
    """
    if not OPENAPI_PATH.exists():
        return json.dumps({"error": True, "message": "OpenAPI spec not found"})

    try:
        # JSON documents are UTF-8; do not depend on the process locale.
        with open(OPENAPI_PATH, encoding="utf-8") as f:
            spec = json.load(f)
    except Exception as e:
        return json.dumps(
            {"error": True, "message": f"Failed to load OpenAPI spec: {e}"}
        )

    if not isinstance(spec, dict):
        return json.dumps(
            {
                "error": True,
                "message": "Failed to load OpenAPI spec: top-level value is not an object",
            }
        )

    results = _find_matching_operations(spec, query, limit)
    return json.dumps(
        {
            "query": query,
            "count": len(results),
            "results": results,
        },
        indent=2,
    )
|
|
|
|
|
|
@mcp.resource("plex://api-reference")
async def get_api_reference() -> str:
    """Get the curated Plex API quick reference documentation.

    This resource provides a human-readable guide to the most commonly used
    Plex API endpoints, organized by category. Use this to understand how
    to use the plex_api_call tool for operations not covered by specific tools.
    """
    reference_file = API_REFERENCE_PATH
    if reference_file.exists():
        return reference_file.read_text()

    # A missing file is reported as plain text instead of raising.
    return "API reference documentation not found."
|
|
|
|
|
|
# =============================================================================
|
|
# Health Check & Application Setup
|
|
# =============================================================================
|
|
|
|
|
|
async def health_check(request):
    """Report Plex connectivity for Docker/orchestration health probes.

    Args:
        request: The incoming request; not used.

    Returns:
        A JSON response with HTTP 200 and status "ok" when the Plex health
        check succeeds, otherwise HTTP 503 and status "degraded".
    """
    plex_connected = await plex_client.health_check()
    if plex_connected:
        status, http_code = "ok", 200
    else:
        status, http_code = "degraded", 503

    logger.debug(f"Health check endpoint called: status={status}")
    payload = {"status": status, "plex_connected": plex_connected}
    return JSONResponse(payload, status_code=http_code)
|
|
|
|
|
|
def create_app() -> Starlette:
    """Create the Starlette application wrapping the MCP server.

    The returned app serves ``/health`` itself and mounts the MCP HTTP app
    at ``/``.

    Returns:
        The configured Starlette application.
    """
    mcp_app = mcp.http_app()

    # Combine our custom lifespan with MCP's lifespan. The MCP app's lifespan
    # has to run under the parent ASGI app so that its session manager task
    # group gets initialized (otherwise: "Task group is not initialized").
    @asynccontextmanager
    async def combined_lifespan(app):
        # Our startup logic
        logger.info("Plex MCP Server starting up")
        try:
            # Nest MCP's lifespan inside ours
            async with mcp_app.lifespan(app):
                yield
        finally:
            # Our shutdown logic. It lives in `finally` so the HTTP client is
            # closed even when an exception propagates through the lifespan.
            logger.info("Plex MCP Server shutting down")
            await plex_client.close()

    return Starlette(
        routes=[
            Route("/health", health_check),
            Mount("/", app=mcp_app),
        ],
        lifespan=combined_lifespan,
    )
|
|
|
|
|
|
# Module-level ASGI application, also used by the __main__ entry point below.
app = create_app()

if __name__ == "__main__":
    import uvicorn

    # Validate configuration before starting
    validate_config()

    startup_messages = (
        f"Starting Plex MCP Server on port {PORT}",
        f"Plex URL: {PLEX_URL}",
        f"Health check: http://localhost:{PORT}/health",
    )
    for startup_message in startup_messages:
        logger.info(startup_message)

    uvicorn.run(app, host="0.0.0.0", port=PORT)
|