Files
monarch-mcp-custom/src/monarch_mcp_custom/server.py
b3nw 44b2f553d6
All checks were successful
Build and Push Monarch MCP Docker Image / build (push) Successful in 20s
debug: log URL before API call
2026-05-05 02:59:31 +00:00

353 lines
12 KiB
Python

"""
Monarch Money MCP Server - Custom SSE Implementation.
"""
import logging
import json
import os
from typing import Optional, Any
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
from monarch_mcp_custom.auth import get_authenticated_client, retry_on_auth_error
# Load settings through python-dotenv before anything below reads the
# environment.
load_dotenv()

# Root logging setup. LOG_LEVEL is matched by name against the logging
# module's attributes; a name it does not define falls back to INFO.
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=getattr(logging, log_level, logging.INFO),
)
logger = logging.getLogger(__name__)
def run_startup_diagnostics():
    """Print startup diagnostics for debugging.

    Writes to stdout: the installed ``monarchmoney`` version and file
    location, the GraphQL/login endpoints and BASE_URL the library reports,
    and whether each credential environment variable is set.

    Credential values are never echoed, not even partially, because this
    output may be captured in logs; only presence is reported.
    """
    import monarchmoney
    import monarchmoney.monarchmoney as mm_lib
    from monarchmoney import MonarchMoneyEndpoints

    banner = "=" * 50

    print("\n" + banner)
    print("Monarch MCP Server - Startup Diagnostics")
    print(banner)

    # This function is called while the module is being imported, so a build
    # of the library without __version__ must not abort startup.
    version = getattr(monarchmoney, "__version__", "unknown")
    print(f"\n📦 Library Version: monarchmoney {version}")
    print(f"📍 Package Location: {monarchmoney.__file__}")

    print("\n🔗 API Endpoints:")
    print(f" GraphQL: {MonarchMoneyEndpoints.getGraphQL()}")
    print(f" Login: {MonarchMoneyEndpoints.getLoginEndpoint()}")
    # Verify the actual BASE_URL in the source
    print(f"\n🔍 BASE_URL from source: {mm_lib.MonarchMoneyEndpoints.BASE_URL}")

    print("\n🔐 Environment Variables:")
    env_vars = ['MONARCH_TOKEN', 'MONARCH_EMAIL', 'MONARCH_PASSWORD', 'MONARCH_MFA_SECRET']
    for var in env_vars:
        if os.getenv(var):
            # Presence only: a prefix of a token, password or MFA secret is
            # still secret material.
            print(f" {var}: ✓ (***)")
        else:
            print(f" {var}: ✗ (not set or empty)")

    print("\n" + banner + "\n")
# Print the diagnostics once, at import time, before the server object exists.
run_startup_diagnostics()
# FastMCP server instance; the @mcp.tool() / @mcp.resource() decorators below
# attach their functions to it.
mcp = FastMCP("Monarch Money Custom")
# --- Helpers ---
def serialize_json(data: Any) -> str:
    """Render ``data`` as JSON text indented by two spaces.

    Values the ``json`` encoder cannot handle natively are converted with
    ``str()`` instead of raising.
    """
    encoder_options = {"indent": 2, "default": str}
    return json.dumps(data, **encoder_options)
# --- MCP Tools ---
@mcp.tool()
@retry_on_auth_error()
async def get_accounts(reason: Optional[str] = None) -> str:
    """Get all financial accounts from Monarch Money."""
    # `reason` is accepted but not read anywhere in this function.
    client = await get_authenticated_client()
    response = await client.get_accounts()

    # Reduce each raw account record to a flat summary. Nested objects may be
    # absent or null, hence the `or {}` before reading their "name".
    summaries = [
        {
            "id": acct.get("id"),
            "name": acct.get("displayName") or acct.get("name"),
            "type": (acct.get("type") or {}).get("name"),
            "balance": acct.get("currentBalance"),
            "institution": (acct.get("institution") or {}).get("name"),
            # An account counts as active while it has no deactivation stamp.
            "is_active": not acct.get("deactivatedAt"),
        }
        for acct in response.get("accounts", [])
    ]
    return serialize_json(summaries)
