Files
plex-mcp/server.py
Ben 9931bdf2e4
Some checks failed
Build and Push Docker Image / build (push) Failing after 0s
Initial commit: Plex MCP server with 6 tools and API passthrough
- get_libraries: List all library sections
- search_library: Search for media by title
- get_metadata: Get detailed item info by rating key
- get_recently_added: Get recently added content
- refresh_library: Trigger library scan
- plex_api_call: Raw API passthrough for any endpoint
- search_api_docs: Search OpenAPI spec for endpoint documentation

Includes Docker support and Gitea Actions workflow for container builds.
2025-12-28 05:26:43 +00:00

403 lines
13 KiB
Python

"""
Plex MCP Server - A lightweight MCP server for Plex Media Server API.
Follows the Hybrid MCP Light pattern:
- 5 specific tools for common operations
- 1 API pass-through for full API coverage
- Documentation resources for AI agent reference
"""
import json
import os
from pathlib import Path
from typing import Optional
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
load_dotenv()
# Configuration: every value can be overridden through the environment.
PLEX_URL = os.environ.get("PLEX_URL", "http://localhost:32400")
PLEX_TOKEN = os.environ.get("PLEX_TOKEN", "")
PLEX_CLIENT_ID = os.environ.get("PLEX_CLIENT_ID", "plex-mcp-server")
PORT = int(os.environ.get("PORT", "8000"))

# Paths: documentation files are looked up relative to this script.
SCRIPT_DIR = Path(__file__).parent
OPENAPI_PATH = SCRIPT_DIR.joinpath("openapi.json")
API_REFERENCE_PATH = SCRIPT_DIR.joinpath("docs", "api_reference.md")
class PlexClient:
    """HTTP client for Plex Media Server API.

    One ``httpx.AsyncClient`` is created lazily on first use and reused for
    later requests. ``request()`` reports HTTP and transport failures as
    ``{"error": True, ...}`` dictionaries instead of raising.
    """

    def __init__(self, base_url: str, token: str, client_id: str):
        """Store connection settings; no client is created yet.

        Args:
            base_url: Root URL of the Plex server; trailing slashes are removed.
            token: Value sent in the ``X-Plex-Token`` header.
            client_id: Value sent in the ``X-Plex-Client-Identifier`` header.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.client_id = client_id
        self._client: Optional[httpx.AsyncClient] = None

    @property
    def headers(self) -> dict:
        """Default headers attached to every request made by this client."""
        return {
            "X-Plex-Token": self.token,
            "X-Plex-Client-Identifier": self.client_id,
            "Accept": "application/json",
        }

    async def get_client(self) -> "httpx.AsyncClient":
        """Return the shared client, creating one if none exists or it was closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=30.0,
            )
        return self._client

    async def close(self):
        """Close the underlying client if one is currently open."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()

    async def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        body: Optional[dict] = None,
    ) -> dict:
        """Execute an API request to Plex.

        Args:
            method: HTTP method name; upper-cased before sending.
            endpoint: Path on the configured server (e.g. ``/library/sections``).
                Anything carrying its own scheme or host is rejected.
            params: Optional query parameters.
            body: Optional JSON body; an empty dict is sent as no body.

        Returns:
            The decoded JSON response; ``{"status": "ok", "statusCode": ...}``
            when the response body is empty; or ``{"error": True, ...}`` when
            the endpoint is rejected or the request fails.
        """
        # Function-scope import keeps the validation below self-contained.
        from urllib.parse import urlsplit

        # httpx only joins *relative* URLs onto base_url. An absolute URL would
        # be sent as-is, together with this client's default headers (including
        # X-Plex-Token), to whatever host it names. Refuse before any network
        # activity happens.
        try:
            target = urlsplit(endpoint)
        except ValueError as e:
            return {"error": True, "message": f"Invalid endpoint: {e}"}
        if target.scheme or target.netloc:
            return {
                "error": True,
                "message": (
                    "Endpoint must be a path on the configured Plex server "
                    "(e.g. '/library/sections'), not an absolute URL"
                ),
            }

        client = await self.get_client()
        try:
            response = await client.request(
                method=method.upper(),
                url=endpoint,
                params=params,
                json=body if body else None,
            )
            response.raise_for_status()
            # Handle empty responses
            if not response.content:
                return {"status": "ok", "statusCode": response.status_code}
            return response.json()
        except httpx.HTTPStatusError as e:
            return {
                "error": True,
                "statusCode": e.response.status_code,
                "message": str(e),
            }
        except Exception as e:
            # Boundary handler: transport errors and undecodable bodies are
            # reported to the caller as data rather than raised.
            return {"error": True, "message": str(e)}

    async def health_check(self) -> bool:
        """Check if connection to Plex is working.

        Returns True when ``GET /identity`` produces a result without an
        ``"error"`` key, and False otherwise (including on any exception).
        """
        try:
            result = await self.request("GET", "/identity")
            return "error" not in result
        except Exception:
            return False
# Initialize client and MCP server
plex_client = PlexClient(PLEX_URL, PLEX_TOKEN, PLEX_CLIENT_ID)
mcp = FastMCP(
    "Plex MCP Server",
    # FastMCP's documented keyword for the server's descriptive text is
    # `instructions`; `description` is not one of its constructor parameters.
    instructions="MCP server for interacting with Plex Media Server",
)
# =============================================================================
# Specific Tools (5 high-value operations)
# =============================================================================
@mcp.tool()
async def get_libraries() -> str:
    """List all Plex library sections (Movies, TV Shows, Music, etc.).

    Returns a list of all libraries with their IDs, names, types, and item counts.
    Use the section ID from this response with other tools like search_library or refresh_library.
    """
    # The client returns either the decoded response or an error dict; both
    # are serialized the same way.
    sections = await plex_client.request(
        method="GET",
        endpoint="/library/sections/all",
    )
    return json.dumps(sections, indent=2)
@mcp.tool()
async def search_library(
    query: str, limit: int = 10, section_id: Optional[int] = None
) -> str:
    """Search for media across all libraries or within a specific library.

    Args:
        query: Search term (title, artist, etc.)
        limit: Maximum results per media type (default: 10)
        section_id: Optional library section ID to restrict search

    Returns search results grouped by media type (movies, shows, episodes, etc.)
    """
    # Only include the section filter when the caller supplied one.
    section_filter = {} if section_id is None else {"sectionId": section_id}
    search_params = {"query": query, "limit": limit, **section_filter}
    hits = await plex_client.request("GET", "/hubs/search", params=search_params)
    return json.dumps(hits, indent=2)
@mcp.tool()
async def get_metadata(rating_key: str, include_children: bool = False) -> str:
    """Get detailed metadata for a specific media item.

    Args:
        rating_key: The unique identifier (ratingKey) of the media item
        include_children: If True, include child items (seasons for shows, episodes for seasons)

    Returns detailed metadata including title, summary, ratings, cast, and more.
    """
    # The flag is sent only when children were requested; otherwise the query
    # parameters stay empty.
    child_options = {"includeChildren": 1} if include_children else {}
    item = await plex_client.request(
        "GET",
        f"/library/metadata/{rating_key}",
        params=child_options,
    )
    return json.dumps(item, indent=2)
@mcp.tool()
async def get_recently_added(section_id: Optional[int] = None, limit: int = 20) -> str:
    """Get recently added media items.

    Args:
        section_id: Optional library section ID to filter results
        limit: Maximum number of items to return (default: 20)

    Returns recently added items across all libraries or for a specific library.
    """
    # Global hubs when no section is given, otherwise that section's hubs.
    hubs_endpoint = "/hubs" if section_id is None else f"/hubs/sections/{section_id}"
    hubs = await plex_client.request(
        "GET",
        hubs_endpoint,
        params={"X-Plex-Container-Size": limit},
    )
    return json.dumps(hubs, indent=2)
@mcp.tool()
async def refresh_library(
    section_id: int,
    force: bool = False,
    path: Optional[str] = None,
) -> str:
    """Trigger a library scan to detect new or changed media files.

    Args:
        section_id: The library section ID to refresh (get from get_libraries)
        force: If True, force metadata refresh even if files appear unchanged
        path: Optional path to restrict the scan to a specific directory

    Returns confirmation of the refresh request.
    """
    # Each option is added only when it was actually requested.
    scan_options = {}
    if force:
        scan_options.update(force=1)
    if path:
        scan_options.update(path=path)

    outcome = await plex_client.request(
        "POST",
        f"/library/sections/{section_id}/refresh",
        params=scan_options,
    )
    return json.dumps(outcome, indent=2)
# =============================================================================
# API Pass-through Tool
# =============================================================================
@mcp.tool()
async def plex_api_call(
    endpoint: str,
    method: str = "GET",
    params: str = "{}",
    body: str = "{}",
) -> str:
    """Execute any Plex API call directly.

    This is the escape hatch for accessing any Plex API endpoint not covered
    by the specific tools. Refer to the 'plex://api-reference' resource or
    use search_api_docs() to find available endpoints.

    Args:
        endpoint: API path (e.g., '/playlists', '/:scrobble', '/library/metadata/123')
        method: HTTP method (GET, POST, PUT, DELETE)
        params: JSON string of query parameters (e.g., '{"query": "test", "limit": 10}')
        body: JSON string of request body for POST/PUT requests

    Returns the API response as JSON.

    Examples:
        - Mark item watched: plex_api_call('/:scrobble', params='{"key": "12345", "identifier": "com.plexapp.plugins.library"}')
        - Get playlists: plex_api_call('/playlists')
        - Rate item: plex_api_call('/:rate', 'PUT', params='{"key": "12345", "identifier": "com.plexapp.plugins.library", "rating": 8}')
    """
    decoded = {}
    for label, raw in (("params", params), ("body", body)):
        # Empty or whitespace-only input means "not supplied".
        if not raw or not raw.strip():
            decoded[label] = None
            continue
        try:
            value = json.loads(raw)
        except json.JSONDecodeError as e:
            return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})
        # The client expects dictionaries. Reject arrays and scalars here with
        # a clear message instead of letting them fail further down.
        if value is not None and not isinstance(value, dict):
            return json.dumps(
                {
                    "error": True,
                    "message": (
                        f"'{label}' must be a JSON object, "
                        f"got {type(value).__name__}"
                    ),
                }
            )
        # An empty object carries no information; treat it as absent.
        decoded[label] = value or None

    result = await plex_client.request(
        method, endpoint, params=decoded["params"], body=decoded["body"]
    )
    return json.dumps(result, indent=2)
# =============================================================================
# Documentation Tools & Resources
# =============================================================================
# Keys of an OpenAPI path item that denote operations. Other keys such as a
# path-level "parameters" list, "summary", "servers" or "x-*" extensions are
# not endpoints and must not be treated as such.
_OPENAPI_HTTP_METHODS = frozenset(
    {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
)


def _find_matching_operations(spec, needle: str, limit: int) -> list:
    """Collect up to *limit* operations from *spec* whose text contains *needle*.

    *needle* is expected to be lower-cased already; it is matched against the
    lower-cased path, summary, description and tags of each operation.
    """
    matches = []
    paths = spec.get("paths") if isinstance(spec, dict) else None
    if limit <= 0 or not isinstance(paths, dict):
        return matches

    for path, path_item in paths.items():
        if not isinstance(path_item, dict):
            continue
        for method, details in path_item.items():
            # Skip everything that is not an HTTP operation object.
            if method.lower() not in _OPENAPI_HTTP_METHODS:
                continue
            if not isinstance(details, dict):
                continue

            summary = details.get("summary") or ""
            description = details.get("description") or ""
            tags = details.get("tags") or []

            # Search in path, summary, description, and tags
            searchable = " ".join(
                [path, summary, description, " ".join(tags)]
            ).lower()
            if needle not in searchable:
                continue

            matches.append(
                {
                    "path": path,
                    "method": method.upper(),
                    "summary": summary,
                    "description": description,
                    "tags": tags,
                    "parameters": [
                        {
                            "name": param.get("name"),
                            "in": param.get("in"),
                            "required": param.get("required", False),
                            "description": param.get("description", ""),
                        }
                        for param in details.get("parameters") or []
                        if isinstance(param, dict)
                    ],
                }
            )
            if len(matches) >= limit:
                return matches
    return matches


@mcp.tool()
async def search_api_docs(query: str, limit: int = 20) -> str:
    """Search the Plex OpenAPI specification for endpoints matching a query.

    Args:
        query: Search term to find in endpoint paths, summaries, or descriptions
        limit: Maximum number of results to return (default: 20)

    Returns matching endpoints with their methods, summaries, and parameters.
    """
    try:
        # Explicit encoding: do not depend on the platform default.
        with open(OPENAPI_PATH, encoding="utf-8") as f:
            spec = json.load(f)
    except FileNotFoundError:
        return json.dumps({"error": True, "message": "OpenAPI spec not found"})
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        return json.dumps(
            {"error": True, "message": f"Failed to load OpenAPI spec: {e}"}
        )

    results = _find_matching_operations(spec, query.lower(), limit)
    return json.dumps(
        {
            "query": query,
            "count": len(results),
            "results": results,
        },
        indent=2,
    )
@mcp.resource("plex://api-reference")
async def get_api_reference() -> str:
    """Get the curated Plex API quick reference documentation.

    This resource provides a human-readable guide to the most commonly used
    Plex API endpoints, organized by category. Use this to understand how
    to use the plex_api_call tool for operations not covered by specific tools.
    """
    try:
        # Explicit encoding so the result does not depend on the platform
        # default; reading directly avoids a separate exists() check.
        return API_REFERENCE_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        return "API reference documentation not found."
# =============================================================================
# Health Check & Application Setup
# =============================================================================
async def health_check(request):
    """Health check endpoint for Docker/orchestration.

    Responds with HTTP 200 and status "ok" when the Plex client reports a
    working connection, otherwise HTTP 503 and status "degraded".
    """
    plex_ok = await plex_client.health_check()
    if plex_ok:
        status_text, http_code = "ok", 200
    else:
        status_text, http_code = "degraded", 503
    payload = {"status": status_text, "plex_connected": plex_ok}
    return JSONResponse(payload, status_code=http_code)
def create_app() -> Starlette:
    """Create the Starlette application wrapping the MCP server.

    The returned app serves ``/health`` itself and mounts the FastMCP HTTP app
    at the root for everything else.
    """
    # Function-scope import: only this factory needs it.
    from contextlib import asynccontextmanager

    mcp_app = mcp.http_app()

    @asynccontextmanager
    async def lifespan(app):
        # Startup: run the MCP app's own lifespan. The lifespan of a mounted
        # sub-application is not run by the parent app, so it has to be
        # entered explicitly here.
        async with mcp_app.lifespan(app):
            try:
                yield
            finally:
                # Shutdown: release the shared HTTP client even if the
                # application is torn down by an exception.
                await plex_client.close()

    return Starlette(
        routes=[
            Route("/health", health_check),
            Mount("/", app=mcp_app),
        ],
        lifespan=lifespan,
    )
# Module-level ASGI application, importable by an ASGI server.
app = create_app()

if __name__ == "__main__":
    import uvicorn

    # Print the startup banner in one call, one entry per line.
    startup_banner = (
        f"Starting Plex MCP Server on port {PORT}",
        f"Plex URL: {PLEX_URL}",
        f"Health check: http://localhost:{PORT}/health",
    )
    print(*startup_banner, sep="\n")
    uvicorn.run(app, host="0.0.0.0", port=PORT)