v0.2.0 — remove vendored fork, upstream login feature
Some checks failed
Build and Push Docker Image / build (push) Failing after 39s

- Delete vendor/schwab-scraper/ (now fetched at CI build time)
- Delete schwab_mcp_custom/ package (LoginManager moved into server.py)
- server.py: add inline LoginManager with env-configurable rate limits
- server.py: orchestrate login safety checks at MCP layer, not in scraper
- Dockerfile: restore vendor-based build with fresh upstream checkout
- pyproject.toml: bump mcp>=1.27.0, playwright>=1.54.0
This commit is contained in:
2026-04-28 00:36:46 +00:00
parent 2de3b709d8
commit 89bb29e563
5 changed files with 150 additions and 58 deletions

189
server.py
View File

@@ -1,17 +1,8 @@
import json
import logging
import os
import sys
from typing import Optional, Any
# Ensure local vendor and package modules are in path
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.insert(0, project_root)
vendor_path = os.path.join(project_root, "vendor", "schwab-scraper")
if vendor_path not in sys.path:
sys.path.insert(0, vendor_path)
import time
from typing import Optional, Any, Tuple
from fastmcp import FastMCP
from starlette.applications import Starlette
@@ -19,7 +10,6 @@ from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
import uvicorn
# Import the unified API from the schwab_scraper dependency
import schwab_scraper.unified_api as api
# ---------------------------------------------------------------------------
@@ -56,85 +46,169 @@ def _patch_request_responder():
_patch_request_responder()
# Initialize FastMCP
# ---------------------------------------------------------------------------
# Login safety manager — lives in the MCP server layer, not the scraper.
# Provides rate-limiting and backoff for automated login attempts.
# ---------------------------------------------------------------------------
class LoginManager:
"""Tracks login attempts and enforces safety limits to avoid account lockouts."""
def __init__(self):
self.max_attempts = int(os.getenv("SCHWAB_LOGIN_MAX_ATTEMPTS", "3"))
self.window_minutes = int(os.getenv("SCHWAB_LOGIN_WINDOW_MIN", "60"))
self.backoff_minutes = int(os.getenv("SCHWAB_LOGIN_BACKOFF_MIN", "30"))
self._attempts: list[tuple[float, bool]] = []
def _trim_window(self) -> None:
cutoff = time.time() - (self.window_minutes * 60)
self._attempts = [(ts, success) for ts, success in self._attempts if ts > cutoff]
def can_login(self) -> Tuple[bool, str]:
"""Return (allowed: bool, reason: str)."""
self._trim_window()
failure_count = sum(1 for _, success in self._attempts if not success)
if failure_count >= self.max_attempts:
# Compute remaining backoff from most recent failure
last_failure_ts = max(ts for ts, success in self._attempts if not success)
elapsed = time.time() - last_failure_ts
remaining = (self.backoff_minutes * 60) - elapsed
if remaining > 0:
return (
False,
f"Login blocked: {failure_count} failures in window. "
f"Wait {int(remaining / 60)}m {int(remaining % 60)}s.",
)
recent_count = len(self._attempts)
return True, f"Allowed ({recent_count} attempts in last {self.window_minutes}m)"
def record_attempt(self, success: bool) -> None:
self._trim_window()
self._attempts.append((time.time(), success))
def get_status(self) -> dict:
self._trim_window()
failure_count = sum(1 for _, success in self._attempts if not success)
recent_count = len(self._attempts)
if failure_count >= self.max_attempts:
last_failure_ts = max(ts for ts, success in self._attempts if not success)
elapsed = time.time() - last_failure_ts
remaining = (self.backoff_minutes * 60) - elapsed
blocked = remaining > 0
else:
remaining = 0
blocked = False
return {
"blocked": blocked,
"remaining_backoff_seconds": max(0, int(remaining)),
"recent_attempts": recent_count,
"recent_failures": failure_count,
"max_attempts_per_window": self.max_attempts,
"window_minutes": self.window_minutes,
"backoff_minutes": self.backoff_minutes,
}
login_manager = LoginManager()
mcp = FastMCP("SchwabScraper")
def serialize(obj: Any) -> str:
"""Safely serialize Pydantic models or datclasses to JSON string."""
"""Safely serialize Pydantic models or dataclasses to JSON string."""
if hasattr(obj, "model_dump_json"):
return obj.model_dump_json()
elif hasattr(obj, "model_dump"):
return json.dumps(obj.model_dump(), default=str)
elif isinstance(obj, list):
# Handle lists of models
return json.dumps([
o.model_dump() if hasattr(o, "model_dump") else o
o.model_dump() if hasattr(o, "model_dump") else o
for o in obj
], default=str)
return json.dumps(obj, default=str)
# ---------------------------------------------------------------------------
# MCP tools
# ---------------------------------------------------------------------------
@mcp.tool()
async def get_session_status(debug: bool = False) -> str:
"""Get the current session status of the Schwab scraper.
Args:
debug: Enable debug logging
"""
result = await api.get_session_status(debug=debug)
# Enrich with login safety status
if result.get("success"):
data = result.get("data", {})
data["login_safety"] = login_manager.get_status()
return serialize(result)
@mcp.tool()
async def get_login_safety_status() -> str:
"""Get the current login safety status, including any active backoffs or limits.
Useful to check if a login attempt is likely to be blocked.
"""
try:
result_str = await api.get_session_status()
result = json.loads(result_str)
if result.get("success") and "login_safety" in result.get("data", {}):
return json.dumps(result["data"]["login_safety"])
return json.dumps({"status": "unknown", "message": "Login safety info not available"})
except Exception as e:
return json.dumps({"error": str(e)})
return json.dumps(login_manager.get_status())
@mcp.tool()
async def login(username: Optional[str] = None, password: Optional[str] = None, debug: bool = False) -> str:
async def login(
username: Optional[str] = None, password: Optional[str] = None, debug: bool = False
) -> str:
"""Perform an automated login to Schwab to establish a new session.
Args:
username: Schwab username (optional, will use env if omitted)
password: Schwab password (optional, will use env if omitted)
username: Schwab username (optional, will use env/config if omitted)
password: Schwab password (optional, will use env/config if omitted)
debug: Enable debug logging
"""
allowed, reason = login_manager.can_login()
if not allowed:
return json.dumps({
"success": False,
"error": f"Login blocked by safety safeguards: {reason}",
"error_type": "AUTHENTICATION",
"retryable": False,
"data": None,
})
result = await api.login(username=username, password=password, debug=debug)
success = result.get("success", False)
login_manager.record_attempt(success)
return serialize(result)
@mcp.tool()
async def refresh_session(debug: bool = False) -> str:
"""Refresh the current Schwab session to prevent expiration.
Args:
debug: Enable debug logging
"""
result = await api.refresh_session(debug=debug)
return serialize(result)
@mcp.tool()
async def list_accounts(debug: bool = False) -> str:
"""List all Schwab accounts.
Args:
debug: Enable debug logging
"""
result = await api.list_accounts(debug=debug)
return serialize(result)
@mcp.tool()
async def get_account_overview(account: Optional[str] = None, debug: bool = False) -> str:
"""Get the overview for a specific account.
Args:
account: Account summary or ID (optional)
debug: Enable debug logging
@@ -142,28 +216,36 @@ async def get_account_overview(account: Optional[str] = None, debug: bool = Fals
result = await api.get_account_overview(account=account, debug=debug)
return serialize(result)
@mcp.tool()
async def get_positions(account: Optional[str] = None, include_non_equity: bool = False, debug: bool = False) -> str:
async def get_positions(
account: Optional[str] = None,
include_non_equity: bool = False,
debug: bool = False,
) -> str:
"""Get positions for a specific account.
Args:
account: Account summary or ID (optional)
include_non_equity: Whether to include non-equity positions
debug: Enable debug logging
"""
result = await api.get_positions(account=account, include_non_equity=include_non_equity, debug=debug)
result = await api.get_positions(
account=account, include_non_equity=include_non_equity, debug=debug
)
return serialize(result)
@mcp.tool()
async def get_transactions(
account: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
time_period: Optional[str] = None,
debug: bool = False
debug: bool = False,
) -> str:
"""Get transaction history.
Args:
account: Account ID (optional)
start_date: Start date for transactions (optional)
@@ -176,14 +258,15 @@ async def get_transactions(
start_date=start_date,
end_date=end_date,
time_period=time_period,
debug=debug
debug=debug,
)
return serialize(result)
@mcp.tool()
async def get_morningstar_data(ticker: str, debug: bool = False) -> str:
"""Get Morningstar data for a ticker.
Args:
ticker: Stock ticker symbol
debug: Enable debug logging
@@ -191,29 +274,29 @@ async def get_morningstar_data(ticker: str, debug: bool = False) -> str:
result = await api.get_morningstar_data(ticker, debug=debug)
return serialize(result)
@mcp.tool()
async def upload_cookies(cookies_json: str) -> str:
"""Upload session cookies to the server to assist with authentication.
Args:
cookies_json: JSON string of cookies exported from a browser (Playwright format)
"""
try:
# Validate JSON
cookies = json.loads(cookies_json)
# Write to cookies.json
with open("cookies.json", "w") as f:
json.dump(cookies, f)
return json.dumps({"status": "success", "message": "cookies.json updated successfully"})
except Exception as e:
return json.dumps({"status": "error", "message": str(e)})
@mcp.tool()
async def api_call(endpoint: str, method: str = "GET", params: str = "{}") -> str:
"""Executes a raw API call to the Schwab service (Dummy implementation).
"""Executes a raw API call to the Schwab service (placeholder).
Refer to the 'api-reference' resource for available endpoints and parameters.
Args:
endpoint: The API path
method: HTTP method (GET, POST, etc.)
@@ -221,23 +304,29 @@ async def api_call(endpoint: str, method: str = "GET", params: str = "{}") -> st
"""
return json.dumps({"status": "not_implemented", "message": "API pass-through not supported for scraper"})
@mcp.resource("service://api-reference")
def get_api_docs() -> str:
"""Returns the API documentation for using the 'api_call' tool."""
return "Schwab Scraper MCP Server - Unified API Documentation\n\nThis server provides tools to interact with Schwab accounts via scraping. The 'api_call' tool is a placeholder."
return (
"Schwab Scraper MCP Server — Unified API Documentation\n\n"
"This server provides tools to interact with Schwab accounts via scraping.\n"
"The 'api_call' tool is a placeholder."
)
async def health(request):
"""Health check endpoint."""
return JSONResponse({"status": "ok"})
# Create the Starlette application
mcp_app = mcp.http_app()
app = Starlette(
routes=[
Route("/health", health),
Mount("/", app=mcp_app)
Mount("/", app=mcp_app),
],
lifespan=mcp_app.lifespan
lifespan=mcp_app.lifespan,
)
if __name__ == "__main__":