Files
plex-mcp/server.py
Ben 8fc4f00d27
All checks were successful
Build and Push Plex MCP Docker Image / build (push) Successful in 8s
fix: properly initialize FastMCP lifespan in Starlette app
FastMCP requires its lifespan context to be passed to the parent ASGI app
to initialize the StreamableHTTPSessionManager task group. This fix nests
mcp_app.lifespan() inside our custom lifespan to ensure proper initialization
while maintaining our custom startup/shutdown logic.

Resolves: RuntimeError: Task group is not initialized
2025-12-28 21:00:51 +00:00

456 lines
15 KiB
Python

"""
Plex MCP Server - A lightweight MCP server for Plex Media Server API.
Follows the Hybrid MCP Light pattern:
- 5 specific tools for common operations
- 1 API pass-through for full API coverage
- Documentation search tool and API reference resource for AI agent reference
"""
import json
import logging
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
load_dotenv()
# Logging setup. The level name comes from LOG_LEVEL (case-insensitive); a
# name that is not an attribute of the logging module falls back to INFO.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
logger = logging.getLogger("plex-mcp")

# Runtime configuration, read once from the environment at import time.
PLEX_URL = os.environ.get("PLEX_URL", "http://localhost:32400")
PLEX_TOKEN = os.environ.get("PLEX_TOKEN", "")
PLEX_CLIENT_ID = os.environ.get("PLEX_CLIENT_ID", "plex-mcp-server")
PORT = int(os.environ.get("PORT", "8000"))
def validate_config() -> None:
    """Fail fast when a required setting is missing.

    Every problem found is logged at ERROR level before raising, so the
    operator sees all missing settings at once.

    Raises:
        ValueError: If PLEX_TOKEN or PLEX_URL is empty. The message lists
            every problem, joined with "; ".
    """
    required_settings = (
        (PLEX_TOKEN, "PLEX_TOKEN environment variable is required"),
        (PLEX_URL, "PLEX_URL environment variable is required"),
    )
    problems = [message for value, message in required_settings if not value]
    if not problems:
        return

    for problem in problems:
        logger.error(f"Configuration error: {problem}")
    raise ValueError(f"Configuration validation failed: {'; '.join(problems)}")
# Locations of the bundled documentation assets, relative to this file's
# directory.
SCRIPT_DIR = Path(__file__).parent
OPENAPI_PATH = SCRIPT_DIR.joinpath("openapi.json")
API_REFERENCE_PATH = SCRIPT_DIR.joinpath("docs", "api_reference.md")
class PlexClient:
    """HTTP client for Plex Media Server API.

    Wraps a lazily created ``httpx.AsyncClient`` that is reused across
    requests. ``request`` never raises for transport or HTTP failures; it
    reports them as a dict containing ``"error": True`` instead, so tool
    functions can serialise the result directly.
    """

    def __init__(
        self,
        base_url: str,
        token: str,
        client_id: str,
        timeout: float = 30.0,
    ):
        """Store connection settings; no network I/O happens here.

        Args:
            base_url: Root URL of the Plex server; a trailing "/" is stripped.
            token: Value sent in the ``X-Plex-Token`` header.
            client_id: Value sent in the ``X-Plex-Client-Identifier`` header.
            timeout: Timeout in seconds handed to the underlying HTTP client.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.client_id = client_id
        self.timeout = timeout
        # Created on first use by get_client(); recreated if it was closed.
        self._client: Optional[httpx.AsyncClient] = None
        logger.debug(f"PlexClient initialized for {self.base_url}")

    @property
    def headers(self) -> dict:
        """Default headers attached to the underlying HTTP client."""
        return {
            "X-Plex-Token": self.token,
            "X-Plex-Client-Identifier": self.client_id,
            "Accept": "application/json",
        }

    async def get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if absent or closed."""
        if self._client is None or self._client.is_closed:
            logger.debug("Creating new HTTP client")
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=self.timeout,
            )
        return self._client

    async def close(self):
        """Close the shared HTTP client if one is currently open."""
        if self._client and not self._client.is_closed:
            logger.debug("Closing HTTP client")
            await self._client.aclose()

    async def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        body: Optional[dict] = None,
    ) -> dict:
        """Execute an API request to Plex.

        Args:
            method: HTTP method name; upper-cased before sending.
            endpoint: Request path, resolved against ``base_url``.
            params: Optional query parameters.
            body: Optional JSON request body; an empty body is not sent.

        Returns:
            The decoded JSON response on success. For a successful response
            with no content: ``{"status": "ok", "statusCode": ...}``. For a
            successful response whose body is not valid JSON: the same
            payload plus ``contentType`` and the raw ``body`` text. On
            failure: a dict with ``"error": True`` and a ``message`` (plus
            ``statusCode`` for HTTP error statuses).
        """
        verb = method.upper()
        client = await self.get_client()
        logger.info(f"Plex API request: {verb} {endpoint}")
        if params:
            logger.debug(f"Request params: {params}")

        try:
            response = await client.request(
                method=verb,
                url=endpoint,
                params=params,
                json=body if body else None,
            )
            response.raise_for_status()
            logger.debug(f"Response status: {response.status_code}")

            # Handle empty responses
            if not response.content:
                return {"status": "ok", "statusCode": response.status_code}

            try:
                return response.json()
            except ValueError:
                # The request succeeded but the body is not JSON. Reporting
                # that as an error would misrepresent the outcome, so hand
                # back the raw text instead.
                logger.debug("Response body is not JSON; returning raw text")
                return {
                    "status": "ok",
                    "statusCode": response.status_code,
                    "contentType": response.headers.get("content-type", ""),
                    "body": response.text,
                }
        except httpx.HTTPStatusError as e:
            logger.warning(
                f"Plex API HTTP error: {e.response.status_code} for {verb} {endpoint}"
            )
            return {
                "error": True,
                "statusCode": e.response.status_code,
                "message": str(e),
            }
        except Exception as e:
            # Deliberate catch-all: callers expect an error dict, not a raise.
            logger.error(f"Plex API error: {e}", exc_info=True)
            return {"error": True, "message": str(e)}

    async def health_check(self) -> bool:
        """Check if connection to Plex is working.

        Issues ``GET /identity`` and returns True when the result carries no
        ``"error"`` key; any exception is logged and reported as False.
        """
        try:
            result = await self.request("GET", "/identity")
            is_healthy = "error" not in result
            logger.debug(f"Health check result: {is_healthy}")
            return is_healthy
        except Exception as e:
            logger.warning(f"Health check failed: {e}")
            return False
