Initial commit: multi-instance Transmission MCP server
Some checks failed
Build and Push Docker Image / build (push) Failing after 42s

This commit is contained in:
Ben
2025-12-30 04:38:44 +00:00
commit 7812a3c42d
10 changed files with 2122 additions and 0 deletions

32
.env.example Normal file
View File

@@ -0,0 +1,32 @@
# Transmission MCP Server Configuration
# Copy this file to .env and configure your instances
# JSON array of Transmission instances
# Each instance requires: name, url
# Optional: username, password (for authenticated instances)
TRANSMISSION_INSTANCES='[
{
"name": "home",
"url": "http://192.168.1.100:9091",
"username": "admin",
"password": "secretpassword"
},
{
"name": "seedbox",
"url": "https://seedbox.example.com:443/transmission",
"username": "user",
"password": "pass"
},
{
"name": "local",
"url": "http://localhost:9091"
}
]'
# Default instance to use when no instance is specified
# Must match a "name" from TRANSMISSION_INSTANCES
TRANSMISSION_DEFAULT_INSTANCE=home
# Server settings
HOST=0.0.0.0
PORT=8000

View File

@@ -0,0 +1,46 @@
name: Build and Push Docker Image
on:
push:
branches:
- main
jobs:
build:
runs-on: ubuntu-latest
container:
image: catthehacker/ubuntu:act-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to Gitea Container Registry
uses: docker/login-action@v3
with:
registry: git.nixc.us
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: git.nixc.us/${{ github.repository }}
tags: |
type=ref,event=branch
type=sha,prefix=
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max

1
BLUEPRINT.md Symbolic link
View File

@@ -0,0 +1 @@
../BLUEPRINT.md

18
Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
FROM python:3.12-slim
WORKDIR /app
# Install dependencies
COPY pyproject.toml .
RUN pip install --no-cache-dir .
# Copy application
COPY server.py .
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:8000/health').raise_for_status()"
EXPOSE 8000
CMD ["python", "server.py"]

180
IMPLEMENTATION.md Normal file
View File

