Files
outline-mcp-custom/server.py
T
b3nw d637394c0e
Build and Push Outline MCP Docker Image / build (push) Successful in 11s
feat: add secure HTTP streaming upload gateway and address code review findings
2026-05-25 02:15:10 +00:00

765 lines
22 KiB
Python

import os
import json
import logging
import mimetypes
import contextlib
from typing import Any
import httpx
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Configuration ---
OUTLINE_API_URL = os.getenv("OUTLINE_API_URL", "").rstrip("/")
OUTLINE_API_TOKEN = os.getenv("OUTLINE_API_TOKEN", "")
class OutlineClient:
"""HTTP client for Outline API with authentication."""
def __init__(self, base_url: str, api_token: str):
self.base_url = base_url.rstrip("/")
self.api_token = api_token
self._client: httpx.AsyncClient | None = None
@property
def client(self) -> httpx.AsyncClient:
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers={
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json",
},
timeout=30.0,
)
return self._client
async def call(self, method: str, params: dict | None = None) -> dict:
"""Execute an Outline API call.
Args:
method: API method (e.g., 'documents.list', 'collections.info')
params: Optional parameters to send in request body
Returns:
API response as dict
"""
if params is None:
params = {}
try:
response = await self.client.post(f"/api/{method}", json=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
error_body = e.response.text
try:
error_json = e.response.json()
return {
"ok": False,
"error": error_json.get("message", str(e)),
"status": e.response.status_code,
}
except Exception:
return {
"ok": False,
"error": error_body or str(e),
"status": e.response.status_code,
}
except httpx.RequestError as e:
return {"ok": False, "error": f"Request failed: {str(e)}"}
async def check_health(self) -> tuple[bool, str]:
"""Check API connectivity by calling auth.info."""
try:
result = await self.call("auth.info")
if result.get("ok"):
return True, "Connected"
return False, result.get("error", "Unknown error")
except Exception as e:
return False, str(e)
async def close(self):
if self._client and not self._client.is_closed:
await self._client.aclose()
# Initialize client
outline_client = OutlineClient(OUTLINE_API_URL, OUTLINE_API_TOKEN)
# --- FastMCP Server ---
mcp = FastMCP("Outline MCP")
@mcp.tool()
async def search_documents(
query: str,
collection_id: str = None,
limit: int = 25,
include_archived: bool = False,
) -> str:
"""Search for documents in Outline using full-text search.
Args:
query: Search query string
collection_id: Optional collection ID to search within
limit: Maximum number of results (default 25, max 100)
include_archived: Include archived documents in results
Returns:
JSON string with search results containing document titles, IDs, and context
"""
params = {
"query": query,
"limit": min(limit, 100),
"includeArchived": include_archived,
}
if collection_id:
params["collectionId"] = collection_id
result = await outline_client.call("documents.search", params)
return json.dumps(result, indent=2)
@mcp.tool()
async def get_document(document_id: str, share_id: str = None) -> str:
"""Retrieve a document by ID, returning its full Markdown content.
Args:
document_id: The document UUID
share_id: Optional share ID for accessing shared documents
Returns:
JSON string with document metadata and Markdown content
"""
params = {"id": document_id}
if share_id:
params["shareId"] = share_id
result = await outline_client.call("documents.info", params)
return json.dumps(result, indent=2)
@mcp.tool()
async def list_collections(limit: int = 25, offset: int = 0) -> str:
"""List all collections (folders) in the Outline workspace.
Args:
limit: Maximum number of results (default 25)
offset: Pagination offset
Returns:
JSON string with collection names, IDs, and metadata
"""
params = {"limit": limit, "offset": offset}
result = await outline_client.call("collections.list", params)
return json.dumps(result, indent=2)
@mcp.tool()
async def list_collection_documents(
collection_id: str, limit: int = 25, offset: int = 0
) -> str:
"""List all documents within a specific collection.
Args:
collection_id: The collection UUID
limit: Maximum number of results (default 25)
offset: Pagination offset
Returns:
JSON string with document tree structure for the collection
"""
params = {"id": collection_id, "limit": limit, "offset": offset}
result = await outline_client.call("collections.documents", params)
return json.dumps(result, indent=2)
@mcp.tool()
async def outline_api_call(method: str, params: Any = None) -> str:
"""Execute a raw Outline API call for any endpoint.
Use the 'outline://api-reference' resource to discover available methods
and their parameters.
Args:
method: API method name (e.g., 'documents.create', 'users.list')
params: Parameters as JSON string or dict (optional)
Returns:
JSON string with raw API response
Examples:
- method='documents.create', params='{"title": "New Doc", "collectionId": "uuid"}'
- method='users.list', params={"limit": 10}
- method='documents.delete', params='{"id": "document-uuid"}'
"""
# Handle params being string, dict, or None/empty
if params is None or params == "" or params == {}:
parsed_params = {}
elif isinstance(params, dict):
parsed_params = params
elif isinstance(params, str):
try:
parsed_params = json.loads(params) if params.strip() else {}
except json.JSONDecodeError as e:
return json.dumps({"ok": False, "error": f"Invalid JSON params: {str(e)}"})
else:
return json.dumps(
{
"ok": False,
"error": f"params must be string or dict, got {type(params).__name__}",
}
)
result = await outline_client.call(method, parsed_params)
return json.dumps(result, indent=2)
# --- API Reference Resource ---
API_REFERENCE = """# Outline API Reference
Outline uses an RPC-style API where all requests are POST to /api/<method>.
All responses have format: {"ok": true/false, "data": ...}
## Authentication
All requests require Bearer token: Authorization: Bearer <token>
Get your token from Outline Settings > API Tokens.
## Pagination
Most list endpoints accept: limit (default 25, max 100), offset (default 0)
Response includes: pagination.nextPath if more results exist.
---
## Documents
### documents.list
List documents. Params: collectionId?, userId?, parentDocumentId?, sort?, direction?, limit?, offset?
### documents.info
Get document by ID. Params: id (required), shareId?
### documents.search
Full-text search. Params: query (required), collectionId?, userId?, dateFilter?, includeArchived?, includeDrafts?, limit?, offset?
### documents.create
Create document. Params: title (required), collectionId (required), text?, parentDocumentId?, templateId?, template?, publish?
### documents.update
Update document. Params: id (required), title?, text?, append?, publish?, done?
### documents.delete
Delete document. Params: id (required), permanent?
### documents.move
Move document. Params: id (required), collectionId?, parentDocumentId?
### documents.archive
Archive document. Params: id (required)
### documents.unarchive
Restore archived document. Params: id (required)
### documents.restore
Restore deleted document. Params: id (required), revisionId?
### documents.export
Export document. Params: id (required)
Returns Markdown content.
### documents.import
Import document. Multipart form: file, collectionId, parentDocumentId?, template?, publish?
### documents.star
Star a document. Params: id (required)
### documents.unstar
Remove star. Params: id (required)
### documents.templatize
Convert to template. Params: id (required)
### documents.unpublish
Unpublish document. Params: id (required)
---
## Collections
### collections.list
List all collections. Params: limit?, offset?
### collections.info
Get collection details. Params: id (required)
### collections.documents
Get document structure for collection. Params: id (required), limit?, offset?
### collections.create
Create collection. Params: name (required), description?, color?, permission?, sharing?
### collections.update
Update collection. Params: id (required), name?, description?, color?, permission?, sharing?
### collections.delete
Delete collection. Params: id (required)
### collections.add_user
Add user to collection. Params: id (required), userId (required), permission?
### collections.remove_user
Remove user from collection. Params: id (required), userId (required)
### collections.memberships
List collection members. Params: id (required), query?, permission?, limit?, offset?
### collections.add_group
Add group to collection. Params: id (required), groupId (required), permission?
### collections.remove_group
Remove group. Params: id (required), groupId (required)
### collections.group_memberships
List group memberships. Params: id (required), query?, permission?, limit?, offset?
### collections.export
Export entire collection. Params: id (required), format?
### collections.export_all
Export all collections. Params: format?
---
## Users
### users.list
List workspace users. Params: query?, filter?, sort?, direction?, limit?, offset?
Filter options: all, active, invited, suspended, admins
### users.info
Get user by ID. Params: id (required)
### users.invite
Invite users. Params: invites (array of {email, name, role})
### users.update
Update user. Params: id (required), name?, avatarUrl?
### users.delete
Delete user. Params: id (required)
### users.suspend
Suspend user. Params: id (required)
### users.activate
Activate suspended user. Params: id (required)
### users.promote
Promote to admin. Params: id (required)
### users.demote
Remove admin. Params: id (required)
---
## Groups
### groups.list
List groups. Params: query?, sort?, direction?, limit?, offset?
### groups.info
Get group. Params: id (required)
### groups.create
Create group. Params: name (required)
### groups.update
Update group. Params: id (required), name (required)
### groups.delete
Delete group. Params: id (required)
### groups.memberships
List group members. Params: id (required), query?, limit?, offset?
### groups.add_user
Add user to group. Params: id (required), userId (required)
### groups.remove_user
Remove user from group. Params: id (required), userId (required)
---
## Comments
### comments.list
List comments on document. Params: documentId (required), limit?, offset?
### comments.create
Create comment. Params: documentId (required), data (required - ProseMirror JSON)
### comments.update
Update comment. Params: id (required), data (required)
### comments.delete
Delete comment. Params: id (required)
---
## Revisions
### revisions.list
List document revisions. Params: documentId (required), limit?, offset?
### revisions.info
Get revision. Params: id (required)
---
## Shares
### shares.list
List document shares. Params: documentId?, sort?, direction?, limit?, offset?
### shares.info
Get share by ID or documentId. Params: id?, documentId?
### shares.create
Create share link. Params: documentId (required), published?, urlId?, includeChildDocuments?
### shares.update
Update share. Params: id (required), published?, urlId?, includeChildDocuments?
### shares.revoke
Revoke share. Params: id (required)
---
## Stars
### stars.list
List starred documents. Params: limit?, offset?
### stars.create
Star an item. Params: documentId?, collectionId?
### stars.delete
Remove star. Params: id (required)
---
## Events
### events.list
List audit events. Params: name?, actorId?, documentId?, collectionId?, auditLog?, sort?, direction?, limit?, offset?
---
## File Operations
### fileOperations.list
List file operations. Params: type?, limit?, offset?
### fileOperations.info
Get file operation status. Params: id (required)
### fileOperations.redirect
Get download URL. Params: id (required)
### fileOperations.delete
Delete file operation. Params: id (required)
---
## Attachments
### attachments.create
Create attachment upload URL. Params: name (required), documentId (required), size (required), contentType (required)
### attachments.delete
Delete attachment. Params: id (required)
### attachments.redirect
Get attachment URL. Params: id (required)
### Custom HTTP Gateway Upload
Rather than manually performing the multi-step attachments.create + PUT process, you can stream files directly through the MCP server's built-in HTTP streaming gateway:
- **Endpoint**: `POST /upload`
- **Query Parameters**:
- `documentId` (required): The UUID of the destination Outline document.
- `name` (required): The filename of the attachment.
- **Headers**:
- `Content-Length` (required): Total size of the file in bytes.
- `Content-Type` (optional): The MIME type of the file (auto-detected if omitted).
- **Body**: Raw binary stream of the file content.
- **Response**: JSON containing the created attachment's details, including its public access `url` on success.
---
## Auth
### auth.info
Get current user and team info. No params.
### auth.config
Get auth configuration. No params.
---
## Views
### views.list
List document views. Params: documentId (required), limit?, offset?
### views.create
Record a view. Params: documentId (required)
---
## Common Response Patterns
Success:
{"ok": true, "data": {...}, "pagination": {"nextPath": "...", "limit": 25, "offset": 0}}
Error:
{"ok": false, "error": "message", "status": 401}
Rate Limiting:
HTTP 429 with Retry-After header
"""
@mcp.resource("outline://api-reference")
def get_api_reference() -> str:
"""Returns comprehensive Outline API documentation for use with outline_api_call tool."""
return API_REFERENCE
# --- Upload Endpoint ---
async def upload_endpoint(request: Request) -> JSONResponse:
"""HTTP streaming endpoint to upload file attachments to Outline.
This endpoint acts as a non-buffering gateway/proxy. It calls Outline's
attachments.create to register the metadata and obtain an upload URL,
then streams the incoming request body directly to S3.
"""
# 1. Authorization validation
auth_header = request.headers.get("Authorization")
if not auth_header or auth_header != f"Bearer {OUTLINE_API_TOKEN}":
return JSONResponse(
{"ok": False, "error": "Unauthorized"},
status_code=401,
)
document_id = request.query_params.get("documentId")
if not document_id:
return JSONResponse(
{"ok": False, "error": "Missing 'documentId' query parameter"},
status_code=400,
)
filename = request.query_params.get("name")
if not filename:
return JSONResponse(
{"ok": False, "error": "Missing 'name' query parameter"},
status_code=400,
)
# Size is required by attachments.create
content_length_str = request.headers.get("content-length")
if not content_length_str:
return JSONResponse(
{"ok": False, "error": "Missing 'Content-Length' header"},
status_code=400,
)
try:
size = int(content_length_str)
except ValueError:
return JSONResponse(
{"ok": False, "error": "Invalid 'Content-Length' header"},
status_code=400,
)
# Validate file size limits (abuses and negative boundary checks)
MAX_UPLOAD_SIZE = int(os.getenv("MAX_UPLOAD_SIZE", "104857600")) # Default 100MB
if size < 0:
return JSONResponse(
{"ok": False, "error": "Invalid file size (must be non-negative)"},
status_code=400,
)
if size > MAX_UPLOAD_SIZE:
return JSONResponse(
{"ok": False, "error": f"File too large (exceeds limit of {MAX_UPLOAD_SIZE} bytes)"},
status_code=413,
)
# Detect content type
content_type = (
request.headers.get("content-type")
or mimetypes.guess_type(filename)[0]
or "application/octet-stream"
)
# 2. Initialize upload session in Outline
create_params = {
"name": filename,
"documentId": document_id,
"size": size,
"contentType": content_type,
}
logger.info(f"Initiating attachment creation in Outline: {create_params}")
create_response = await outline_client.call("attachments.create", create_params)
if not create_response.get("ok"):
return JSONResponse(
{
"ok": False,
"error": f"Failed to initialize attachment in Outline: {create_response.get('error')}",
},
status_code=502,
)
attachment_data = create_response.get("data", {})
upload_url = attachment_data.get("uploadUrl")
if not upload_url:
return JSONResponse(
{"ok": False, "error": "Outline API did not return an uploadUrl"},
status_code=502,
)
# 3. Stream the request body directly to S3 upload_url
try:
# Pre-signed S3 PUT URLs usually enforce exact headers
headers = {
"Content-Type": content_type,
"Content-Length": str(size),
}
logger.info(f"Streaming file content to storage (size={size} bytes)")
async with httpx.AsyncClient() as client:
put_response = await client.put(
upload_url,
content=request.stream(),
headers=headers,
timeout=600.0,
)
put_response.raise_for_status()
logger.info("Upload to storage completed successfully")
# 4. Return success and attachment metadata
return JSONResponse(
{
"ok": True,
"data": {
"id": attachment_data.get("id"),
"name": attachment_data.get("name"),
"size": attachment_data.get("size"),
"contentType": attachment_data.get("contentType"),
"url": attachment_data.get("url"),
},
}
)
except Exception as e:
# Clean up orphaned attachment record in Outline to maintain consistency
attachment_id = attachment_data.get("id")
if attachment_id:
logger.warning(
f"Cleaning up orphaned attachment {attachment_id} in Outline due to S3 upload failure"
)
try:
await outline_client.call("attachments.delete", {"id": attachment_id})
except Exception as cleanup_err:
logger.error(
f"Failed to delete orphaned attachment {attachment_id}: {str(cleanup_err)}"
)
if isinstance(e, httpx.HTTPStatusError):
logger.error(
f"Storage upload HTTP error: {e.response.status_code} - {e.response.text}"
)
return JSONResponse(
{
"ok": False,
"error": f"Failed to upload attachment to storage: {str(e)}",
},
status_code=502,
)
else:
logger.error(f"Exception during attachment upload: {str(e)}")
return JSONResponse(
{"ok": False, "error": f"Upload stream failed: {str(e)}"},
status_code=500,
)
# --- Health Check Endpoint ---
async def health(request):
"""Health check endpoint for Docker/load balancer.
Returns 200 if server is running. API connectivity status is included
in response body but doesn't affect HTTP status code.
"""
response = {"status": "ok", "server": "running"}
if not OUTLINE_API_URL or not OUTLINE_API_TOKEN:
response["api"] = "not_configured"
response["detail"] = "Missing OUTLINE_API_URL or OUTLINE_API_TOKEN"
else:
healthy, message = await outline_client.check_health()
response["api"] = "connected" if healthy else "disconnected"
if not healthy:
response["detail"] = message
return JSONResponse(response)
# --- ASGI Application ---
def create_app():
"""Create the ASGI application with health check and MCP routes."""
mcp_app = mcp.http_app()
@contextlib.asynccontextmanager
async def lifespan(app: Starlette):
"""Custom ASGI lifespan manager that integrates with FastMCP's lifespan
and ensures the Outline HTTP client's connections are gracefully closed
on server shutdown.
"""
async with mcp_app.lifespan(app):
yield
logger.info("Closing Outline client connections on server shutdown")
await outline_client.close()
routes = [
Route("/health", health, methods=["GET"]),
Route("/upload", upload_endpoint, methods=["POST"]),
Mount("/", app=mcp_app),
]
return Starlette(routes=routes, lifespan=lifespan)
# Create the app instance for uvicorn
app = create_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)