# Module-level singletons: the Plex HTTP client used by every tool, and the
# MCP server instance the tools and resources register themselves on.
plex_client = PlexClient(
    base_url=PLEX_URL,
    token=PLEX_TOKEN,
    client_id=PLEX_CLIENT_ID,
)
mcp = FastMCP(
    instructions="MCP server for interacting with Plex Media Server",
    name="Plex MCP Server",
)
# =============================================================================
# Specific Tools (5 high-value operations)
# =============================================================================
@mcp.tool()
async def get_libraries() -> str:
    """List all Plex library sections (Movies, TV Shows, Music, etc.).

    Returns a list of all libraries with their IDs, names, types, and item counts.
    Use the section ID from this response with other tools like search_library or refresh_library.
    """
    # The Plex response (or error dict) is returned as pretty-printed JSON.
    return json.dumps(
        await plex_client.request("GET", "/library/sections/all"),
        indent=2,
    )
@mcp.tool()
async def search_library(
    query: str, limit: int = 10, section_id: Optional[int] = None
) -> str:
    """Search for media across all libraries or within a specific library.

    Args:
        query: Search term (title, artist, etc.)
        limit: Maximum results per media type (default: 10)
        section_id: Optional library section ID to restrict search

    Returns search results grouped by media type (movies, shows, episodes, etc.)
    """
    search_params = {"query": query, "limit": limit}
    # Only scope the search to one library when the caller asked for it.
    if section_id is not None:
        search_params.update(sectionId=section_id)

    hits = await plex_client.request("GET", "/hubs/search", params=search_params)
    return json.dumps(hits, indent=2)
@mcp.tool()
async def get_metadata(rating_key: str, include_children: bool = False) -> str:
    """Get detailed metadata for a specific media item.

    Args:
        rating_key: The unique identifier (ratingKey) of the media item
        include_children: If True, include child items (seasons for shows, episodes for seasons)

    Returns detailed metadata including title, summary, ratings, cast, and more.
    """
    # The flag is sent only when requested; otherwise no query parameters.
    query_params = {"includeChildren": 1} if include_children else {}
    item = await plex_client.request(
        "GET",
        f"/library/metadata/{rating_key}",
        params=query_params,
    )
    return json.dumps(item, indent=2)
