""" Monarch Money MCP Server - Custom SSE Implementation. """ import os import logging import json import asyncio from typing import Optional, List, Dict, Any from datetime import datetime from dotenv import load_dotenv from fastmcp import FastMCP from starlette.applications import Starlette from starlette.responses import JSONResponse from starlette.routing import Route, Mount from mcp.server.sse import SseServerTransport import uvicorn from monarch_mcp_custom.auth import get_authenticated_client # Load environment variables load_dotenv() # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Initialize FastMCP mcp = FastMCP("Monarch Money Custom") # --- Helpers --- def serialize_json(data: Any) -> str: """Helper to serialize data to JSON safely.""" return json.dumps(data, indent=2, default=str) # --- MCP Tools --- @mcp.tool() async def get_accounts() -> str: """Get all financial accounts from Monarch Money.""" try: client = await get_authenticated_client() accounts = await client.get_accounts() account_list = [] for account in accounts.get("accounts", []): account_info = { "id": account.get("id"), "name": account.get("displayName") or account.get("name"), "type": (account.get("type") or {}).get("name"), "balance": account.get("currentBalance"), "institution": (account.get("institution") or {}).get("name"), "is_active": not account.get("deactivatedAt"), } account_list.append(account_info) return serialize_json(account_list) except Exception as e: logger.error(f"Failed to get accounts: {e}") return f"Error: {str(e)}" @mcp.tool() async def get_transactions( limit: int = 50, offset: int = 0, start_date: Optional[str] = None, end_date: Optional[str] = None, account_id: Optional[str] = None, ) -> str: """ Get transactions from Monarch Money. Dates should be in YYYY-MM-DD format. """ try: client = await get_authenticated_client() filters = {} if start_date: filters["start_date"] = start_date if end_date: filters["end_date"] = end_date if account_id: filters["account_id"] = account_id transactions = await client.get_transactions( limit=limit, offset=offset, **filters ) results = transactions.get("allTransactions", {}).get("results", []) formatted = [] for txn in results: formatted.append( { "id": txn.get("id"), "date": txn.get("date"), "amount": txn.get("amount"), "description": txn.get("description"), "category": (txn.get("category") or {}).get("name"), "account": (txn.get("account") or {}).get("displayName"), "merchant": (txn.get("merchant") or {}).get("name"), "is_pending": txn.get("isPending", False), } ) return serialize_json(formatted) except Exception as e: logger.error(f"Failed to get transactions: {e}") return f"Error: {str(e)}" @mcp.tool() async def get_budgets() -> str: """Get current budget information.""" try: client = await get_authenticated_client() budgets = await client.get_budgets() budget_list = [] for b in budgets.get("budgets", []): budget_list.append( { "name": b.get("name"), "amount": b.get("amount"), "spent": b.get("spent"), "remaining": b.get("remaining"), "category": (b.get("category") or {}).get("name"), } ) return serialize_json(budget_list) except Exception as e: logger.error(f"Failed to get budgets: {e}") return f"Error: {str(e)}" @mcp.tool() async def get_account_holdings(account_id: str) -> str: """Get investment holdings for a specific account.""" try: client = await get_authenticated_client() # The library expects an int for account_id holdings = await client.get_account_holdings(int(account_id)) return serialize_json(holdings) except Exception as e: logger.error(f"Failed to get holdings: {e}") return f"Error: {str(e)}" @mcp.tool() async def refresh_accounts() -> str: """Request a refresh of account data from financial institutions.""" try: client = await get_authenticated_client() # Request refresh for all accounts (empty list often means all in this library) result = await client.request_accounts_refresh([]) return serialize_json(result) except Exception as e: logger.error(f"Failed to refresh accounts: {e}") return f"Error: {str(e)}" # --- Health Check --- async def health_check(request): """Simple health check endpoint.""" return JSONResponse({"status": "ok", "timestamp": datetime.now().isoformat()}) # --- ASGI App Setup --- def create_app(): """Create the Starlette application with MCP at /.""" mcp_app = mcp.http_app() # FastMCP.http_app() already includes its own health check and SSE routes. # By default it uses /sse for the stream and /messages for POSTs. return Starlette( routes=[ Route("/health", health_check, methods=["GET"]), Mount("/", app=mcp_app), ], lifespan=mcp_app.lifespan, ) app = create_app() def main(): """Entry point for running the server.""" port = int(os.getenv("PORT", 8000)) uvicorn.run(app, host="0.0.0.0", port=port) if __name__ == "__main__": main()