@mcp.tool()
@retry_on_auth_error()
async def get_transactions(
    limit: int = 50,
    offset: int = 0,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    account_id: Optional[str] = None,
) -> str:
    """
    Get transactions from Monarch Money.

    Dates should be in YYYY-MM-DD format.

    Args:
        limit: Maximum number of transactions to request.
        offset: Number of transactions to skip, for paging.
        start_date: Start of the date range filter.
        end_date: End of the date range filter.
        account_id: Restrict results to this single account.

    Returns:
        JSON array of transaction summaries (id, date, amount, description,
        category, account, merchant, is_pending).
    """
    client = await get_authenticated_client()

    filters = {}
    if start_date:
        filters["start_date"] = start_date
    if end_date:
        filters["end_date"] = end_date
    if account_id:
        # The client's get_transactions filters on a list passed as
        # `account_ids`; it has no singular `account_id` keyword, so
        # forwarding that name fails with TypeError before any request.
        filters["account_ids"] = [account_id]
    # NOTE(review): the upstream client appears to require start_date and
    # end_date together and rejects just one of them - confirm.

    transactions = await client.get_transactions(limit=limit, offset=offset, **filters)
    results = transactions.get("allTransactions", {}).get("results", [])

    # Nested objects may be null in the response, hence `or {}`.
    formatted = [
        {
            "id": txn.get("id"),
            "date": txn.get("date"),
            "amount": txn.get("amount"),
            "description": txn.get("description"),
            "category": (txn.get("category") or {}).get("name"),
            "account": (txn.get("account") or {}).get("displayName"),
            "merchant": (txn.get("merchant") or {}).get("name"),
            "is_pending": txn.get("isPending", False),
        }
        for txn in results
    ]
    return serialize_json(formatted)
@mcp.tool()
@retry_on_auth_error()
async def get_budgets(reason: Optional[str] = None) -> str:
    """Get current budget information."""
    # `reason` is accepted but not read anywhere in this function.
    client = await get_authenticated_client()
    payload = await client.get_budgets()

    # Index every category by id so budget rows can be labelled with the
    # category name and the group it belongs to.
    categories_by_id = {
        category.get("id"): {
            "name": category.get("name"),
            "group": group.get("name"),
            "variability": category.get("budgetVariability"),
        }
        for group in payload.get("categoryGroups", [])
        for category in group.get("categories", [])
    }

    # One output row per (category, month) pair.
    rows = []
    for entry in payload.get("budgetData", {}).get("monthlyAmountsByCategory", []):
        category_id = (entry.get("category") or {}).get("id")
        # Categories missing from the index yield rows with null labels.
        label = categories_by_id.get(category_id, {})
        rows.extend(
            {
                "month": month.get("month"),
                "category": label.get("name"),
                "group": label.get("group"),
                "planned": month.get("plannedCashFlowAmount"),
                "actual": month.get("actualAmount"),
                "remaining": month.get("remainingAmount"),
                "rollover": month.get("previousMonthRolloverAmount"),
            }
            for month in entry.get("monthlyAmounts", [])
        )

    # Per-month income/expense summary, reported next to the category rows.
    monthly_totals = []
    for month_total in payload.get("budgetData", {}).get("totalsByMonth", []):
        income = month_total.get("totalIncome") or {}
        expenses = month_total.get("totalExpenses") or {}
        monthly_totals.append(
            {
                "month": month_total.get("month"),
                "income_planned": income.get("plannedAmount"),
                "income_actual": income.get("actualAmount"),
                "expenses_planned": expenses.get("plannedAmount"),
                "expenses_actual": expenses.get("actualAmount"),
            }
        )

    return serialize_json({"budgets": rows, "totals": monthly_totals})
