feat: add automatic re-authentication with MFA support
All checks were successful
Build and Push Monarch MCP Docker Image / build (push) Successful in 8s

Implement automatic token refresh using stored credentials and TOTP MFA secret. When an API call fails with a 401/unauthorized error, the system now transparently re-authenticates using MONARCH_EMAIL, MONARCH_PASSWORD, and MONARCH_MFA_SECRET, then retries the original request.

Changes:
- Add refresh_authentication() function in auth.py for credential-based login
- Create @retry_on_auth_error decorator to handle and retry failed auth calls
- Apply decorator to all MCP tools (get_accounts, get_transactions, etc.)
- Add MONARCH_MFA_SECRET to .env.example with documentation
- Update login_setup.py to instruct users about required env vars
- Replace PROBLEM.md with PLAN.md documenting the implementation
This commit is contained in:
Ben
2025-12-24 15:45:43 +00:00
parent 27ef7f0e1e
commit 6fc09d956f
6 changed files with 153 additions and 65 deletions

View File

@@ -6,10 +6,14 @@ Prioritizes environment variables for Docker compatibility.
import os
import logging
from typing import Optional
from monarchmoney import MonarchMoney
from functools import wraps
from monarchmoney import MonarchMoney, LoginFailedException, RequestFailedException
import pyotp
logger = logging.getLogger(__name__)
_client_instance: Optional[MonarchMoney] = None
def load_token() -> Optional[str]:
"""
@@ -43,28 +47,93 @@ async def get_authenticated_client() -> MonarchMoney:
Returns an authenticated MonarchMoney client.
Raises RuntimeError if no authentication is found.
"""
global _client_instance
if _client_instance:
return _client_instance
token = load_token()
if token:
try:
# The monarchmoney library supports passing the token directly
return MonarchMoney(token=token)
_client_instance = MonarchMoney(token=token)
return _client_instance
except Exception as e:
logger.error(f"❌ Failed to initialize MonarchMoney with token: {e}")
raise
# Fallback to email/password if token is missing (only if both are present)
email = os.getenv("MONARCH_EMAIL")
password = os.getenv("MONARCH_PASSWORD")
if email and password:
try:
mm = MonarchMoney()
await mm.login(email, password)
logger.info("✅ Logged in using email/password credentials")
return mm
except Exception as e:
logger.error(f"❌ Login failed: {e}")
raise
raise RuntimeError(
"🔐 Authentication required. Please provide MONARCH_TOKEN or run login_setup.py"
)
async def refresh_authentication() -> MonarchMoney:
"""
Re-authenticate using stored credentials and MFA secret.
Returns a new authenticated client instance.
"""
global _client_instance
email = os.getenv("MONARCH_EMAIL")
password = os.getenv("MONARCH_PASSWORD")
mfa_secret = os.getenv("MONARCH_MFA_SECRET")
if not email or not password:
raise RuntimeError(
"MONARCH_EMAIL and MONARCH_PASSWORD are required for re-authentication"
)
mm = MonarchMoney()
try:
await mm.login(email, password, mfa_secret_key=mfa_secret, save_session=False)
logger.info("✅ Re-authentication successful")
_client_instance = mm
return mm
except Exception as e:
logger.error(f"❌ Re-authentication failed: {e}")
raise
def generate_totp_secret_and_uri(email: str, issuer_name: str) -> tuple[str, str]:
"""
Generates a TOTP secret and its provisioning URI for a given email and issuer.
"""
secret = pyotp.random_base32()
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(name=email, issuer_name=issuer_name)
return secret, provisioning_uri
def retry_on_auth_error(max_retries: int = 1):
"""
Decorator to retry functions on authentication failures.
Catches RequestFailedException and re-authenticates before retrying.
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except (RequestFailedException, LoginFailedException, Exception) as e:
error_str = str(e).lower()
is_auth_error = (
"401" in error_str
or "unauthorized" in error_str
or "authentication" in error_str
or isinstance(e, (RequestFailedException, LoginFailedException))
)
if is_auth_error and attempt < max_retries:
logger.warning(
f"⚠️ Authentication failure detected (attempt {attempt + 1}/{max_retries + 1}), re-authenticating..."
)
await refresh_authentication()
continue
else:
raise
return wrapper
return decorator

View File

@@ -2,7 +2,6 @@
Monarch Money MCP Server - Custom SSE Implementation.
"""
import os
import logging
import json
from typing import Optional, Any
@@ -13,7 +12,7 @@ from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
from monarch_mcp_custom.auth import get_authenticated_client
from monarch_mcp_custom.auth import get_authenticated_client, retry_on_auth_error
# Load environment variables
load_dotenv()
@@ -39,6 +38,7 @@ def serialize_json(data: Any) -> str:
@mcp.tool()
@retry_on_auth_error()
async def get_accounts(reason: Optional[str] = None) -> str:
"""Get all financial accounts from Monarch Money."""
try:
@@ -64,6 +64,7 @@ async def get_accounts(reason: Optional[str] = None) -> str:
@mcp.tool()
@retry_on_auth_error()
async def get_transactions(
limit: int = 50,
offset: int = 0,
@@ -112,6 +113,7 @@ async def get_transactions(
@mcp.tool()
@retry_on_auth_error()
async def get_budgets(reason: Optional[str] = None) -> str:
"""Get current budget information."""
try:
@@ -137,6 +139,7 @@ async def get_budgets(reason: Optional[str] = None) -> str:
@mcp.tool()
@retry_on_auth_error()
async def get_account_holdings(account_id: str, reason: Optional[str] = None) -> str:
"""Get investment holdings for a specific account."""
try:
@@ -150,6 +153,7 @@ async def get_account_holdings(account_id: str, reason: Optional[str] = None) ->
@mcp.tool()
@retry_on_auth_error()
async def refresh_accounts(reason: Optional[str] = None) -> str:
"""Request a refresh of account data from financial institutions."""
try: