Initial commit: Plex MCP server with 6 tools and API passthrough
All checks were successful
Build and Push Plex MCP Docker Image / build (push) Successful in 35s
All checks were successful
Build and Push Plex MCP Docker Image / build (push) Successful in 35s
- get_libraries: List all library sections - search_library: Search for media by title - get_metadata: Get detailed item info by rating key - get_recently_added: Get recently added content - refresh_library: Trigger library scan - plex_api_call: Raw API passthrough for any endpoint - search_api_docs: Search OpenAPI spec for endpoint documentation Includes Docker support and Gitea Actions workflow for container builds.
This commit is contained in:
450
server.py
Normal file
450
server.py
Normal file
@@ -0,0 +1,450 @@
|
||||
"""
|
||||
Plex MCP Server - A lightweight MCP server for Plex Media Server API.
|
||||
|
||||
Follows the Hybrid MCP Light pattern:
|
||||
- 5 specific tools for common operations
|
||||
- 1 API pass-through for full API coverage
|
||||
- Documentation resources for AI agent reference
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
from dotenv import load_dotenv
|
||||
from fastmcp import FastMCP
|
||||
from starlette.applications import Starlette
|
||||
from starlette.responses import JSONResponse
|
||||
from starlette.routing import Mount, Route
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
logging.basicConfig(
|
||||
level=getattr(logging, LOG_LEVEL, logging.INFO),
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
stream=sys.stdout,
|
||||
)
|
||||
logger = logging.getLogger("plex-mcp")
|
||||
|
||||
# Configuration
|
||||
PLEX_URL = os.getenv("PLEX_URL", "http://localhost:32400")
|
||||
PLEX_TOKEN = os.getenv("PLEX_TOKEN", "")
|
||||
PLEX_CLIENT_ID = os.getenv("PLEX_CLIENT_ID", "plex-mcp-server")
|
||||
PORT = int(os.getenv("PORT", "8000"))
|
||||
|
||||
|
||||
def validate_config() -> None:
|
||||
"""Validate required configuration on startup."""
|
||||
errors = []
|
||||
|
||||
if not PLEX_TOKEN:
|
||||
errors.append("PLEX_TOKEN environment variable is required")
|
||||
|
||||
if not PLEX_URL:
|
||||
errors.append("PLEX_URL environment variable is required")
|
||||
|
||||
if errors:
|
||||
for error in errors:
|
||||
logger.error(f"Configuration error: {error}")
|
||||
raise ValueError(f"Configuration validation failed: {'; '.join(errors)}")
|
||||
|
||||
|
||||
# Paths
|
||||
SCRIPT_DIR = Path(__file__).parent
|
||||
OPENAPI_PATH = SCRIPT_DIR / "openapi.json"
|
||||
API_REFERENCE_PATH = SCRIPT_DIR / "docs" / "api_reference.md"
|
||||
|
||||
|
||||
class PlexClient:
|
||||
"""HTTP client for Plex Media Server API."""
|
||||
|
||||
def __init__(self, base_url: str, token: str, client_id: str):
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.token = token
|
||||
self.client_id = client_id
|
||||
self._client: Optional[httpx.AsyncClient] = None
|
||||
logger.debug(f"PlexClient initialized for {self.base_url}")
|
||||
|
||||
@property
|
||||
def headers(self) -> dict:
|
||||
return {
|
||||
"X-Plex-Token": self.token,
|
||||
"X-Plex-Client-Identifier": self.client_id,
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
async def get_client(self) -> httpx.AsyncClient:
|
||||
if self._client is None or self._client.is_closed:
|
||||
logger.debug("Creating new HTTP client")
|
||||
self._client = httpx.AsyncClient(
|
||||
base_url=self.base_url,
|
||||
headers=self.headers,
|
||||
timeout=30.0,
|
||||
)
|
||||
return self._client
|
||||
|
||||
async def close(self):
|
||||
if self._client and not self._client.is_closed:
|
||||
logger.debug("Closing HTTP client")
|
||||
await self._client.aclose()
|
||||
|
||||
async def request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
params: Optional[dict] = None,
|
||||
body: Optional[dict] = None,
|
||||
) -> dict:
|
||||
"""Execute an API request to Plex."""
|
||||
client = await self.get_client()
|
||||
logger.info(f"Plex API request: {method.upper()} {endpoint}")
|
||||
if params:
|
||||
logger.debug(f"Request params: {params}")
|
||||
try:
|
||||
response = await client.request(
|
||||
method=method.upper(),
|
||||
url=endpoint,
|
||||
params=params,
|
||||
json=body if body else None,
|
||||
)
|
||||
response.raise_for_status()
|
||||
logger.debug(f"Response status: {response.status_code}")
|
||||
|
||||
# Handle empty responses
|
||||
if not response.content:
|
||||
return {"status": "ok", "statusCode": response.status_code}
|
||||
|
||||
return response.json()
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.warning(
|
||||
f"Plex API HTTP error: {e.response.status_code} for {method.upper()} {endpoint}"
|
||||
)
|
||||
return {
|
||||
"error": True,
|
||||
"statusCode": e.response.status_code,
|
||||
"message": str(e),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Plex API error: {e}", exc_info=True)
|
||||
return {"error": True, "message": str(e)}
|
||||
|
||||
async def health_check(self) -> bool:
|
||||
"""Check if connection to Plex is working."""
|
||||
try:
|
||||
result = await self.request("GET", "/identity")
|
||||
is_healthy = "error" not in result
|
||||
logger.debug(f"Health check result: {is_healthy}")
|
||||
return is_healthy
|
||||
except Exception as e:
|
||||
logger.warning(f"Health check failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
# Initialize client and MCP server
|
||||
plex_client = PlexClient(PLEX_URL, PLEX_TOKEN, PLEX_CLIENT_ID)
|
||||
mcp = FastMCP(
|
||||
"Plex MCP Server",
|
||||
description="MCP server for interacting with Plex Media Server",
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Specific Tools (5 high-value operations)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_libraries() -> str:
|
||||
"""List all Plex library sections (Movies, TV Shows, Music, etc.).
|
||||
|
||||
Returns a list of all libraries with their IDs, names, types, and item counts.
|
||||
Use the section ID from this response with other tools like search_library or refresh_library.
|
||||
"""
|
||||
result = await plex_client.request("GET", "/library/sections/all")
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def search_library(
|
||||
query: str, limit: int = 10, section_id: Optional[int] = None
|
||||
) -> str:
|
||||
"""Search for media across all libraries or within a specific library.
|
||||
|
||||
Args:
|
||||
query: Search term (title, artist, etc.)
|
||||
limit: Maximum results per media type (default: 10)
|
||||
section_id: Optional library section ID to restrict search
|
||||
|
||||
Returns search results grouped by media type (movies, shows, episodes, etc.)
|
||||
"""
|
||||
params = {"query": query, "limit": limit}
|
||||
if section_id is not None:
|
||||
params["sectionId"] = section_id
|
||||
|
||||
result = await plex_client.request("GET", "/hubs/search", params=params)
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_metadata(rating_key: str, include_children: bool = False) -> str:
|
||||
"""Get detailed metadata for a specific media item.
|
||||
|
||||
Args:
|
||||
rating_key: The unique identifier (ratingKey) of the media item
|
||||
include_children: If True, include child items (seasons for shows, episodes for seasons)
|
||||
|
||||
Returns detailed metadata including title, summary, ratings, cast, and more.
|
||||
"""
|
||||
endpoint = f"/library/metadata/{rating_key}"
|
||||
params = {}
|
||||
if include_children:
|
||||
params["includeChildren"] = 1
|
||||
|
||||
result = await plex_client.request("GET", endpoint, params=params)
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_recently_added(section_id: Optional[int] = None, limit: int = 20) -> str:
|
||||
"""Get recently added media items.
|
||||
|
||||
Args:
|
||||
section_id: Optional library section ID to filter results
|
||||
limit: Maximum number of items to return (default: 20)
|
||||
|
||||
Returns recently added items across all libraries or for a specific library.
|
||||
"""
|
||||
if section_id is not None:
|
||||
# Get recently added for specific library via hubs
|
||||
endpoint = f"/hubs/sections/{section_id}"
|
||||
else:
|
||||
# Get global recently added
|
||||
endpoint = "/hubs"
|
||||
|
||||
params = {"X-Plex-Container-Size": limit}
|
||||
result = await plex_client.request("GET", endpoint, params=params)
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def refresh_library(
|
||||
section_id: int,
|
||||
force: bool = False,
|
||||
path: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Trigger a library scan to detect new or changed media files.
|
||||
|
||||
Args:
|
||||
section_id: The library section ID to refresh (get from get_libraries)
|
||||
force: If True, force metadata refresh even if files appear unchanged
|
||||
path: Optional path to restrict the scan to a specific directory
|
||||
|
||||
Returns confirmation of the refresh request.
|
||||
"""
|
||||
endpoint = f"/library/sections/{section_id}/refresh"
|
||||
params = {}
|
||||
if force:
|
||||
params["force"] = 1
|
||||
if path:
|
||||
params["path"] = path
|
||||
|
||||
result = await plex_client.request("POST", endpoint, params=params)
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# API Pass-through Tool
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def plex_api_call(
|
||||
endpoint: str,
|
||||
method: str = "GET",
|
||||
params: str = "{}",
|
||||
body: str = "{}",
|
||||
) -> str:
|
||||
"""Execute any Plex API call directly.
|
||||
|
||||
This is the escape hatch for accessing any Plex API endpoint not covered
|
||||
by the specific tools. Refer to the 'plex://api-reference' resource or
|
||||
use search_api_docs() to find available endpoints.
|
||||
|
||||
Args:
|
||||
endpoint: API path (e.g., '/playlists', '/:scrobble', '/library/metadata/123')
|
||||
method: HTTP method (GET, POST, PUT, DELETE)
|
||||
params: JSON string of query parameters (e.g., '{"query": "test", "limit": 10}')
|
||||
body: JSON string of request body for POST/PUT requests
|
||||
|
||||
Returns the API response as JSON.
|
||||
|
||||
Examples:
|
||||
- Mark item watched: plex_api_call('/:scrobble', params='{"key": "12345", "identifier": "com.plexapp.plugins.library"}')
|
||||
- Get playlists: plex_api_call('/playlists')
|
||||
- Rate item: plex_api_call('/:rate', 'PUT', params='{"key": "12345", "identifier": "com.plexapp.plugins.library", "rating": 8}')
|
||||
"""
|
||||
try:
|
||||
parsed_params = json.loads(params) if params and params != "{}" else None
|
||||
parsed_body = json.loads(body) if body and body != "{}" else None
|
||||
except json.JSONDecodeError as e:
|
||||
return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})
|
||||
|
||||
result = await plex_client.request(
|
||||
method, endpoint, params=parsed_params, body=parsed_body
|
||||
)
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Documentation Tools & Resources
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def search_api_docs(query: str, limit: int = 20) -> str:
|
||||
"""Search the Plex OpenAPI specification for endpoints matching a query.
|
||||
|
||||
Args:
|
||||
query: Search term to find in endpoint paths, summaries, or descriptions
|
||||
limit: Maximum number of results to return (default: 20)
|
||||
|
||||
Returns matching endpoints with their methods, summaries, and parameters.
|
||||
"""
|
||||
if not OPENAPI_PATH.exists():
|
||||
return json.dumps({"error": True, "message": "OpenAPI spec not found"})
|
||||
|
||||
try:
|
||||
with open(OPENAPI_PATH) as f:
|
||||
spec = json.load(f)
|
||||
except Exception as e:
|
||||
return json.dumps(
|
||||
{"error": True, "message": f"Failed to load OpenAPI spec: {e}"}
|
||||
)
|
||||
|
||||
query_lower = query.lower()
|
||||
results = []
|
||||
|
||||
for path, methods in spec.get("paths", {}).items():
|
||||
for method, details in methods.items():
|
||||
if method.startswith("x-"): # Skip OpenAPI extensions
|
||||
continue
|
||||
|
||||
# Search in path, summary, description, and tags
|
||||
searchable = " ".join(
|
||||
[
|
||||
path,
|
||||
details.get("summary", ""),
|
||||
details.get("description", ""),
|
||||
" ".join(details.get("tags", [])),
|
||||
]
|
||||
).lower()
|
||||
|
||||
if query_lower in searchable:
|
||||
# Extract parameter info
|
||||
params = []
|
||||
for param in details.get("parameters", []):
|
||||
params.append(
|
||||
{
|
||||
"name": param.get("name"),
|
||||
"in": param.get("in"),
|
||||
"required": param.get("required", False),
|
||||
"description": param.get("description", ""),
|
||||
}
|
||||
)
|
||||
|
||||
results.append(
|
||||
{
|
||||
"path": path,
|
||||
"method": method.upper(),
|
||||
"summary": details.get("summary", ""),
|
||||
"description": details.get("description", ""),
|
||||
"tags": details.get("tags", []),
|
||||
"parameters": params,
|
||||
}
|
||||
)
|
||||
|
||||
if len(results) >= limit:
|
||||
break
|
||||
|
||||
if len(results) >= limit:
|
||||
break
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"query": query,
|
||||
"count": len(results),
|
||||
"results": results,
|
||||
},
|
||||
indent=2,
|
||||
)
|
||||
|
||||
|
||||
@mcp.resource("plex://api-reference")
|
||||
async def get_api_reference() -> str:
|
||||
"""Get the curated Plex API quick reference documentation.
|
||||
|
||||
This resource provides a human-readable guide to the most commonly used
|
||||
Plex API endpoints, organized by category. Use this to understand how
|
||||
to use the plex_api_call tool for operations not covered by specific tools.
|
||||
"""
|
||||
if not API_REFERENCE_PATH.exists():
|
||||
return "API reference documentation not found."
|
||||
|
||||
return API_REFERENCE_PATH.read_text()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Health Check & Application Setup
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def health_check(request):
|
||||
"""Health check endpoint for Docker/orchestration."""
|
||||
is_healthy = await plex_client.health_check()
|
||||
status = "ok" if is_healthy else "degraded"
|
||||
logger.debug(f"Health check endpoint called: status={status}")
|
||||
return JSONResponse(
|
||||
{"status": status, "plex_connected": is_healthy},
|
||||
status_code=200 if is_healthy else 503,
|
||||
)
|
||||
|
||||
|
||||
def create_app() -> Starlette:
|
||||
"""Create the Starlette application wrapping the MCP server."""
|
||||
mcp_app = mcp.http_app()
|
||||
|
||||
async def lifespan(app):
|
||||
# Startup
|
||||
logger.info("Plex MCP Server starting up")
|
||||
yield
|
||||
# Shutdown
|
||||
logger.info("Plex MCP Server shutting down")
|
||||
await plex_client.close()
|
||||
|
||||
return Starlette(
|
||||
routes=[
|
||||
Route("/health", health_check),
|
||||
Mount("/", app=mcp_app),
|
||||
],
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
|
||||
app = create_app()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
# Validate configuration before starting
|
||||
validate_config()
|
||||
|
||||
logger.info(f"Starting Plex MCP Server on port {PORT}")
|
||||
logger.info(f"Plex URL: {PLEX_URL}")
|
||||
logger.info(f"Health check: http://localhost:{PORT}/health")
|
||||
uvicorn.run(app, host="0.0.0.0", port=PORT)
|
||||
Reference in New Issue
Block a user