def validate_account_id(account_id: str) -> int:
    """Validate and convert account_id to integer.

    Args:
        account_id: The account id as text; surrounding whitespace is
            ignored. A value that is already an ``int`` is returned as-is.

    Returns:
        The account id as an ``int``.

    Raises:
        ValueError: If ``account_id`` is missing, blank, or cannot be read
            as an integer. No other exception type escapes, whatever the
            input type.
    """
    # bool is an int subclass, but True/False is never a meaningful id.
    if isinstance(account_id, int) and not isinstance(account_id, bool):
        return account_id

    if account_id is None:
        raise ValueError("account_id must be provided and cannot be empty")
    if not isinstance(account_id, str):
        # Without this guard a non-string would fail on .strip() with an
        # AttributeError instead of the documented ValueError.
        raise ValueError("account_id must be a valid integer")

    text = account_id.strip()
    if not text:
        raise ValueError("account_id must be provided and cannot be empty")

    try:
        return int(text)
    except ValueError as err:
        # Keep the parsing failure attached as the cause for debugging.
        raise ValueError("account_id must be a valid integer") from err
@mcp.tool()
@retry_on_auth_error()
async def get_account_holdings(account_id: str, reason: Optional[str] = None) -> str:
    """Get investment holdings for a specific account."""
    # Validate first so a malformed id fails before a client is obtained.
    # `reason` is accepted but not read anywhere in this function.
    numeric_id = validate_account_id(account_id)
    client = await get_authenticated_client()
    return serialize_json(await client.get_account_holdings(numeric_id))
@mcp.tool()
@retry_on_auth_error()
async def refresh_accounts(reason: Optional[str] = None) -> str:
    """Request a refresh of account data from financial institutions."""
    # `reason` is accepted but not read anywhere in this function.
    client = await get_authenticated_client()

    # Name every account explicitly instead of sending an empty list and
    # hoping it is treated as "all accounts".
    # NOTE(review): this mirrors how the upstream client appears to build the
    # id list in its own refresh-and-wait helper - confirm.
    accounts = await client.get_accounts()
    account_ids = [
        account["id"]
        for account in (accounts.get("accounts") or [])
        if account.get("id")
    ]

    result = await client.request_accounts_refresh(account_ids)
    return serialize_json(result)
# --- API Reference Resource ---
# Markdown text returned verbatim by the monarch://api-reference resource.
# It lists client method names and signatures for callers of the `api_call`
# tool. It is hand-maintained text, not generated from the library.
API_REFERENCE = """
# Monarch Money API Reference
Full method signatures: https://github.com/hammem/monarchmoney
## Transaction Operations
- get_transactions(limit=100, offset=0, start_date=None, end_date=None, search='', category_ids=[], account_ids=[])
- get_transaction_splits(transaction_id: str)
- update_transaction_splits(transaction_id: str, split_data: List[Dict])
- create_transaction(date, account_id, amount, merchant_name, category_id, notes='', update_balance=False)
- update_transaction(transaction_id, category_id=None, merchant_name=None, amount=None, date=None, notes=None)
- delete_transaction(transaction_id: str)
- get_transaction_details(transaction_id: str)
## Categories & Tags
- get_transaction_categories()
- get_transaction_category_groups()
- create_transaction_category(group_id, transaction_category_name, ...)
- create_transaction_tag(name, color)
- set_transaction_tags(transaction_id, tag_ids)
- get_transaction_tags()
## Accounts
- get_accounts()
- get_account_holdings(account_id: int)
- create_manual_account(account_type, account_sub_type, is_in_net_worth, account_name, account_balance=0)
- update_account(account_id, account_name=None, account_balance=None, ...)
## Budget & Cashflow
- get_budgets(start_date=None, end_date=None)
- get_cashflow(limit=100, start_date=None, end_date=None)
- get_recurring_transactions(start_date=None, end_date=None)
## Sync
- request_accounts_refresh(account_ids: List[str])
- is_accounts_refresh_complete(account_ids=None)
"""
@mcp.resource("monarch://api-reference")
def get_api_docs() -> str:
    """Returns the API documentation for using the 'api_call' tool."""
    # Serves the module-level API_REFERENCE text unchanged.
    return API_REFERENCE
