""" Transmission MCP Server - Multi-instance Transmission daemon management via MCP. Follows the "Hybrid MCP Light" pattern: - Minimal specific tools for common operations - Raw RPC pass-through for full API access - Embedded API documentation as a resource """ from __future__ import annotations import json import os from dataclasses import dataclass from typing import Any import httpx from dotenv import load_dotenv from fastmcp import FastMCP from starlette.applications import Starlette from starlette.responses import JSONResponse from starlette.routing import Mount, Route load_dotenv() # ============================================================================= # Configuration # ============================================================================= @dataclass class InstanceConfig: """Configuration for a single Transmission instance.""" name: str url: str username: str | None = None password: str | None = None @property def rpc_url(self) -> str: """Full RPC endpoint URL.""" base = self.url.rstrip("/") if not base.endswith("/transmission/rpc"): base = f"{base}/transmission/rpc" return base def load_instances() -> list[InstanceConfig]: """Load instance configurations from TRANSMISSION_INSTANCES env var.""" raw = os.getenv("TRANSMISSION_INSTANCES", "[]") try: data = json.loads(raw) except json.JSONDecodeError as e: raise ValueError(f"Invalid TRANSMISSION_INSTANCES JSON: {e}") from e instances = [] for item in data: if not isinstance(item, dict): raise ValueError(f"Each instance must be an object, got: {type(item)}") if "name" not in item or "url" not in item: raise ValueError("Each instance requires 'name' and 'url' fields") instances.append( InstanceConfig( name=item["name"], url=item["url"], username=item.get("username"), password=item.get("password"), ) ) return instances def get_default_instance() -> str | None: """Get the default instance name from env.""" return os.getenv("TRANSMISSION_DEFAULT_INSTANCE") # ============================================================================= # Transmission Client # ============================================================================= class TransmissionClient: """ Async client for a single Transmission daemon instance. Handles: - HTTP Basic authentication (optional) - CSRF token management (X-Transmission-Session-Id) - JSON-RPC 2.0 protocol (new) and legacy protocol (old) - Auto-detection of Transmission version """ # Method name mapping: new (4.1.0+) -> old (pre-4.1.0) METHOD_ALIASES = { "session_get": "session-get", "session_set": "session-set", "session_stats": "session-stats", "session_close": "session-close", "torrent_get": "torrent-get", "torrent_set": "torrent-set", "torrent_add": "torrent-add", "torrent_remove": "torrent-remove", "torrent_start": "torrent-start", "torrent_start_now": "torrent-start-now", "torrent_stop": "torrent-stop", "torrent_verify": "torrent-verify", "torrent_reannounce": "torrent-reannounce", "torrent_set_location": "torrent-set-location", "torrent_rename_path": "torrent-rename-path", "blocklist_update": "blocklist-update", "port_test": "port-test", "free_space": "free-space", "group_get": "group-get", "group_set": "group-set", "queue_move_top": "queue-move-top", "queue_move_up": "queue-move-up", "queue_move_down": "queue-move-down", "queue_move_bottom": "queue-move-bottom", } def __init__(self, config: InstanceConfig): self.config = config self._session_id: str | None = None self._request_id = 0 self._use_legacy_methods: bool | None = None # Auto-detected # Build auth if credentials provided auth = None if config.username: auth = httpx.BasicAuth(config.username, config.password or "") self._client = httpx.AsyncClient( auth=auth, timeout=30.0, headers={"Content-Type": "application/json"}, ) async def close(self) -> None: """Close the HTTP client.""" await self._client.aclose() def _next_request_id(self) -> int: """Generate unique request ID.""" self._request_id += 1 return self._request_id def _get_method_name(self, method: str) -> str: """Get the appropriate method name based on detected version.""" if self._use_legacy_methods and method in self.METHOD_ALIASES: return self.METHOD_ALIASES[method] return method def _convert_params_to_legacy( self, params: dict[str, Any] | None ) -> dict[str, Any] | None: """Convert snake_case param keys to camelCase for legacy Transmission.""" if not params or not self._use_legacy_methods: return params def to_camel_case(snake_str: str) -> str: components = snake_str.split("_") return components[0] + "".join(x.title() for x in components[1:]) converted = {} for key, value in params.items(): # Convert snake_case to camelCase new_key = to_camel_case(key) # Handle nested arrays of field names if isinstance(value, list) and key == "fields": value = [to_camel_case(f) if "_" in f else f for f in value] converted[new_key] = value return converted def _convert_result_from_legacy(self, result: Any) -> Any: """Convert camelCase result keys to snake_case from legacy Transmission.""" if not self._use_legacy_methods: return result def to_snake_case(camel_str: str) -> str: import re s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", camel_str) return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower() if isinstance(result, dict): return { to_snake_case(k): self._convert_result_from_legacy(v) for k, v in result.items() } elif isinstance(result, list): return [self._convert_result_from_legacy(item) for item in result] return result async def _raw_rpc( self, method: str, params: dict[str, Any] | None = None, use_jsonrpc: bool = True, ) -> tuple[dict[str, Any], bool]: """ Execute a raw RPC call and return (data, success). Args: method: RPC method name params: Method parameters use_jsonrpc: Whether to use JSON-RPC 2.0 format Returns: Tuple of (response data, whether call succeeded) """ payload: dict[str, Any] if use_jsonrpc: payload = { "jsonrpc": "2.0", "method": method, "id": self._next_request_id(), } if params: payload["params"] = params else: # Legacy format payload = {"method": method} if params: payload["arguments"] = params response = None for attempt in range(2): headers = {} if self._session_id: headers["X-Transmission-Session-Id"] = self._session_id response = await self._client.post( self.config.rpc_url, json=payload, headers=headers, ) if response.status_code == 409: self._session_id = response.headers.get("X-Transmission-Session-Id") if not self._session_id: raise ValueError("Got 409 but no X-Transmission-Session-Id header") continue response.raise_for_status() break if response is None: raise RuntimeError("No response received from Transmission") return response.json(), True async def rpc( self, method: str, params: dict[str, Any] | None = None ) -> dict[str, Any]: """ Execute an RPC call to the Transmission daemon. Automatically detects and handles both: - JSON-RPC 2.0 with snake_case (Transmission 4.1.0+) - Legacy protocol with camelCase (older versions) Args: method: RPC method name (use snake_case, e.g., "torrent_get", "session_stats") params: Method parameters (use snake_case keys) Returns: The result from the response (converted to snake_case if legacy) """ # Auto-detect protocol on first call if self._use_legacy_methods is None: await self._detect_version() # Convert method name and params for legacy if needed actual_method = self._get_method_name(method) actual_params = self._convert_params_to_legacy(params) data, _ = await self._raw_rpc( actual_method, actual_params, use_jsonrpc=not self._use_legacy_methods ) # Handle response based on protocol if self._use_legacy_methods: # Legacy format: {"result": "success", "arguments": {...}} if data.get("result") != "success": return { "error": True, "message": data.get("result", "Unknown error"), } result = data.get("arguments", {}) return self._convert_result_from_legacy(result) else: # JSON-RPC 2.0 format if "error" in data: error = data["error"] if isinstance(error, dict): return { "error": True, "code": error.get("code"), "message": error.get("message"), "data": error.get("data"), } return {"error": True, "message": str(error)} return data.get("result", {}) async def _detect_version(self) -> None: """Detect whether this is a legacy or modern Transmission daemon.""" # Try modern JSON-RPC 2.0 first try: payload = { "jsonrpc": "2.0", "method": "session_get", "id": self._next_request_id(), "params": {"fields": ["version"]}, } response = None for attempt in range(2): headers = {} if self._session_id: headers["X-Transmission-Session-Id"] = self._session_id response = await self._client.post( self.config.rpc_url, json=payload, headers=headers, ) if response.status_code == 409: self._session_id = response.headers.get("X-Transmission-Session-Id") continue break if response is not None: data = response.json() # Check if this looks like a successful JSON-RPC 2.0 response if "result" in data and isinstance(data["result"], dict): self._use_legacy_methods = False return # Check for method not recognized error - means we need legacy if "error" in data: error = data["error"] if ( isinstance(error, dict) and "method name not recognized" in str(error.get("message", "")).lower() ): self._use_legacy_methods = True return if ( isinstance(error, str) and "method name not recognized" in error.lower() ): self._use_legacy_methods = True return except Exception: pass # Try legacy format try: payload = {"method": "session-get", "arguments": {"fields": ["version"]}} response = None for attempt in range(2): headers = {} if self._session_id: headers["X-Transmission-Session-Id"] = self._session_id response = await self._client.post( self.config.rpc_url, json=payload, headers=headers, ) if response.status_code == 409: self._session_id = response.headers.get("X-Transmission-Session-Id") continue break if response is not None: data = response.json() if data.get("result") == "success": self._use_legacy_methods = True return except Exception: pass # Default to modern self._use_legacy_methods = False async def health_check(self) -> dict[str, Any]: """Check connectivity to this instance.""" try: result = await self.rpc( "session_get", {"fields": ["version", "rpc_version_semver"]} ) if "error" in result: return {"healthy": False, "error": result["message"]} return { "healthy": True, "version": result.get("version"), "rpc_version": result.get("rpc_version_semver"), } except Exception as e: return {"healthy": False, "error": str(e)} # ============================================================================= # Instance Registry # ============================================================================= class InstanceRegistry: """Manages multiple TransmissionClient instances.""" def __init__(self, configs: list[InstanceConfig], default: str | None = None): self._clients: dict[str, TransmissionClient] = {} self._default = default for config in configs: self._clients[config.name] = TransmissionClient(config) # Validate default exists if self._default and self._default not in self._clients: raise ValueError( f"Default instance '{self._default}' not found in configured instances: " f"{list(self._clients.keys())}" ) def get(self, name: str | None = None) -> TransmissionClient: """ Get a client by name, or the default if name is None. Raises: KeyError: If instance not found or no default configured """ if name is None: if self._default is None: raise KeyError("No instance name provided and no default configured") name = self._default if name not in self._clients: raise KeyError( f"Instance '{name}' not found. Available: {list(self._clients.keys())}" ) return self._clients[name] def list_names(self) -> list[str]: """List all configured instance names.""" return list(self._clients.keys()) def get_default(self) -> str | None: """Get the default instance name.""" return self._default async def close_all(self) -> None: """Close all clients.""" for client in self._clients.values(): await client.close() # ============================================================================= # Initialize Registry # ============================================================================= try: _instances = load_instances() _default = get_default_instance() registry = InstanceRegistry(_instances, _default) except Exception as e: # Allow server to start even with bad config for debugging print(f"WARNING: Failed to load instances: {e}") registry = InstanceRegistry([]) # ============================================================================= # MCP Server # ============================================================================= mcp = FastMCP( name="transmission-mcp", instructions=""" Transmission MCP Server - Manage multiple Transmission BitTorrent daemon instances. Use 'list_instances' to see available instances and their status. Use 'get_torrents' and 'get_session_stats' for common operations. Use 'rpc_call' for any Transmission RPC method not covered by specific tools. Refer to the 'transmission://rpc-reference' resource for API documentation. """, ) # ============================================================================= # MCP Tools # ============================================================================= @mcp.tool() async def list_instances() -> str: """ List all configured Transmission instances and their connectivity status. Returns a JSON object with: - instances: Array of instance objects with name, url, healthy status, and version info - default: The default instance name (if configured) """ results = [] for name in registry.list_names(): client = registry.get(name) health = await client.health_check() results.append( { "name": name, "url": client.config.url, "authenticated": client.config.username is not None, **health, } ) return json.dumps( { "instances": results, "default": registry.get_default(), }, indent=2, ) @mcp.tool() async def get_torrents( instance: str | None = None, ids: str | None = None, ) -> str: """ Get torrent list with common fields from a Transmission instance. Args: instance: Instance name (uses default if not specified) ids: Optional torrent IDs - can be: - Omitted for all torrents - A single ID number as string - Comma-separated IDs (e.g., "1,5,10") - "recently_active" for recently changed torrents Returns JSON with torrents array containing: id, name, status, percent_done, rate_download, rate_upload, total_size, error, error_string, labels """ try: client = registry.get(instance) except KeyError as e: return json.dumps({"error": str(e)}) params: dict[str, Any] = { "fields": [ "id", "name", "status", "percent_done", "rate_download", "rate_upload", "total_size", "error", "error_string", "labels", "eta", "download_dir", ] } # Parse ids parameter if ids: if ids == "recently_active": params["ids"] = "recently_active" elif "," in ids: params["ids"] = [int(i.strip()) for i in ids.split(",")] else: params["ids"] = [int(ids)] result = await client.rpc("torrent_get", params) return json.dumps(result, indent=2) @mcp.tool() async def get_session_stats(instance: str | None = None) -> str: """ Get session statistics from a Transmission instance. Args: instance: Instance name (uses default if not specified) Returns JSON with: active_torrent_count, paused_torrent_count, torrent_count, download_speed, upload_speed, cumulative_stats, current_stats """ try: client = registry.get(instance) except KeyError as e: return json.dumps({"error": str(e)}) result = await client.rpc("session_stats") return json.dumps(result, indent=2) @mcp.tool() async def rpc_call( method: str, params: str = "{}", instance: str | None = None, ) -> str: """ Execute a raw JSON-RPC 2.0 call to a Transmission instance. This is the "escape hatch" for any RPC method not covered by specific tools. Refer to the 'transmission://rpc-reference' resource for available methods. Args: method: RPC method name (e.g., "torrent_add", "torrent_set", "session_set") params: JSON string of method parameters (default: "{}") instance: Instance name (uses default if not specified) Examples: - Add torrent: method="torrent_add", params='{"filename": "magnet:?xt=..."}' - Start torrent: method="torrent_start", params='{"ids": [1, 2]}' - Set speed limit: method="session_set", params='{"speed_limit_down": 1000}' """ try: client = registry.get(instance) except KeyError as e: return json.dumps({"error": str(e)}) try: parsed_params = json.loads(params) except json.JSONDecodeError as e: return json.dumps({"error": f"Invalid JSON params: {e}"}) result = await client.rpc(method, parsed_params if parsed_params else None) return json.dumps(result, indent=2) # ============================================================================= # MCP Resource - API Reference # ============================================================================= RPC_REFERENCE = """ # Transmission RPC Reference (JSON-RPC 2.0) ## Overview Transmission 4.1.0+ uses JSON-RPC 2.0 with snake_case naming. All methods require the `method` name and optional `params` object. ## Torrent Actions | Method | Description | Key Params | |--------|-------------|------------| | `torrent_start` | Start torrents | `ids` | | `torrent_start_now` | Start immediately (skip queue) | `ids` | | `torrent_stop` | Stop torrents | `ids` | | `torrent_verify` | Verify torrent data | `ids` | | `torrent_reannounce` | Re-announce to trackers | `ids` | **`ids` parameter**: integer, array of integers/hashes, or "recently_active" ## Torrent Accessors | Method | Description | |--------|-------------| | `torrent_get` | Get torrent info (requires `fields` array) | | `torrent_set` | Modify torrent settings | | `torrent_add` | Add new torrent (`filename` or `metainfo`) | | `torrent_remove` | Remove torrent (`delete_local_data`: bool) | | `torrent_set_location` | Move torrent (`location`, `move`: bool) | | `torrent_rename_path` | Rename file/folder (`path`, `name`) | ### Common `torrent_get` fields: - `id`, `name`, `status`, `hash_string` - `percent_done`, `percent_complete`, `eta` - `rate_download`, `rate_upload` (bytes/sec) - `total_size`, `size_when_done`, `left_until_done` - `download_dir`, `labels`, `error`, `error_string` - `files`, `file_stats`, `peers`, `trackers` ### Status values: | Value | Meaning | |-------|---------| | 0 | Stopped | | 1 | Queued to verify | | 2 | Verifying | | 3 | Queued to download | | 4 | Downloading | | 5 | Queued to seed | | 6 | Seeding | ## torrent_add params: - `filename`: URL or path to .torrent file - `metainfo`: Base64-encoded .torrent content - `download_dir`: Download location - `paused`: Start paused (bool) - `labels`: Array of string labels ## Session Methods | Method | Description | |--------|-------------| | `session_get` | Get session settings (optional `fields`) | | `session_set` | Modify session settings | | `session_stats` | Get statistics | | `session_close` | Shutdown daemon | | `free_space` | Check free space (`path`) | ### Common session settings: - `download_dir`, `incomplete_dir` - `speed_limit_down`, `speed_limit_down_enabled` - `speed_limit_up`, `speed_limit_up_enabled` - `peer_limit_global`, `peer_limit_per_torrent` ## Queue Methods | Method | Params | |--------|--------| | `queue_move_top` | `ids` | | `queue_move_up` | `ids` | | `queue_move_down` | `ids` | | `queue_move_bottom` | `ids` | ## Bandwidth Groups | Method | Description | |--------|-------------| | `group_get` | Get bandwidth groups (optional `group` name/array) | | `group_set` | Create/modify group (`name`, speed limits) | ## Full Documentation See: https://github.com/transmission/transmission/blob/main/docs/rpc-spec.md """ @mcp.resource("transmission://rpc-reference") def get_rpc_reference() -> str: """ Returns the Transmission RPC API reference documentation. Use this to learn available methods and parameters for the rpc_call tool. """ return RPC_REFERENCE # ============================================================================= # Health Check & ASGI App # ============================================================================= async def health(request) -> JSONResponse: """Health check endpoint for container orchestration.""" # Quick check - just verify we can list instances instance_count = len(registry.list_names()) return JSONResponse( { "status": "ok", "instances_configured": instance_count, "default_instance": registry.get_default(), } ) def create_app() -> Starlette: """Create the Starlette ASGI application wrapping the MCP server.""" mcp_app = mcp.http_app() return Starlette( routes=[ Route("/health", health), Mount("/", app=mcp_app), ], lifespan=mcp_app.lifespan, ) app = create_app() if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", "8000")) host = os.getenv("HOST", "0.0.0.0") uvicorn.run(app, host=host, port=port)