"""MCP server exposing Schwab scraper operations as tools over SSE.

Each tool forwards to ``schwab_scraper.unified_api`` while holding a
process-wide lock, unwraps the returned envelope, and converts the payload to
plain ``dict``/``list`` values via ``model_dump()``.  ``create_app()`` wraps
the MCP SSE transport in a Starlette application that also serves ``/health``.
"""

import asyncio
import os

import uvicorn
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route

from schwab_scraper import unified_api

# Interface the HTTP server binds to when this module is run as a script.
BIND_HOST = "0.0.0.0"

# Port used when the PORT environment variable is not set.
DEFAULT_PORT = 8000

# Uses the official SDK module ``mcp.server.fastmcp`` (the ``mcp`` package on
# PyPI).  The server description is passed as ``instructions``, which is the
# SDK's constructor parameter for it; ``description`` is not one.  The bind
# host is a server setting, not an argument of ``FastMCP.run()``.
# NOTE(review): recent SDK releases appear to derive transport-security
# defaults from this host value -- confirm against the installed version.
mcp = FastMCP(
    "SchwabScraper",
    instructions="Schwab Scraper MCP Server for financial data",
    host=BIND_HOST,
)

# Only one tool call may use the scraper at a time; every tool below awaits
# unified_api while holding this single-slot semaphore.
browser_lock = asyncio.Semaphore(1)


class SchwabScraperError(Exception):
    """Raised when a unified_api call returns an unsuccessful envelope."""


def unwrap(env):
    """Return the ``data`` payload of a result envelope.

    Args:
        env: Object supporting ``.get()`` with the keys ``"success"``,
            ``"error"`` and ``"data"``.

    Returns:
        The value stored under ``"data"`` (``None`` when the key is absent).

    Raises:
        SchwabScraperError: If ``"success"`` is missing or falsy.  The message
            includes the value stored under ``"error"``.
    """
    if not env.get("success"):
        raise SchwabScraperError(f"Failed: {env.get('error')}")
    return env.get("data")


# --- Tools ---
# Tool docstrings are kept as one-liners on purpose: FastMCP publishes them as
# the tool descriptions shown to clients, so they are runtime-visible text.


@mcp.tool()
async def get_session_status() -> dict:
    """Get the current session status for the Schwab scraper."""
    async with browser_lock:
        return unwrap(await unified_api.get_session_status())


@mcp.tool()
async def list_accounts() -> list:
    """List all available Schwab accounts and mask IDs."""
    async with browser_lock:
        accounts = unwrap(await unified_api.list_accounts())
    # A missing or empty payload is reported as an empty list.
    return [acc.model_dump() for acc in accounts] if accounts else []


@mcp.tool()
async def get_account_overview(account_id: str | None = None) -> dict:
    """Get high level overview balances, equity, and metrics for a specific account or all accounts."""
    async with browser_lock:
        overview = unwrap(await unified_api.get_account_overview(account_id))
    # A missing payload is reported as an empty dict.
    return overview.model_dump() if overview else {}


@mcp.tool()
async def get_positions(
    account_id: str | None = None,
    include_non_equity: bool = False,
) -> list:
    """Get specific stock, bond, or fund positions held in an account."""
    async with browser_lock:
        positions = unwrap(
            await unified_api.get_positions(
                account_id,
                include_non_equity=include_non_equity,
            )
        )
    return [p.model_dump() for p in positions] if positions else []


@mcp.tool()
async def get_transactions(
    account_id: str | None = None,
    limit: int = 50,
    days_back: int = 90,
) -> list:
    """Get transaction history (trades, dividends, transfers) for a specific account."""
    async with browser_lock:
        transactions = unwrap(
            await unified_api.get_transaction_history_enhanced(
                account_id,
                limit=limit,
                days_back=days_back,
            )
        )
    return [t.model_dump() for t in transactions] if transactions else []


@mcp.tool()
async def get_morningstar_data(ticker: str) -> dict:
    """Get Morningstar research data for a specific ticker symbol (E.g. AAPL) directly from Schwab."""
    async with browser_lock:
        data = unwrap(await unified_api.get_morningstar_data(ticker))
    return data.model_dump() if data else {}


# --- Blueprint Requirements: Health Check & ASGI App ---


async def health(request):
    """Liveness probe: always responds with ``{"status": "ok"}``."""
    return JSONResponse({"status": "ok"})


def create_app() -> Starlette:
    """Build the ASGI application served by this module.

    Returns:
        A Starlette app with ``/health`` routed to ``health`` and the MCP SSE
        application (``mcp.sse_app()``) mounted at the root.
    """
    return Starlette(
        routes=[
            # Listed before the catch-all mount so it is matched first.
            Route("/health", health),
            Mount("/", app=mcp.sse_app()),
        ]
    )


def main() -> None:
    """Serve the ASGI app on ``BIND_HOST`` and the port given by ``PORT``."""
    port = int(os.environ.get("PORT", DEFAULT_PORT))
    # FastMCP.run() in the official SDK accepts only the transport (and mount
    # path), not host/port, so the app is served with uvicorn directly.  This
    # also exposes /health next to the SSE endpoints.
    uvicorn.run(create_app(), host=BIND_HOST, port=port)


if __name__ == "__main__":
    main()