Initial commit: Monarch MCP Custom SSE server

This commit is contained in:
Ben
2025-12-24 01:54:42 +00:00
commit 714897276b
20 changed files with 1933 additions and 0 deletions

View File

@@ -0,0 +1,200 @@
"""
Monarch Money MCP Server - Custom SSE Implementation.
"""
import os
import logging
import json
import asyncio
from typing import Optional, List, Dict, Any
from datetime import datetime
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
import uvicorn
from monarch_mcp_custom.auth import get_authenticated_client
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize FastMCP
mcp = FastMCP("Monarch Money Custom")
# --- Helpers ---
def serialize_json(data: Any) -> str:
"""Helper to serialize data to JSON safely."""
return json.dumps(data, indent=2, default=str)
# --- MCP Tools ---
@mcp.tool()
async def get_accounts() -> str:
"""Get all financial accounts from Monarch Money."""
try:
client = await get_authenticated_client()
accounts = await client.get_accounts()
account_list = []
for account in accounts.get("accounts", []):
account_info = {
"id": account.get("id"),
"name": account.get("displayName") or account.get("name"),
"type": (account.get("type") or {}).get("name"),
"balance": account.get("currentBalance"),
"institution": (account.get("institution") or {}).get("name"),
"is_active": not account.get("deactivatedAt"),
}
account_list.append(account_info)
return serialize_json(account_list)
except Exception as e:
logger.error(f"Failed to get accounts: {e}")
return f"Error: {str(e)}"
@mcp.tool()
async def get_transactions(
limit: int = 50,
offset: int = 0,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
account_id: Optional[str] = None,
) -> str:
"""
Get transactions from Monarch Money.
Dates should be in YYYY-MM-DD format.
"""
try:
client = await get_authenticated_client()
filters = {}
if start_date:
filters["start_date"] = start_date
if end_date:
filters["end_date"] = end_date
if account_id:
filters["account_id"] = account_id
transactions = await client.get_transactions(
limit=limit, offset=offset, **filters
)
results = transactions.get("allTransactions", {}).get("results", [])
formatted = []
for txn in results:
formatted.append(
{
"id": txn.get("id"),
"date": txn.get("date"),
"amount": txn.get("amount"),
"description": txn.get("description"),
"category": (txn.get("category") or {}).get("name"),
"account": (txn.get("account") or {}).get("displayName"),
"merchant": (txn.get("merchant") or {}).get("name"),
"is_pending": txn.get("isPending", False),
}
)
return serialize_json(formatted)
except Exception as e:
logger.error(f"Failed to get transactions: {e}")
return f"Error: {str(e)}"
@mcp.tool()
async def get_budgets() -> str:
"""Get current budget information."""
try:
client = await get_authenticated_client()
budgets = await client.get_budgets()
budget_list = []
for b in budgets.get("budgets", []):
budget_list.append(
{
"name": b.get("name"),
"amount": b.get("amount"),
"spent": b.get("spent"),
"remaining": b.get("remaining"),
"category": (b.get("category") or {}).get("name"),
}
)
return serialize_json(budget_list)
except Exception as e:
logger.error(f"Failed to get budgets: {e}")
return f"Error: {str(e)}"
@mcp.tool()
async def get_account_holdings(account_id: str) -> str:
"""Get investment holdings for a specific account."""
try:
client = await get_authenticated_client()
# Ensure account_id is treated correctly (usually string ID in Monarch)
holdings = await client.get_account_holdings(account_id)
return serialize_json(holdings)
except Exception as e:
logger.error(f"Failed to get holdings: {e}")
return f"Error: {str(e)}"
@mcp.tool()
async def refresh_accounts() -> str:
"""Request a refresh of account data from financial institutions."""
try:
client = await get_authenticated_client()
result = await client.request_accounts_refresh()
return serialize_json(result)
except Exception as e:
logger.error(f"Failed to refresh accounts: {e}")
return f"Error: {str(e)}"
# --- Health Check ---
async def health_check(request):
"""Simple health check endpoint."""
return JSONResponse({"status": "ok", "timestamp": datetime.now().isoformat()})
# --- ASGI App Setup ---
def create_app():
"""Create the Starlette application with MCP mounted at /mcp."""
mcp_app = mcp.http_app()
routes = [
Route("/health", health_check, methods=["GET"]),
Mount("/mcp", app=mcp_app),
]
return Starlette(routes=routes, lifespan=mcp_app.lifespan)
app = create_app()
def main():
"""Entry point for running the server."""
port = int(os.getenv("PORT", 8000))
uvicorn.run(app, host="0.0.0.0", port=port)
if __name__ == "__main__":
main()