# --- Pass-through Tool ---
@mcp.tool()
@retry_on_auth_error()
async def api_call(method: str, params: str = "{}") -> str:
    """
    Execute a raw method call to the Monarch Money API.

    Args:
        method: The public method name to call (e.g., 'get_transaction_splits', 'update_transaction_splits')
        params: JSON string holding an object of keyword parameters to pass to the method

    Returns:
        JSON result from the API call, or a JSON object with an "error" key
        when the method is unknown, params is invalid, or the call fails

    Example:
        api_call(method="get_transaction_categories", params="{}")
        api_call(method="get_transaction_splits", params='{"transaction_id": "12345"}')
        api_call(method="update_transaction_splits", params='{"transaction_id": "12345", "split_data": [{"amount": -50.00, "categoryId": "100", "merchantName": "Groceries"}]}')
    """
    import inspect

    client = await get_authenticated_client()

    # `method` is caller-supplied, so only public callables are reachable:
    # names starting with "_" would expose private and dunder attributes of
    # the client, and non-callable attributes cannot be invoked at all.
    method_func = None if method.startswith("_") else getattr(client, method, None)
    if not callable(method_func):
        return serialize_json({"error": f"Method '{method}' not found"})

    try:
        parsed_params = json.loads(params) if params else {}
    except json.JSONDecodeError as e:
        return serialize_json({"error": f"Invalid JSON in params: {e}"})
    # Valid JSON is not necessarily an object; only a dict can be expanded
    # into keyword arguments below.
    if not isinstance(parsed_params, dict):
        return serialize_json(
            {"error": "params must be a JSON object of keyword arguments", "method": method}
        )

    # Debug: log what URL the client will use
    from monarchmoney import MonarchMoneyEndpoints

    logger.info(
        "api_call: method=%s, GraphQL URL=%s",
        method,
        MonarchMoneyEndpoints.getGraphQL(),
    )

    try:
        result = method_func(**parsed_params)
        # Plain (non-async) client methods return a value directly; awaiting
        # that value would fail after the call had already taken effect.
        if inspect.isawaitable(result):
            result = await result
        return serialize_json(result)
    except Exception as e:
        # NOTE(review): every exception from the client call becomes an error
        # payload here, so the retry_on_auth_error decorator never sees it -
        # confirm that is intended.
        logger.exception("api_call failed for method=%s: %s", method, e)
        return serialize_json({"error": str(e), "method": method})
# --- Health Check Endpoint ---
async def health(request):
    """Answer the Docker health check with a fixed JSON status body."""
    # The incoming request is not inspected; the reply is always the same.
    payload = {"status": "ok"}
    return JSONResponse(payload)
# --- ASGI Application ---
def create_app():
    """Build the Starlette wrapper serving /health alongside the MCP app."""
    mcp_app = mcp.http_app()

    # /health is answered by this wrapper itself.
    health_route = Route("/health", health, methods=["GET"])
    # Everything else is delegated to the MCP app (it handles the /mcp endpoint).
    mcp_mount = Mount("/", app=mcp_app)

    # IMPORTANT: the MCP app's lifespan must be passed to the outer app;
    # it is needed for task group initialization.
    return Starlette(routes=[health_route, mcp_mount], lifespan=mcp_app.lifespan)
# Module-level ASGI app, importable by uvicorn as `<module>:app`.
app = create_app()

if __name__ == "__main__":
    # Direct execution: serve on all interfaces, on PORT (8000 if unset).
    import uvicorn

    port = int(os.getenv("PORT", "8000"))
    uvicorn.run(app, port=port, host="0.0.0.0")