@@ -0,0 +1,180 @@
# Transmission MCP Server - Implementation Guide
This document provides technical details for AI agents and developers working with this MCP server.
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Starlette App │
├─────────────┬───────────────────────────────────────────┤
│ /health │ FastMCP App │
│ endpoint │ ┌─────────────────────────────────────┐ │
│ │ │ Tools: │ │
│ │ │ - list_instances │ │
│ │ │ - get_torrents │ │
│ │ │ - get_session_stats │ │
│ │ │ - rpc_call (pass-through) │ │
│ │ ├─────────────────────────────────────┤ │
│ │ │ Resources: │ │
│ │ │ - transmission://api-reference │ │
│ │ └─────────────────────────────────────┘ │
└─────────────┴───────────────────────────────────────────┘
┌───────────────────────────────┐
│ InstanceRegistry │
│ ┌─────────────────────────┐ │
│ │ TransmissionClient[home]│ │
│ │ TransmissionClient[seed]│ │
│ │ TransmissionClient[...] │ │
│ └─────────────────────────┘ │
└───────────────────────────────┘
┌───────────────────────────────┐
│ Transmission Daemons (RPC) │
└───────────────────────────────┘
```
## Key Classes
### TransmissionClient
Handles communication with a single Transmission daemon:
- Manages CSRF token (`X-Transmission-Session-Id`)
- Automatic retry on 409 responses
- HTTP Basic Auth support (optional)
- JSON-RPC 2.0 protocol (Transmission 4.1+)
### InstanceRegistry
Manages multiple TransmissionClient instances:
- Loads configuration from `TRANSMISSION_INSTANCES` env var
- Provides lookup by instance name
- Tracks default instance
- Health check aggregation
## Transmission RPC Protocol
This server uses the **JSON-RPC 2.0** protocol introduced in Transmission 4.1.0. Key points:
1. All method/field names use `snake_case` (not the old camelCase/kebab-case)
2. Request format:
```json
{
"jsonrpc": "2.0",
"method": "torrent_get",
"params": {"fields": ["name", "status"]},
"id": 1
}
```
3. CSRF protection requires `X-Transmission-Session-Id` header
4. On 409 response, extract session ID from response header and retry
## Common RPC Methods
### Torrent Operations
| Method | Description |
|--------|-------------|
| `torrent_get` | Get torrent info (requires `fields` array) |
| `torrent_add` | Add torrent (requires `filename` or `metainfo`) |
| `torrent_remove` | Remove torrent (optional `delete_local_data`) |
| `torrent_start` | Start torrents |
| `torrent_stop` | Stop/pause torrents |
| `torrent_set` | Modify torrent settings |
| `torrent_set_location` | Move torrent data |
### Session Operations
| Method | Description |
|--------|-------------|
| `session_get` | Get session settings (optional `fields`) |
| `session_set` | Modify session settings |
| `session_stats` | Get transfer statistics |
| `free_space` | Check disk space (requires `path`) |
### Torrent Status Values
| Value | Meaning |
|-------|---------|
| 0 | Stopped |
| 1 | Queued to verify |
| 2 | Verifying |
| 3 | Queued to download |
| 4 | Downloading |
| 5 | Queued to seed |
| 6 | Seeding |
## Common torrent_get Fields
```python
TORRENT_FIELDS = [
"id", "name", "status", "hash_string",
"percent_done", "rate_download", "rate_upload",
"total_size", "downloaded_ever", "uploaded_ever",
"eta", "error", "error_string", "is_finished",
"peers_connected", "labels", "download_dir"
]
```
## Error Handling
The server returns JSON error responses:
```json
{
"error": "Instance 'unknown' not found",
"available_instances": ["home", "seedbox"]
}
```
For RPC errors, the Transmission error is passed through:
```json
{
"error": {
"code": -32602,
"message": "Invalid params",
"data": {"errorString": "..."}
}
}
```
## Adding New Specific Tools
To add a new convenience tool, follow this pattern:
```python
@mcp.tool()
async def my_new_tool(instance: str = "") -> str:
"""Tool description for AI agents.
Args:
instance: Target instance name (uses default if empty)
"""
try:
client = registry.get(instance)
result = await client.rpc("some_method", {"param": "value"})
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)})
```
## Testing
```bash
# Install dependencies
pip install -e .
# Create test config
cp .env.example .env
# Edit .env with your Transmission instance
# Run server
python server.py
# Test health endpoint
curl http://localhost:8000/health
# Test with MCP client or use transmission-remote for RPC comparison
transmission-remote --debug localhost:9091 -l
```

136
README.md Normal file
View File

@@ -0,0 +1,136 @@
# Transmission MCP Server
A multi-instance Transmission BitTorrent client MCP server following the "Hybrid MCP Light" pattern.
## Features
- **Multi-instance support**: Connect to multiple Transmission daemons simultaneously
- **Default instance**: Configure a default instance for convenience
- **Full API coverage**: Pass-through tool for any Transmission RPC method
- **Embedded documentation**: API reference available as an MCP resource
## Quick Start
1. Copy the environment template:
```bash
cp .env.example .env
```
2. Edit `.env` with your Transmission instances:
```bash
TRANSMISSION_INSTANCES='[
{"name": "home", "url": "http://localhost:9091", "username": "admin", "password": "secret"},
{"name": "seedbox", "url": "https://seedbox.example.com/transmission"}
]'
TRANSMISSION_DEFAULT_INSTANCE=home
```
3. Run with Docker:
```bash
docker-compose up -d
```
Or run directly:
```bash
pip install .
python server.py
```
## Configuration
### Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `TRANSMISSION_INSTANCES` | Yes | JSON array of instance configurations |
| `TRANSMISSION_DEFAULT_INSTANCE` | No | Name of the default instance |
| `HOST` | No | Server bind address (default: `0.0.0.0`) |
| `PORT` | No | Server port (default: `8000`) |
### Instance Configuration
Each instance in `TRANSMISSION_INSTANCES` supports:
| Field | Required | Description |
|-------|----------|-------------|
| `name` | Yes | Unique identifier for this instance |
| `url` | Yes | Full URL to Transmission RPC endpoint |
| `username` | No | Username for HTTP Basic Auth |
| `password` | No | Password for HTTP Basic Auth |
## MCP Tools
### `list_instances`
Returns all configured Transmission instances with connectivity status.
### `get_torrents`
Get torrent list with common fields for a specific instance.
**Parameters:**
- `instance` (optional): Instance name (uses default if not specified)
- `ids` (optional): Specific torrent IDs to fetch
### `get_session_stats`
Get session statistics including speeds and torrent counts.
**Parameters:**
- `instance` (optional): Instance name (uses default if not specified)
### `rpc_call`
Execute any Transmission RPC method directly.
**Parameters:**
- `method`: RPC method name (e.g., `torrent_get`, `session_set`)
- `params` (optional): JSON string of method parameters
- `instance` (optional): Instance name (uses default if not specified)
## MCP Resources
### `transmission://api-reference`
Embedded Transmission RPC API documentation for AI agent reference.
## Examples
### List all torrents on default instance
```json
{"tool": "get_torrents"}
```
### Get torrents from specific instance
```json
{"tool": "get_torrents", "arguments": {"instance": "seedbox"}}
```
### Add a torrent via pass-through
```json
{
"tool": "rpc_call",
"arguments": {
"method": "torrent_add",
"params": "{\"filename\": \"magnet:?xt=urn:btih:...\"}"
}
}
```
### Pause all torrents
```json
{
"tool": "rpc_call",
"arguments": {
"method": "torrent_stop",
"params": "{}"
}
}
```
## Health Check
The server exposes a `/health` endpoint for Docker health checks and monitoring.
```bash
curl http://localhost:8000/health
```
## License
MIT

