From a0e808a16fdbfd9d7ac418b9cdf3960febe015f7 Mon Sep 17 00:00:00 2001 From: Ben Date: Wed, 24 Dec 2025 01:57:38 +0000 Subject: [PATCH] Refactor SSE endpoints to /mcp and fix library type errors --- README.md | 2 +- src/monarch_mcp_custom/server.py | 32 ++++++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 01c95be..a7b5721 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ docker-compose up -d ## 🔌 Connection The server will be available at: -- **SSE Endpoint**: `http://localhost:8000/mcp/sse` +- **MCP Endpoint**: `http://localhost:8000/mcp` - **Health Check**: `http://localhost:8000/health` ## 🛠️ Tools Included diff --git a/src/monarch_mcp_custom/server.py b/src/monarch_mcp_custom/server.py index 16d04eb..2290e5d 100644 --- a/src/monarch_mcp_custom/server.py +++ b/src/monarch_mcp_custom/server.py @@ -13,7 +13,8 @@ from dotenv import load_dotenv from fastmcp import FastMCP from starlette.applications import Starlette from starlette.responses import JSONResponse -from starlette.routing import Route, Mount +from starlette.routing import Route +from mcp.server.sse import SseServerTransport import uvicorn from monarch_mcp_custom.auth import get_authenticated_client @@ -144,8 +145,8 @@ async def get_account_holdings(account_id: str) -> str: """Get investment holdings for a specific account.""" try: client = await get_authenticated_client() - # Ensure account_id is treated correctly (usually string ID in Monarch) - holdings = await client.get_account_holdings(account_id) + # The library expects an int for account_id + holdings = await client.get_account_holdings(int(account_id)) return serialize_json(holdings) except Exception as e: logger.error(f"Failed to get holdings: {e}") @@ -157,7 +158,8 @@ async def refresh_accounts() -> str: """Request a refresh of account data from financial institutions.""" try: client = await get_authenticated_client() - result = await client.request_accounts_refresh() + # Request refresh for all accounts (empty list often means all in this library) + result = await client.request_accounts_refresh([]) return serialize_json(result) except Exception as e: logger.error(f"Failed to refresh accounts: {e}") @@ -176,15 +178,29 @@ async def health_check(request): def create_app(): - """Create the Starlette application with MCP mounted at /mcp.""" - mcp_app = mcp.http_app() + """Create the Starlette application with MCP at /mcp.""" + # SSE Transport following the "newer http-streamable" convention: + # Stream endpoint at /mcp, messages at /mcp/messages + sse = SseServerTransport("/mcp/messages") + + async def handle_sse(request): + async with sse.connect_sse(request.scope, request.receive, request._send) as ( + read_stream, + write_stream, + ): + await mcp.run( + read_stream, + write_stream, + mcp.create_initialization_options(), + ) routes = [ Route("/health", health_check, methods=["GET"]), - Mount("/mcp", app=mcp_app), + Route("/mcp", endpoint=handle_sse, methods=["GET"]), + Route("/mcp/messages", endpoint=sse.handle_post_message, methods=["POST"]), ] - return Starlette(routes=routes, lifespan=mcp_app.lifespan) + return Starlette(routes=routes, lifespan=mcp.lifespan) app = create_app()