# Page header captured together with this file (not part of the program):
#   All checks were successful
#   Build and Push Docker Image / build (push) Successful in 34s
#   80 lines, 3.5 KiB, Python
from mcp.server.fastmcp import FastMCP
|
|
from starlette.applications import Starlette
|
|
from starlette.routing import Route, Mount
|
|
from starlette.responses import JSONResponse
|
|
import uvicorn
|
|
import asyncio
|
|
import os
|
|
from schwab_scraper import unified_api
|
|
|
|
# Note: this uses the official MCP Python SDK (`pip install mcp`), i.e. the
# `mcp.server.fastmcp` module.
# The SDK's FastMCP constructor has no `description` parameter; the
# human-readable server summary is passed as `instructions`.
mcp = FastMCP("SchwabScraper", instructions="Schwab Scraper MCP Server for financial data")

# Single-permit semaphore acquired by every tool, so at most one unified_api
# call is in flight at a time (presumably because the scraper drives one
# shared browser session -- confirm against schwab_scraper).
browser_lock = asyncio.Semaphore(1)
|
|
|
|
def unwrap(env):
    """Return the ``data`` payload of a result envelope, raising on failure.

    Args:
        env: Mapping-like envelope read through ``.get``; the keys used are
            ``"success"``, ``"error"`` and ``"data"``.

    Returns:
        The value stored under ``"data"`` (``None`` when the key is absent).

    Raises:
        RuntimeError: If ``env`` is ``None`` or its ``"success"`` entry is
            falsy. ``RuntimeError`` subclasses ``Exception``, so callers that
            catch ``Exception`` keep working.
    """
    # A missing envelope is reported explicitly instead of surfacing as an
    # AttributeError on ``None.get``.
    if env is None:
        raise RuntimeError("Failed: no response envelope")
    if not env.get("success"):
        raise RuntimeError(f"Failed: {env.get('error')}")
    return env.get("data")
|
|
|
|
@mcp.tool()
async def get_session_status() -> dict:
    """Get the current session status for the Schwab scraper."""
    # The docstring above is kept verbatim: FastMCP publishes it as the
    # tool description.
    async with browser_lock:
        # Only one scraper call runs at a time; unwrap() raises if the
        # envelope reports failure.
        envelope = await unified_api.get_session_status()
        return unwrap(envelope)
|
|
|
|
@mcp.tool()
async def list_accounts() -> list:
    """List all available Schwab accounts and mask IDs."""
    # Returns one model_dump() dict per account, or an empty list when the
    # envelope carries no accounts.
    async with browser_lock:
        envelope = await unified_api.list_accounts()
        found = unwrap(envelope)
        if not found:
            return []
        return [entry.model_dump() for entry in found]
|
|
|
|
@mcp.tool()
async def get_account_overview(account_id: str | None = None) -> dict:
    """Get high level overview balances, equity, and metrics for a specific account or all accounts."""
    # account_id is annotated `str | None` (previously `str` with a None
    # default) so the input schema derived from the signature accepts an
    # explicit null as well as an omitted argument. The value is forwarded
    # unchanged to unified_api.get_account_overview.
    #
    # Returns the overview's model_dump() dict, or {} when the envelope
    # carries no data; unwrap() raises if the call reported failure.
    async with browser_lock:
        overview = unwrap(await unified_api.get_account_overview(account_id))
        return overview.model_dump() if overview else {}
|
|
|
|
@mcp.tool()
async def get_positions(account_id: str | None = None, include_non_equity: bool = False) -> list:
    """Get specific stock, bond, or fund positions held in an account."""
    # account_id is annotated `str | None` (previously `str` with a None
    # default) so the input schema derived from the signature accepts an
    # explicit null as well as an omitted argument. Both arguments are
    # forwarded unchanged to unified_api.get_positions.
    #
    # Returns one model_dump() dict per position, or [] when the envelope
    # carries no data; unwrap() raises if the call reported failure.
    async with browser_lock:
        pos = unwrap(await unified_api.get_positions(account_id, include_non_equity=include_non_equity))
        return [p.model_dump() for p in pos] if pos else []
|
|
|
|
@mcp.tool()
async def get_transactions(account_id: str | None = None, limit: int = 50, days_back: int = 90) -> list:
    """Get transaction history (trades, dividends, transfers) for a specific account."""
    # account_id is annotated `str | None` (previously `str` with a None
    # default) so the input schema derived from the signature accepts an
    # explicit null as well as an omitted argument. limit and days_back are
    # passed through as keyword arguments to
    # unified_api.get_transaction_history_enhanced.
    #
    # Returns one model_dump() dict per transaction, or [] when the envelope
    # carries no data; unwrap() raises if the call reported failure.
    async with browser_lock:
        tx = unwrap(await unified_api.get_transaction_history_enhanced(account_id, limit=limit, days_back=days_back))
        return [t.model_dump() for t in tx] if tx else []
|
|
|
|
@mcp.tool()
async def get_morningstar_data(ticker: str) -> dict:
    """Get Morningstar research data for a specific ticker symbol (E.g. AAPL) directly from Schwab."""
    # Returns the result's model_dump() dict, or {} when the envelope
    # carries no data.
    async with browser_lock:
        envelope = await unified_api.get_morningstar_data(ticker)
        report = unwrap(envelope)
        if report:
            return report.model_dump()
        return {}
|
|
|
|
|
|
# --- Blueprint Requirements: Health Check & ASGI App ---
|
|
async def health(request):
    """Health-check endpoint: reply with the JSON body {"status": "ok"}."""
    # The incoming request is not inspected; the reply is constant.
    payload = {"status": "ok"}
    return JSONResponse(payload)
|
|
|
|
def create_app():
    """Build the Starlette ASGI application that wraps the MCP server.

    Returns:
        A ``Starlette`` app with two routes: ``/health`` served by
        ``health``, and the FastMCP SSE application mounted at ``/``.
    """
    # The official SDK exposes its SSE transport as a mountable Starlette
    # app through FastMCP.sse_app(); that is the supported way to embed the
    # server in a custom ASGI app.
    # /health is listed first so it is matched before the catch-all mount.
    return Starlette(
        routes=[
            Route("/health", health),
            Mount("/", app=mcp.sse_app()),
        ]
    )
|
|
|
|
if __name__ == "__main__":
    # Listening port comes from the PORT environment variable (default 8000).
    port = int(os.environ.get("PORT", 8000))
    # FastMCP.run() in the official SDK accepts only the transport (plus an
    # optional mount path); passing host/port to it raises TypeError. Bind
    # address and port are server settings, so they are set on mcp.settings
    # before starting. FastMCP then starts uvicorn itself for the SSE
    # transport.
    # NOTE(review): 0.0.0.0 listens on every interface -- presumably needed
    # for the Docker deployment; confirm the port is not publicly reachable.
    mcp.settings.host = "0.0.0.0"
    mcp.settings.port = port
    mcp.run(transport="sse")
|