18
docker-compose.yml Normal file
View File

@@ -0,0 +1,18 @@
services:
transmission-mcp:
build: .
container_name: transmission-mcp
restart: unless-stopped
ports:
- "8000:8000"
env_file:
- .env
environment:
- HOST=0.0.0.0
- PORT=8000
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8000/health').raise_for_status()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 5s

25
pyproject.toml Normal file
View File

@@ -0,0 +1,25 @@
[project]
name = "transmission-mcp"
version = "0.1.0"
description = "MCP server for managing multiple Transmission daemon instances"
requires-python = ">=3.11"
dependencies = [
"fastmcp>=2.0.0",
"httpx>=0.28.0",
"starlette>=0.45.0",
"uvicorn>=0.34.0",
"python-dotenv>=1.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.24.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["."]

1094
rpc-spec.md Normal file

File diff suppressed because it is too large Load Diff

572
server.py Normal file
View File

@@ -0,0 +1,572 @@
"""
Transmission MCP Server - Multi-instance Transmission daemon management via MCP.
Follows the "Hybrid MCP Light" pattern:
- Minimal specific tools for common operations
- Raw RPC pass-through for full API access
- Embedded API documentation as a resource
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from typing import Any
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
load_dotenv()
# =============================================================================
# Configuration
# =============================================================================
@dataclass
class InstanceConfig:
"""Configuration for a single Transmission instance."""
name: str
url: str
username: str | None = None
password: str | None = None
@property
def rpc_url(self) -> str:
"""Full RPC endpoint URL."""
base = self.url.rstrip("/")
if not base.endswith("/transmission/rpc"):
base = f"{base}/transmission/rpc"
return base
def load_instances() -> list[InstanceConfig]:
"""Load instance configurations from TRANSMISSION_INSTANCES env var."""
raw = os.getenv("TRANSMISSION_INSTANCES", "[]")
try:
data = json.loads(raw)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid TRANSMISSION_INSTANCES JSON: {e}") from e
instances = []
for item in data:
if not isinstance(item, dict):
raise ValueError(f"Each instance must be an object, got: {type(item)}")
if "name" not in item or "url" not in item:
raise ValueError("Each instance requires 'name' and 'url' fields")
instances.append(
InstanceConfig(
name=item["name"],
url=item["url"],
username=item.get("username"),
password=item.get("password"),
)
)
return instances
def get_default_instance() -> str | None:
"""Get the default instance name from env."""
return os.getenv("TRANSMISSION_DEFAULT_INSTANCE")
# =============================================================================
# Transmission Client
# =============================================================================
class TransmissionClient:
"""
Async client for a single Transmission daemon instance.
Handles:
- HTTP Basic authentication (optional)
- CSRF token management (X-Transmission-Session-Id)
- JSON-RPC 2.0 protocol
"""
def __init__(self, config: InstanceConfig):
self.config = config
self._session_id: str | None = None
self._request_id = 0
# Build auth if credentials provided
auth = None
if config.username:
auth = httpx.BasicAuth(config.username, config.password or "")
self._client = httpx.AsyncClient(
auth=auth,
timeout=30.0,
headers={"Content-Type": "application/json"},
)
async def close(self) -> None:
"""Close the HTTP client."""
await self._client.aclose()
def _next_request_id(self) -> int:
"""Generate unique request ID."""
self._request_id += 1
return self._request_id
async def rpc(
self, method: str, params: dict[str, Any] | None = None
) -> dict[str, Any]:
"""
Execute a JSON-RPC 2.0 call to the Transmission daemon.
Args:
method: RPC method name (e.g., "torrent_get", "session_stats")
params: Method parameters
Returns:
The 'result' field from the response, or error details
Raises:
httpx.HTTPError: On network/HTTP errors after retry
"""
payload = {
"jsonrpc": "2.0",
"method": method,
"id": self._next_request_id(),
}
if params:
payload["params"] = params
# Try request, handle CSRF token refresh on 409
response = None
for attempt in range(2):
headers = {}
if self._session_id:
headers["X-Transmission-Session-Id"] = self._session_id
response = await self._client.post(
self.config.rpc_url,
json=payload,
headers=headers,
)
if response.status_code == 409:
# CSRF token expired/missing - extract new one and retry
self._session_id = response.headers.get("X-Transmission-Session-Id")
if not self._session_id:
raise ValueError("Got 409 but no X-Transmission-Session-Id header")
continue
response.raise_for_status()
break
if response is None:
raise RuntimeError("No response received from Transmission")
data = response.json()
# Handle JSON-RPC error response
if "error" in data:
error = data["error"]
return {
"error": True,
"code": error.get("code"),
"message": error.get("message"),
"data": error.get("data"),
}
return data.get("result", {})
async def health_check(self) -> dict[str, Any]:
"""Check connectivity to this instance."""
try:
result = await self.rpc(
"session_get", {"fields": ["version", "rpc_version_semver"]}
)
if "error" in result:
return {"healthy": False, "error": result["message"]}
return {
"healthy": True,
"version": result.get("version"),
"rpc_version": result.get("rpc_version_semver"),
}
except Exception as e:
return {"healthy": False, "error": str(e)}
# =============================================================================
# Instance Registry
# =============================================================================
class InstanceRegistry:
"""Manages multiple TransmissionClient instances."""
def __init__(self, configs: list[InstanceConfig], default: str | None = None):
self._clients: dict[str, TransmissionClient] = {}
self._default = default
for config in configs:
self._clients[config.name] = TransmissionClient(config)
# Validate default exists
if self._default and self._default not in self._clients:
raise ValueError(
f"Default instance '{self._default}' not found in configured instances: "
f"{list(self._clients.keys())}"
)
def get(self, name: str | None = None) -> TransmissionClient:
"""
Get a client by name, or the default if name is None.
Raises:
KeyError: If instance not found or no default configured
"""
if name is None:
if self._default is None:
raise KeyError("No instance name provided and no default configured")
name = self._default
if name not in self._clients:
raise KeyError(
f"Instance '{name}' not found. Available: {list(self._clients.keys())}"
)
return self._clients[name]
def list_names(self) -> list[str]:
"""List all configured instance names."""
return list(self._clients.keys())
def get_default(self) -> str | None:
"""Get the default instance name."""
return self._default
async def close_all(self) -> None:
"""Close all clients."""
for client in self._clients.values():
await client.close()
# =============================================================================
# Initialize Registry
# =============================================================================
try:
_instances = load_instances()
_default = get_default_instance()
registry = InstanceRegistry(_instances, _default)
except Exception as e:
# Allow server to start even with bad config for debugging
print(f"WARNING: Failed to load instances: {e}")
registry = InstanceRegistry([])
# =============================================================================
# MCP Server
# =============================================================================
mcp = FastMCP(
name="transmission-mcp",
instructions="""
Transmission MCP Server - Manage multiple Transmission BitTorrent daemon instances.
Use 'list_instances' to see available instances and their status.
Use 'get_torrents' and 'get_session_stats' for common operations.
Use 'rpc_call' for any Transmission RPC method not covered by specific tools.
Refer to the 'transmission://rpc-reference' resource for API documentation.
""",
)
# =============================================================================
# MCP Tools
# =============================================================================
@mcp.tool()
async def list_instances() -> str:
"""
List all configured Transmission instances and their connectivity status.
Returns a JSON object with:
- instances: Array of instance objects with name, url, healthy status, and version info
- default: The default instance name (if configured)
"""
results = []
for name in registry.list_names():
client = registry.get(name)
health = await client.health_check()
results.append(
{
"name": name,
"url": client.config.url,
"authenticated": client.config.username is not None,
**health,
}
)
return json.dumps(
{
"instances": results,
"default": registry.get_default(),
},
indent=2,
)
@mcp.tool()
async def get_torrents(
instance: str | None = None,
ids: str | None = None,
) -> str:
"""
Get torrent list with common fields from a Transmission instance.
Args:
instance: Instance name (uses default if not specified)
ids: Optional torrent IDs - can be:
- Omitted for all torrents
- A single ID number as string
- Comma-separated IDs (e.g., "1,5,10")
- "recently_active" for recently changed torrents
Returns JSON with torrents array containing: id, name, status, percent_done,
rate_download, rate_upload, total_size, error, error_string, labels
"""
try:
client = registry.get(instance)
except KeyError as e:
return json.dumps({"error": str(e)})
params: dict[str, Any] = {
"fields": [
"id",
"name",
"status",
"percent_done",
"rate_download",
"rate_upload",
"total_size",
"error",
"error_string",
"labels",
"eta",
"download_dir",
]
}
# Parse ids parameter
if ids:
if ids == "recently_active":
params["ids"] = "recently_active"
elif "," in ids:
params["ids"] = [int(i.strip()) for i in ids.split(",")]
else:
params["ids"] = [int(ids)]
result = await client.rpc("torrent_get", params)
return json.dumps(result, indent=2)
@mcp.tool()
async def get_session_stats(instance: str | None = None) -> str:
"""
Get session statistics from a Transmission instance.
Args:
instance: Instance name (uses default if not specified)
Returns JSON with: active_torrent_count, paused_torrent_count, torrent_count,
download_speed, upload_speed, cumulative_stats, current_stats
"""
try:
client = registry.get(instance)
except KeyError as e:
return json.dumps({"error": str(e)})
result = await client.rpc("session_stats")
return json.dumps(result, indent=2)
@mcp.tool()
async def rpc_call(
method: str,
params: str = "{}",
instance: str | None = None,
) -> str:
"""
Execute a raw JSON-RPC 2.0 call to a Transmission instance.
This is the "escape hatch" for any RPC method not covered by specific tools.
Refer to the 'transmission://rpc-reference' resource for available methods.
Args:
method: RPC method name (e.g., "torrent_add", "torrent_set", "session_set")
params: JSON string of method parameters (default: "{}")
instance: Instance name (uses default if not specified)
Examples:
- Add torrent: method="torrent_add", params='{"filename": "magnet:?xt=..."}'
- Start torrent: method="torrent_start", params='{"ids": [1, 2]}'
- Set speed limit: method="session_set", params='{"speed_limit_down": 1000}'
"""
try:
client = registry.get(instance)
except KeyError as e:
return json.dumps({"error": str(e)})
try:
parsed_params = json.loads(params)
except json.JSONDecodeError as e:
return json.dumps({"error": f"Invalid JSON params: {e}"})
result = await client.rpc(method, parsed_params if parsed_params else None)
return json.dumps(result, indent=2)
# =============================================================================
# MCP Resource - API Reference
# =============================================================================
RPC_REFERENCE = """
# Transmission RPC Reference (JSON-RPC 2.0)
## Overview
Transmission 4.1.0+ uses JSON-RPC 2.0 with snake_case naming.
All methods require the `method` name and optional `params` object.
## Torrent Actions
| Method | Description | Key Params |
|--------|-------------|------------|
| `torrent_start` | Start torrents | `ids` |
| `torrent_start_now` | Start immediately (skip queue) | `ids` |
| `torrent_stop` | Stop torrents | `ids` |
| `torrent_verify` | Verify torrent data | `ids` |
| `torrent_reannounce` | Re-announce to trackers | `ids` |
**`ids` parameter**: integer, array of integers/hashes, or "recently_active"
## Torrent Accessors
| Method | Description |
|--------|-------------|
| `torrent_get` | Get torrent info (requires `fields` array) |
| `torrent_set` | Modify torrent settings |
| `torrent_add` | Add new torrent (`filename` or `metainfo`) |
| `torrent_remove` | Remove torrent (`delete_local_data`: bool) |
| `torrent_set_location` | Move torrent (`location`, `move`: bool) |
| `torrent_rename_path` | Rename file/folder (`path`, `name`) |
### Common `torrent_get` fields:
- `id`, `name`, `status`, `hash_string`
- `percent_done`, `percent_complete`, `eta`
- `rate_download`, `rate_upload` (bytes/sec)
- `total_size`, `size_when_done`, `left_until_done`
- `download_dir`, `labels`, `error`, `error_string`
- `files`, `file_stats`, `peers`, `trackers`
### Status values:
| Value | Meaning |
|-------|---------|
| 0 | Stopped |
| 1 | Queued to verify |
| 2 | Verifying |
| 3 | Queued to download |
| 4 | Downloading |
| 5 | Queued to seed |
| 6 | Seeding |
## torrent_add params:
- `filename`: URL or path to .torrent file
- `metainfo`: Base64-encoded .torrent content
- `download_dir`: Download location
- `paused`: Start paused (bool)
- `labels`: Array of string labels
## Session Methods
| Method | Description |
|--------|-------------|
| `session_get` | Get session settings (optional `fields`) |
| `session_set` | Modify session settings |
| `session_stats` | Get statistics |
| `session_close` | Shutdown daemon |
| `free_space` | Check free space (`path`) |
### Common session settings:
- `download_dir`, `incomplete_dir`
- `speed_limit_down`, `speed_limit_down_enabled`
- `speed_limit_up`, `speed_limit_up_enabled`
- `peer_limit_global`, `peer_limit_per_torrent`
## Queue Methods
| Method | Params |
|--------|--------|
| `queue_move_top` | `ids` |
| `queue_move_up` | `ids` |
| `queue_move_down` | `ids` |
| `queue_move_bottom` | `ids` |
## Bandwidth Groups
| Method | Description |
|--------|-------------|
| `group_get` | Get bandwidth groups (optional `group` name/array) |
| `group_set` | Create/modify group (`name`, speed limits) |
## Full Documentation
See: https://github.com/transmission/transmission/blob/main/docs/rpc-spec.md
"""
@mcp.resource("transmission://rpc-reference")
def get_rpc_reference() -> str:
"""
Returns the Transmission RPC API reference documentation.
Use this to learn available methods and parameters for the rpc_call tool.
"""
return RPC_REFERENCE
# =============================================================================
# Health Check & ASGI App
# =============================================================================
async def health(request) -> JSONResponse:
"""Health check endpoint for container orchestration."""
# Quick check - just verify we can list instances
instance_count = len(registry.list_names())
return JSONResponse(
{
"status": "ok",
"instances_configured": instance_count,
"default_instance": registry.get_default(),
}
)
def create_app() -> Starlette:
"""Create the Starlette ASGI application wrapping the MCP server."""
mcp_app = mcp.http_app()
return Starlette(
routes=[
Route("/health", health),
Mount("/", app=mcp_app),
],
lifespan=mcp_app.lifespan,
)
app = create_app()
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "8000"))
host = os.getenv("HOST", "0.0.0.0")
uvicorn.run(app, host=host, port=port)