@mcp.tool()
async def get_recently_added(section_id: Optional[int] = None, limit: int = 20) -> str:
    """Get recently added media items.

    Args:
        section_id: Optional library section ID to filter results
        limit: Maximum number of items to return (default: 20)

    Returns recently added items across all libraries or for a specific library.
    """
    # Global hubs when no library is given, otherwise that library's hubs.
    # NOTE(review): these are hub endpoints; confirm the response is limited
    # to recently added items as the docstring states.
    hubs_endpoint = "/hubs" if section_id is None else f"/hubs/sections/{section_id}"

    hubs = await plex_client.request(
        "GET",
        hubs_endpoint,
        params={"X-Plex-Container-Size": limit},
    )
    return json.dumps(hubs, indent=2)
@mcp.tool()
async def refresh_library(
    section_id: int,
    force: bool = False,
    path: Optional[str] = None,
) -> str:
    """Trigger a library scan to detect new or changed media files.

    Args:
        section_id: The library section ID to refresh (get from get_libraries)
        force: If True, force metadata refresh even if files appear unchanged
        path: Optional path to restrict the scan to a specific directory

    Returns confirmation of the refresh request.
    """
    # Each option is included only when it was actually supplied.
    candidates = (
        ("force", 1 if force else None),
        ("path", path if path else None),
    )
    scan_params = {key: value for key, value in candidates if value is not None}

    outcome = await plex_client.request(
        "POST",
        f"/library/sections/{section_id}/refresh",
        params=scan_params,
    )
    return json.dumps(outcome, indent=2)
# =============================================================================
# API Pass-through Tool
# =============================================================================
@mcp.tool()
async def plex_api_call(
    endpoint: str,
    method: str = "GET",
    params: str = "{}",
    body: str = "{}",
) -> str:
    """Execute any Plex API call directly.

    This is the escape hatch for accessing any Plex API endpoint not covered
    by the specific tools. Refer to the 'plex://api-reference' resource or
    use search_api_docs() to find available endpoints.

    Args:
        endpoint: API path (e.g., '/playlists', '/:scrobble', '/library/metadata/123')
        method: HTTP method (GET, POST, PUT, DELETE)
        params: JSON string of query parameters (e.g., '{"query": "test", "limit": 10}')
        body: JSON string of request body for POST/PUT requests

    Returns the API response as JSON.

    Examples:
        - Mark item watched: plex_api_call('/:scrobble', params='{"key": "12345", "identifier": "com.plexapp.plugins.library"}')
        - Get playlists: plex_api_call('/playlists')
        - Rate item: plex_api_call('/:rate', 'PUT', params='{"key": "12345", "identifier": "com.plexapp.plugins.library", "rating": 8}')
    """

    def decode(raw: str):
        # An empty value or the literal "{}" means "nothing supplied".
        if not raw or raw == "{}":
            return None
        return json.loads(raw)

    try:
        parsed_params = decode(params)
        parsed_body = decode(body)
    except json.JSONDecodeError as exc:
        return json.dumps({"error": True, "message": f"Invalid JSON: {exc}"})

    response = await plex_client.request(
        method,
        endpoint,
        params=parsed_params,
        body=parsed_body,
    )
    return json.dumps(response, indent=2)
# =============================================================================
# Documentation Tools & Resources
# =============================================================================
# Keys of an OpenAPI Path Item Object that denote operations. Other keys
# ("parameters", "summary", "description", "servers", "$ref", "x-*") are not
# operations and must not be searched as if they were.
_OPENAPI_HTTP_METHODS = frozenset(
    {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
)


def _summarize_operation(path: str, method: str, details: dict) -> dict:
    """Build the result entry reported for one matching operation."""
    parameters = [
        {
            "name": param.get("name"),
            "in": param.get("in"),
            "required": param.get("required", False),
            "description": param.get("description", ""),
        }
        for param in details.get("parameters") or []
        if isinstance(param, dict)
    ]
    return {
        "path": path,
        "method": method.upper(),
        "summary": details.get("summary") or "",
        "description": details.get("description") or "",
        "tags": details.get("tags") or [],
        "parameters": parameters,
    }


def _find_matching_operations(spec: dict, query: str, limit: int) -> list:
    """Return up to ``limit`` operations whose searchable text contains ``query``."""
    matches: list = []
    if limit <= 0:
        return matches

    paths = spec.get("paths") if isinstance(spec, dict) else None
    if not isinstance(paths, dict):
        return matches

    needle = query.lower()
    for path, path_item in paths.items():
        if not isinstance(path_item, dict):
            continue
        for method, details in path_item.items():
            # Skip non-operation keys and malformed entries instead of
            # crashing on them.
            if method.lower() not in _OPENAPI_HTTP_METHODS:
                continue
            if not isinstance(details, dict):
                continue

            # Search in path, summary, description, and tags
            searchable = " ".join(
                [
                    path,
                    details.get("summary") or "",
                    details.get("description") or "",
                    " ".join(str(tag) for tag in details.get("tags") or []),
                ]
            ).lower()
            if needle not in searchable:
                continue

            matches.append(_summarize_operation(path, method, details))
            if len(matches) >= limit:
                return matches
    return matches


@mcp.tool()
async def search_api_docs(query: str, limit: int = 20) -> str:
    """Search the Plex OpenAPI specification for endpoints matching a query.

    Args:
        query: Search term to find in endpoint paths, summaries, or descriptions
        limit: Maximum number of results to return (default: 20)

    Returns matching endpoints with their methods, summaries, and parameters.
    """
    if not OPENAPI_PATH.exists():
        return json.dumps({"error": True, "message": "OpenAPI spec not found"})

    try:
        # Explicit UTF-8 so decoding does not depend on the process locale.
        with open(OPENAPI_PATH, encoding="utf-8") as f:
            spec = json.load(f)
    except Exception as e:
        return json.dumps(
            {"error": True, "message": f"Failed to load OpenAPI spec: {e}"}
        )

    results = _find_matching_operations(spec, query, limit)
    return json.dumps(
        {
            "query": query,
            "count": len(results),
            "results": results,
        },
        indent=2,
    )
@mcp.resource("plex://api-reference")
async def get_api_reference() -> str:
    """Get the curated Plex API quick reference documentation.

    This resource provides a human-readable guide to the most commonly used
    Plex API endpoints, organized by category. Use this to understand how
    to use the plex_api_call tool for operations not covered by specific tools.
    """
    try:
        # Explicit UTF-8 so decoding does not depend on the process locale;
        # reading directly avoids a race between an existence check and the read.
        return API_REFERENCE_PATH.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return "API reference documentation not found."
# =============================================================================
# Health Check & Application Setup
# =============================================================================
async def health_check(request):
    """Health check endpoint for Docker/orchestration.

    Responds 200 with status "ok" when the Plex connectivity probe succeeds,
    otherwise 503 with status "degraded". The ``request`` argument is unused.
    """
    plex_connected = await plex_client.health_check()
    if plex_connected:
        status, http_code = "ok", 200
    else:
        status, http_code = "degraded", 503

    logger.debug(f"Health check endpoint called: status={status}")
    payload = {"status": status, "plex_connected": plex_connected}
    return JSONResponse(payload, status_code=http_code)
def create_app() -> Starlette:
    """Create the Starlette application wrapping the MCP server.

    Returns:
        A Starlette app that serves ``/health`` and mounts the MCP HTTP app
        at ``/``, with a lifespan that wraps the MCP app's own lifespan.
    """
    mcp_app = mcp.http_app()

    # Combine our custom lifespan with MCP's lifespan
    @asynccontextmanager
    async def combined_lifespan(app):
        # Our startup logic
        logger.info("Plex MCP Server starting up")
        try:
            # Nest MCP's lifespan inside ours so the MCP app is initialised
            # by the parent ASGI app.
            async with mcp_app.lifespan(app):
                yield
        finally:
            # Our shutdown logic. In a finally block so the shared HTTP
            # client is closed even when the lifespan exits via an
            # exception or cancellation.
            logger.info("Plex MCP Server shutting down")
            await plex_client.close()

    return Starlette(
        routes=[
            # Listed before the catch-all mount so it is matched first.
            Route("/health", health_check),
            Mount("/", app=mcp_app),
        ],
        lifespan=combined_lifespan,
    )
# ASGI application object, built at import time.
app = create_app()

if __name__ == "__main__":
    import uvicorn

    # Validate configuration before starting
    validate_config()

    for startup_line in (
        f"Starting Plex MCP Server on port {PORT}",
        f"Plex URL: {PLEX_URL}",
        f"Health check: http://localhost:{PORT}/health",
    ):
        logger.info(startup_line)

    uvicorn.run(app, host="0.0.0.0", port=PORT)