Compare commits

10 Commits

Author SHA1 Message Date
cc1226defe fix(ci): use git -C instead of cd to avoid breaking working directory
All checks were successful
Build and Push Docker Image / build (push) Successful in 42s
The previous cd vendor/schwab-scraper caused the subsequent echo to write
mcp-server-commit.txt into the wrong path, which made the CI build fail.
2026-05-04 14:36:52 +00:00
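The failure mode described in this commit message can be reproduced outside CI. Below is a minimal Python sketch, not the workflow itself: `os.chdir` stands in for the shell `cd`, passing the directory explicitly mirrors `git -C`, and the directory and file names (`vendor/repo`, `marker.txt`) are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def write_marker_after_chdir(root: Path) -> Path:
    """Mimic `cd vendor/repo` followed by a relative-path write."""
    previous = Path.cwd()
    os.chdir(root)
    try:
        os.chdir("vendor/repo")             # like `cd vendor/schwab-scraper`
        target = Path("vendor/marker.txt")  # now resolves under vendor/repo
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text("sha\n")
        return target.resolve()
    finally:
        os.chdir(previous)                  # always restore the caller's cwd


def write_marker_without_chdir(root: Path) -> Path:
    """Mimic `git -C vendor/repo ...`: the working directory never changes."""
    target = root / "vendor" / "marker.txt"
    target.write_text("sha\n")
    return target.resolve()


def demo() -> tuple[Path, Path]:
    """Return the (wrong, right) marker paths relative to the temp root."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp).resolve()
        (root / "vendor" / "repo").mkdir(parents=True)
        wrong = write_marker_after_chdir(root).relative_to(root)
        right = write_marker_without_chdir(root).relative_to(root)
        return wrong, right
```

Calling `demo()` returns the two locations: the first lands inside the cloned directory, the second where the later build step expects it.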
4982b7d09f feat: log schwab-scraper and mcp-server commit SHAs at container startup
Some checks failed
Build and Push Docker Image / build (push) Failing after 39s
Bake commit SHAs into the Docker image via CI and log them on server
startup so it's easy to verify which version of schwab-scraper is running.
2026-05-04 14:31:01 +00:00
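As a rough illustration of the startup logging described here, the sketch below reads a baked-in commit file and falls back to a placeholder when the file is missing or empty. The helper names are hypothetical, and the file location is whatever the caller passes in.

```python
from pathlib import Path
from typing import Optional


def read_commit_file(path: Path) -> Optional[str]:
    """Return the stripped file contents, or None if missing or empty."""
    try:
        return path.read_text().strip() or None
    except FileNotFoundError:
        return None


def describe_commit(label: str, path: Path) -> str:
    """Format one startup log line for a component's commit SHA."""
    sha = read_commit_file(path)
    return f"{label} commit: {sha or '(not available)'}"
```

Returning `None` rather than raising keeps local runs (where CI has not written the files) from failing at import time.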
8c196b7f65 fix(server): repair login tool and harden upload_cookies
All checks were successful
Build and Push Docker Image / build (push) Successful in 38s
- login tool was calling api.login() which did not exist in unified_api,
  causing AttributeError on every invocation. Now calls login_to_schwab
  directly with proper credential fallback to config.json.
- upload_cookies hardcoded 'cookies.json' instead of get_cookies_path(),
  and did not handle wrapped export formats ({cookies: [...]}). Both fixed.
- Result envelopes now match the standard {success, data, error, error_type,
  retryable} shape used by other tools.
2026-04-28 04:15:18 +00:00
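The two ideas in this commit message, accepting wrapped cookie exports and returning a uniform envelope, can be sketched together. This is an illustration under assumptions, not the server's code: the function names are hypothetical, and `"VALIDATION"` is an assumed `error_type` value that does not appear in the source.

```python
import json
from typing import Any, Optional


def envelope(success: bool, data: Any = None, error: Optional[str] = None,
             error_type: Optional[str] = None, retryable: bool = False) -> dict:
    """Build the {success, data, error, error_type, retryable} shape."""
    return {"success": success, "data": data, "error": error,
            "error_type": error_type, "retryable": retryable}


def normalize_cookie_export(raw: str) -> dict:
    """Accept a bare list of cookies or a {"cookies": [...]} wrapper."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        return envelope(False, error=f"Invalid JSON: {exc}",
                        error_type="VALIDATION")  # assumed error_type
    if isinstance(payload, dict):
        payload = payload.get("cookies")          # unwrap; None if key absent
    if not isinstance(payload, list):
        return envelope(
            False,
            error="Expected a list of cookies or an object with a 'cookies' key",
            error_type="VALIDATION",
        )
    return envelope(True, data={"cookies": payload, "count": len(payload)})
```

Validating the shape before writing anything to disk means a malformed upload cannot overwrite a working cookie file.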
9f799ee264 feat(logging): trace credential source and config path in login tool
All checks were successful
Build and Push Docker Image / build (push) Successful in 39s
Add diagnostic logging to the MCP login tool handler:
- Log whether username/password were provided explicitly
- If falling back to config, log the resolved config path and whether it exists
- This complements upstream scraper v0.6.18 credential diagnostics

Bumps version to 0.2.1.
2026-04-28 02:52:09 +00:00
d28b9d32f6 test(option-a): point SCHWAB_PLAYWRIGHT_URL to CLI's browserless endpoint
All checks were successful
Build and Push Docker Image / build (push) Successful in 38s
Temporarily switch from the local schwab-browser sidecar to the
browserless endpoint used by the working CLI (browser.local.ben.io).
This tests whether /assert 403 is caused by browser environment drift.
2026-04-28 02:39:20 +00:00
f51e61b8d7 fix(logging): configure stderr logging + tee capture, add debug confirmation
All checks were successful
Build and Push Docker Image / build (push) Successful in 37s
- Set up logging.basicConfig() at module load so scraper logs reach stderr
  (visible in docker logs instead of silently dropped)
- Replace StringIO-only capture with TeeHandler that writes to BOTH stderr
  and the StringIO buffer, so logs remain visible in docker while also
  being returned in tool responses
- Add explicit 'LOGIN TOOL CALLED' and 'DEBUG MODE ENABLED' log lines
  at the start of the login tool so users can verify logging is active
2026-04-28 02:16:31 +00:00
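The tee behaviour described in this commit can be shown in isolation. In the sketch below a `StringIO` stands in for `sys.stderr` so the result is inspectable, and the logger name `tee_demo` is hypothetical; it is a simplified stand-in for the handler in the diff, not a copy of it.

```python
import io
import logging


class TeeHandler(logging.StreamHandler):
    """Write each record to the primary stream and copy it to a buffer."""

    def __init__(self, stream, buffer: io.StringIO):
        super().__init__(stream)
        self.buffer = buffer

    def emit(self, record: logging.LogRecord) -> None:
        super().emit(record)  # primary stream (stderr in the server)
        self.buffer.write(self.format(record) + "\n")


def demo() -> tuple[str, str]:
    """Log one line and return what each destination received."""
    primary, captured = io.StringIO(), io.StringIO()
    logger = logging.getLogger("tee_demo")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.propagate = False                # keep the demo off the root logger
    handler = TeeHandler(primary, captured)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(handler)
    try:
        logger.info("LOGIN TOOL CALLED")
    finally:
        logger.removeHandler(handler)
    return primary.getvalue(), captured.getvalue()
```

Both destinations receive the same formatted line, which is what lets the log stay visible to the container runtime while also being returned to the caller.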
1999392df7 fix(mcp): capture scraper logs and return them in tool responses
All checks were successful
Build and Push Docker Image / build (push) Successful in 38s
Scraper debug output goes to stderr which is invisible in MCP stdio mode.
Add capture_logs context manager that attaches a StringIO handler to the
schwab_scraper logger during tool execution, then includes captured logs
in the response envelope when debug=True or on failure.

Applied to login() and refresh_session() which are the critical paths
for authentication diagnostics.
2026-04-28 02:04:58 +00:00
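A simplified sketch of the capture pattern this commit describes: a context manager attaches a temporary `StringIO` handler, and the captured text is added to the result only when `debug=True` or the call failed. The logger name `capture_demo` and the `run_tool` function are hypothetical, and unlike the version in the diff this sketch always sets the logger to the requested level for the duration of the block.

```python
import io
import logging
from contextlib import contextmanager


@contextmanager
def capture_logs(logger_name: str, level: int = logging.DEBUG):
    """Attach a temporary StringIO handler to a logger and yield the buffer."""
    logger = logging.getLogger(logger_name)
    previous_level = logger.level
    buffer = io.StringIO()
    handler = logging.StreamHandler(buffer)
    handler.setLevel(level)
    logger.addHandler(handler)
    logger.setLevel(level)
    try:
        yield buffer
    finally:
        # Always detach, even if the wrapped call raised.
        logger.removeHandler(handler)
        logger.setLevel(previous_level)


def run_tool(debug: bool, fail: bool) -> dict:
    """Hypothetical tool body: attach logs when debug=True or on failure."""
    with capture_logs("capture_demo") as buffer:
        logging.getLogger("capture_demo").debug("navigating to login page")
        result = {"success": not fail}
    if debug or not result["success"]:
        result["logs"] = buffer.getvalue()
    return result
```

Keeping logs out of successful non-debug responses keeps the normal payload small while still surfacing diagnostics when they are needed.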
0c23b0e261 fix(ci): use CRT_READ_ONLY for cross-repo clone
All checks were successful
Build and Push Docker Image / build (push) Successful in 41s
actions/checkout@v3's Basic auth header pattern fails with 403 when
accessing a different private repository. Switch to a plain git clone
with the CRT_READ_ONLY token embedded in the HTTPS URL.
2026-04-28 01:40:42 +00:00
89bb29e563 v0.2.0 — remove vendored fork, upstream login feature
Some checks failed
Build and Push Docker Image / build (push) Failing after 39s
- Delete vendor/schwab-scraper/ (now fetched at CI build time)
- Delete schwab_mcp_custom/ package (LoginManager moved into server.py)
- server.py: add inline LoginManager with env-configurable rate limits
- server.py: orchestrate login safety checks at MCP layer, not in scraper
- Dockerfile: restore vendor-based build with fresh upstream checkout
- pyproject.toml: bump mcp>=1.27.0, playwright>=1.54.0
2026-04-28 00:36:46 +00:00
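The rate limiting mentioned in this commit (a failure count inside a sliding window, then a backoff) can be sketched independently of the server. The class below is an illustration with hypothetical names; it takes plain constructor arguments and an injectable clock instead of reading the `SCHWAB_LOGIN_*` environment variables, which makes the behaviour easy to exercise without waiting.

```python
import time
from typing import Callable, List, Tuple


class AttemptLimiter:
    """Block new attempts after too many failures inside a sliding window."""

    def __init__(self, max_failures: int = 3, window_s: float = 3600.0,
                 backoff_s: float = 1800.0,
                 clock: Callable[[], float] = time.time):
        self.max_failures = max_failures
        self.window_s = window_s
        self.backoff_s = backoff_s
        self.clock = clock
        self._attempts: List[Tuple[float, bool]] = []  # (timestamp, success)

    def record(self, success: bool) -> None:
        """Remember the outcome of one attempt."""
        self._attempts.append((self.clock(), success))

    def allowed(self) -> bool:
        """Return True if another attempt may be made right now."""
        now = self.clock()
        # Drop attempts that have aged out of the window.
        cutoff = now - self.window_s
        self._attempts = [(ts, ok) for ts, ok in self._attempts if ts > cutoff]
        failures = [ts for ts, ok in self._attempts if not ok]
        if len(failures) < self.max_failures:
            return True
        # Blocked until the backoff since the latest failure has elapsed.
        return now - max(failures) >= self.backoff_s
```

Measuring the backoff from the most recent failure means each further failed attempt would extend the block, which is the conservative choice when the goal is to avoid an account lockout.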
2de3b709d8 feat: expose automated login and session refresh with safety status tool
2026-04-27 19:47:55 +00:00
7 changed files with 397 additions and 44 deletions

@@ -15,12 +15,16 @@ jobs:
- name: Checkout
uses: actions/checkout@v3
- name: Checkout schwab-scraper
uses: actions/checkout@v3
with:
repository: b3nw/schwab-scraper
path: vendor/schwab-scraper
token: ${{ secrets.CR_PAT }}
- name: Clone schwab-scraper
env:
CLONE_TOKEN: ${{ secrets.CRT_READ_ONLY }}
run: |
mkdir -p vendor
git clone --depth=1 --branch main \
"https://x-access-token:${CLONE_TOKEN}@gitea.ext.ben.io/b3nw/schwab-scraper.git" \
vendor/schwab-scraper
git -C vendor/schwab-scraper rev-parse HEAD > vendor/schwab-scraper-commit.txt
echo "${{ gitea.sha }}" > vendor/mcp-server-commit.txt
- name: Login to Gitea Container Registry
uses: docker/login-action@v2

@@ -4,11 +4,12 @@ ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy
WORKDIR /app
COPY pyproject.toml uv.lock ./
# Copy vendored schwab-scraper (checked out cleanly by CI) and pyproject.toml
COPY vendor/schwab-scraper /tmp/schwab-scraper
COPY pyproject.toml uv.lock ./
# Install schwab-scraper from vendored source, then all other deps.
# We strip the git dependency from pyproject.toml so uv doesn't try to fetch it.
# Install schwab-scraper from the clean build-time checkout, then remaining deps.
# We strip the git dependency line so uv doesn't try to fetch over the network.
RUN uv venv && \
uv pip install /tmp/schwab-scraper && \
sed -i '/schwab-scraper/d' pyproject.toml && \
@@ -16,11 +17,16 @@ RUN uv venv && \
uv pip install --upgrade playwright && \
rm -rf /tmp/schwab-scraper
COPY vendor/schwab-scraper-commit.txt /app/schwab-scraper-commit.txt
COPY vendor/mcp-server-commit.txt /app/mcp-server-commit.txt
COPY . .
FROM python:3.12-slim-bookworm
RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*
RUN apt-get update && \
apt-get install -y --no-install-recommends curl && \
rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY --from=builder /app /app

@@ -21,7 +21,7 @@ services:
memory: 128M
cpus: '0.1'
environment:
- SCHWAB_PLAYWRIGHT_URL=ws://schwab-browser:3000/playwright/chromium
- SCHWAB_PLAYWRIGHT_URL=ws://browser.local.ben.io:3000/playwright/chromium?timeout=300000
- PORT=8000
volumes:
- ./cookies.json:/app/cookies.json

@@ -1,12 +1,12 @@
[project]
name = "schwab-mcp-custom"
version = "0.1.0"
description = "Hybrid MCP Light server for Schwab scraper"
version = "0.2.1"
description = "MCP server wrapping schwab-scraper"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"schwab-scraper @ git+ssh://gitea@git.local.ben.io/b3nw/schwab-scraper.git",
"mcp>=1.2.0",
"mcp>=1.27.0",
"fastmcp>=0.4.1",
"starlette>=0.41.0",
"uvicorn>=0.32.0",

381
server.py
@@ -1,7 +1,11 @@
import io
import json
import logging
import os
from typing import Optional, Any
import sys
import time
from contextlib import contextmanager
from typing import Optional, Any, Tuple
from fastmcp import FastMCP
from starlette.applications import Starlette
@@ -9,9 +13,126 @@ from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
import uvicorn
# Import the unified API from the schwab_scraper dependency
import schwab_scraper.unified_api as api
# ---------------------------------------------------------------------------
# Configure logging so it actually reaches stderr (visible in docker logs).
# The scraper and MCP libraries log extensively but don't set up handlers
# when imported as a module, so messages are silently dropped.
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stderr,
)
# Ensure the scraper logger propagates to our root handler
_scraper_logger = logging.getLogger("schwab_scraper")
_scraper_logger.setLevel(logging.DEBUG if os.getenv("SCHWAB_DEBUG", "").lower() in ("1", "true") else logging.INFO)
_scraper_logger.propagate = True
_startup_logger = logging.getLogger("schwab_mcp_custom")
def _read_commit_file(path: str) -> str | None:
try:
with open(path) as f:
return f.read().strip() or None
except FileNotFoundError:
return None
_scraper_commit = _read_commit_file(
os.path.join(os.path.dirname(__file__), "schwab-scraper-commit.txt")
)
_mcp_commit = _read_commit_file(
os.path.join(os.path.dirname(__file__), "mcp-server-commit.txt")
)
if _scraper_commit:
_startup_logger.info("schwab-scraper commit: %s", _scraper_commit)
else:
_startup_logger.info("schwab-scraper commit: (not available)")
if _mcp_commit:
_startup_logger.info("mcp-server commit: %s", _mcp_commit)
else:
_startup_logger.info("mcp-server commit: (not available)")
try:
from importlib.metadata import version as _pkg_version
_startup_logger.info("schwab-scraper package version: %s", _pkg_version("schwab-scraper"))
except Exception:
_startup_logger.info("schwab-scraper package version: (unknown)")
# ---------------------------------------------------------------------------
# Log capture helper — captures scraper logs to a string buffer AND tees
# them to stderr so they remain visible in docker logs.
# ---------------------------------------------------------------------------
class _TeeHandler(logging.StreamHandler):
"""Handler that copies every record to a secondary (StringIO) buffer."""
def __init__(self, stream, extra_buf: io.StringIO, level=logging.NOTSET):
super().__init__(stream)
self.extra_buf = extra_buf
self.tee_level = level
def emit(self, record):
super().emit(record)
if record.levelno >= self.tee_level:
try:
msg = self.format(record)
self.extra_buf.write(msg + "\n")
self.extra_buf.flush()
except Exception:
pass
@contextmanager
def capture_logs(logger_name: str = "schwab_scraper", level: int = logging.DEBUG):
"""
Context manager that captures log output to a string buffer
while still writing to stderr (docker-visible).
Yields the buffer so callers can read captured logs after the block.
"""
logger = logging.getLogger(logger_name)
old_level = logger.level
if old_level > level:
logger.setLevel(level)
buf = io.StringIO()
handler = _TeeHandler(sys.stderr, buf, level=level)
handler.setLevel(level)
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
# Also tee the root logger in case scraper logs through sub-loggers
root_old_level = logging.getLogger().level
if root_old_level > level:
logging.getLogger().setLevel(level)
try:
yield buf
finally:
logger.removeHandler(handler)
if old_level != logger.level:
logger.setLevel(old_level)
if root_old_level != logging.getLogger().level:
logging.getLogger().setLevel(root_old_level)
def _enrich_with_logs(result: dict, log_buffer: io.StringIO, debug: bool) -> dict:
"""Attach captured logs to a result dict when debug=True or on error."""
logs = log_buffer.getvalue()
if logs and (debug or not result.get("success", False)):
result["logs"] = logs
return result
# ---------------------------------------------------------------------------
# Monkey-patch mcp.shared.session.RequestResponder to work around a
# cancellation race in mcp==1.27.0 (github.com/modelcontextprotocol/
@@ -46,17 +167,82 @@ def _patch_request_responder():
_patch_request_responder()
# Initialize FastMCP
# ---------------------------------------------------------------------------
# Login safety manager — lives in the MCP server layer, not the scraper.
# Provides rate-limiting and backoff for automated login attempts.
# ---------------------------------------------------------------------------
class LoginManager:
"""Tracks login attempts and enforces safety limits to avoid account lockouts."""
def __init__(self):
self.max_attempts = int(os.getenv("SCHWAB_LOGIN_MAX_ATTEMPTS", "3"))
self.window_minutes = int(os.getenv("SCHWAB_LOGIN_WINDOW_MIN", "60"))
self.backoff_minutes = int(os.getenv("SCHWAB_LOGIN_BACKOFF_MIN", "30"))
self._attempts: list[tuple[float, bool]] = []
def _trim_window(self) -> None:
cutoff = time.time() - (self.window_minutes * 60)
self._attempts = [(ts, success) for ts, success in self._attempts if ts > cutoff]
def can_login(self) -> Tuple[bool, str]:
"""Return (allowed: bool, reason: str)."""
self._trim_window()
failure_count = sum(1 for _, success in self._attempts if not success)
if failure_count >= self.max_attempts:
# Compute remaining backoff from most recent failure
last_failure_ts = max(ts for ts, success in self._attempts if not success)
elapsed = time.time() - last_failure_ts
remaining = (self.backoff_minutes * 60) - elapsed
if remaining > 0:
return (
False,
f"Login blocked: {failure_count} failures in window. "
f"Wait {int(remaining / 60)}m {int(remaining % 60)}s.",
)
recent_count = len(self._attempts)
return True, f"Allowed ({recent_count} attempts in last {self.window_minutes}m)"
def record_attempt(self, success: bool) -> None:
self._trim_window()
self._attempts.append((time.time(), success))
def get_status(self) -> dict:
self._trim_window()
failure_count = sum(1 for _, success in self._attempts if not success)
recent_count = len(self._attempts)
if failure_count >= self.max_attempts:
last_failure_ts = max(ts for ts, success in self._attempts if not success)
elapsed = time.time() - last_failure_ts
remaining = (self.backoff_minutes * 60) - elapsed
blocked = remaining > 0
else:
remaining = 0
blocked = False
return {
"blocked": blocked,
"remaining_backoff_seconds": max(0, int(remaining)),
"recent_attempts": recent_count,
"recent_failures": failure_count,
"max_attempts_per_window": self.max_attempts,
"window_minutes": self.window_minutes,
"backoff_minutes": self.backoff_minutes,
}
login_manager = LoginManager()
mcp = FastMCP("SchwabScraper")
def serialize(obj: Any) -> str:
"""Safely serialize Pydantic models or datclasses to JSON string."""
"""Safely serialize Pydantic models or dataclasses to JSON string."""
if hasattr(obj, "model_dump_json"):
return obj.model_dump_json()
elif hasattr(obj, "model_dump"):
return json.dumps(obj.model_dump(), default=str)
elif isinstance(obj, list):
# Handle lists of models
return json.dumps([
o.model_dump() if hasattr(o, "model_dump") else o
for o in obj
@@ -64,6 +250,9 @@ def serialize(obj: Any) -> str:
return json.dumps(obj, default=str)
# ---------------------------------------------------------------------------
# MCP tools
# ---------------------------------------------------------------------------
@mcp.tool()
async def get_session_status(debug: bool = False) -> str:
"""Get the current session status of the Schwab scraper.
@@ -72,8 +261,124 @@ async def get_session_status(debug: bool = False) -> str:
debug: Enable debug logging
"""
result = await api.get_session_status(debug=debug)
# Enrich with login safety status
if result.get("success"):
data = result.get("data", {})
data["login_safety"] = login_manager.get_status()
return serialize(result)
@mcp.tool()
async def get_login_safety_status() -> str:
"""Get the current login safety status, including any active backoffs or limits.
Useful to check if a login attempt is likely to be blocked.
"""
return json.dumps(login_manager.get_status())
@mcp.tool()
async def login(
username: Optional[str] = None, password: Optional[str] = None, debug: bool = False
) -> str:
"""Perform an automated login to Schwab to establish a new session.
Args:
username: Schwab username (optional, will use env/config if omitted)
password: Schwab password (optional, will use env/config if omitted)
debug: Enable debug logging
"""
allowed, reason = login_manager.can_login()
if not allowed:
return json.dumps({
"success": False,
"error": f"Login blocked by safety safeguards: {reason}",
"error_type": "AUTHENTICATION",
"retryable": False,
"data": None,
})
mcp_logger = logging.getLogger("schwab_mcp_custom")
mcp_logger.info("=== LOGIN TOOL CALLED ===")
mcp_logger.info(f"debug={debug}, username_provided={bool(username)}, password_provided={bool(password)}")
# Diagnostic: if credentials not provided, show what config path would be used
if not username or not password:
from schwab_scraper.core.config import get_config_path
config_path = get_config_path()
config_exists = os.path.exists(config_path)
mcp_logger.info(f"Config fallback: path={config_path}, exists={config_exists}")
with capture_logs(level=logging.DEBUG if debug else logging.INFO) as log_buf:
mcp_logger.info("capture_logs context entered")
if debug:
mcp_logger.info("DEBUG MODE ENABLED — verbose logging active")
# api.login does not exist in unified_api; call the underlying scraper directly
from schwab_scraper.browser.auth import login_to_schwab
from schwab_scraper.core.config import get_schwab_credentials, load_config
if not username or not password:
config = load_config()
username, password = get_schwab_credentials(config)
if not username or not password:
result = {
"success": False,
"error": "Username and password are required (or set in config.json)",
"error_type": "AUTHENTICATION",
"retryable": False,
"data": None,
}
else:
try:
cookies = await login_to_schwab(username, password)
if cookies:
result = {
"success": True,
"data": {"cookies_count": len(cookies)},
"error": None,
"error_type": None,
"retryable": False,
}
else:
result = {
"success": False,
"error": "Login failed — no cookies returned. Check credentials or 2FA status.",
"error_type": "AUTHENTICATION",
"retryable": True,
"data": None,
}
except Exception as exc:
result = {
"success": False,
"error": str(exc),
"error_type": "UNKNOWN",
"retryable": True,
"data": None,
}
success = result.get("success", False)
login_manager.record_attempt(success)
mcp_logger.info(f"login completed — success={success}")
result = _enrich_with_logs(result, log_buf, debug)
mcp_logger.info("capture_logs context exited, returning result")
return serialize(result)
@mcp.tool()
async def refresh_session(debug: bool = False) -> str:
"""Refresh the current Schwab session to prevent expiration.
Args:
debug: Enable debug logging
"""
with capture_logs(level=logging.DEBUG if debug else logging.INFO) as log_buf:
result = await api.refresh_session(debug=debug)
result = _enrich_with_logs(result, log_buf, debug)
return serialize(result)
@mcp.tool()
async def list_accounts(debug: bool = False) -> str:
"""List all Schwab accounts.
@@ -84,6 +389,7 @@ async def list_accounts(debug: bool = False) -> str:
result = await api.list_accounts(debug=debug)
return serialize(result)
@mcp.tool()
async def get_account_overview(account: Optional[str] = None, debug: bool = False) -> str:
"""Get the overview for a specific account.
@@ -95,8 +401,13 @@ async def get_account_overview(account: Optional[str] = None, debug: bool = Fals
result = await api.get_account_overview(account=account, debug=debug)
return serialize(result)
@mcp.tool()
async def get_positions(account: Optional[str] = None, include_non_equity: bool = False, debug: bool = False) -> str:
async def get_positions(
account: Optional[str] = None,
include_non_equity: bool = False,
debug: bool = False,
) -> str:
"""Get positions for a specific account.
Args:
@@ -104,16 +415,19 @@ async def get_positions(account: Optional[str] = None, include_non_equity: bool
include_non_equity: Whether to include non-equity positions
debug: Enable debug logging
"""
result = await api.get_positions(account=account, include_non_equity=include_non_equity, debug=debug)
result = await api.get_positions(
account=account, include_non_equity=include_non_equity, debug=debug
)
return serialize(result)
@mcp.tool()
async def get_transactions(
account: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
time_period: Optional[str] = None,
debug: bool = False
debug: bool = False,
) -> str:
"""Get transaction history.
@@ -129,10 +443,11 @@ async def get_transactions(
start_date=start_date,
end_date=end_date,
time_period=time_period,
debug=debug
debug=debug,
)
return serialize(result)
@mcp.tool()
async def get_morningstar_data(ticker: str, debug: bool = False) -> str:
"""Get Morningstar data for a ticker.
@@ -144,6 +459,7 @@ async def get_morningstar_data(ticker: str, debug: bool = False) -> str:
result = await api.get_morningstar_data(ticker, debug=debug)
return serialize(result)
@mcp.tool()
async def upload_cookies(cookies_json: str) -> str:
"""Upload session cookies to the server to assist with authentication.
@@ -152,18 +468,39 @@ async def upload_cookies(cookies_json: str) -> str:
cookies_json: JSON string of cookies exported from a browser (Playwright format)
"""
try:
# Validate JSON
cookies = json.loads(cookies_json)
# Write to cookies.json
with open("cookies.json", "w") as f:
json.dump(cookies, f)
return json.dumps({"status": "success", "message": "cookies.json updated successfully"})
# Some browser extensions wrap cookies in an object (e.g. {"cookies": [...]})
if isinstance(cookies, dict):
if "cookies" in cookies:
cookies = cookies["cookies"]
else:
return json.dumps({
"status": "error",
"message": "Expected a list of cookies or an object with a 'cookies' key",
})
if not isinstance(cookies, list):
return json.dumps({
"status": "error",
"message": f"Expected a list of cookies, got {type(cookies).__name__}",
})
from schwab_scraper.core.config import get_cookies_path
cookies_path = get_cookies_path()
with open(cookies_path, "w") as f:
json.dump(cookies, f, indent=2)
return json.dumps({
"status": "success",
"message": f"{cookies_path} updated with {len(cookies)} cookies",
})
except Exception as e:
return json.dumps({"status": "error", "message": str(e)})
@mcp.tool()
async def api_call(endpoint: str, method: str = "GET", params: str = "{}") -> str:
"""Executes a raw API call to the Schwab service (Dummy implementation).
"""Executes a raw API call to the Schwab service (placeholder).
Refer to the 'api-reference' resource for available endpoints and parameters.
@@ -174,23 +511,29 @@ async def api_call(endpoint: str, method: str = "GET", params: str = "{}") -> st
"""
return json.dumps({"status": "not_implemented", "message": "API pass-through not supported for scraper"})
@mcp.resource("service://api-reference")
def get_api_docs() -> str:
"""Returns the API documentation for using the 'api_call' tool."""
return "Schwab Scraper MCP Server - Unified API Documentation\n\nThis server provides tools to interact with Schwab accounts via scraping. The 'api_call' tool is a placeholder."
return (
"Schwab Scraper MCP Server — Unified API Documentation\n\n"
"This server provides tools to interact with Schwab accounts via scraping.\n"
"The 'api_call' tool is a placeholder."
)
async def health(request):
"""Health check endpoint."""
return JSONResponse({"status": "ok"})
# Create the Starlette application
mcp_app = mcp.http_app()
app = Starlette(
routes=[
Route("/health", health),
Mount("/", app=mcp_app)
Mount("/", app=mcp_app),
],
lifespan=mcp_app.lifespan
lifespan=mcp_app.lifespan,
)
if __name__ == "__main__":

2
uv.lock generated
@@ -1733,7 +1733,7 @@ requires-dist = [
{ name = "greenlet", specifier = ">=3.2.3" },
{ name = "mcp", specifier = ">=1.2.0" },
{ name = "pdfplumber", specifier = ">=0.11.4" },
{ name = "playwright", specifier = "==1.54.0" },
{ name = "playwright", specifier = ">=1.54.0" },
{ name = "pyee", specifier = ">=13.0.0" },
{ name = "schwab-scraper", git = "ssh://git.local.ben.io/b3nw/schwab-scraper.git" },
{ name = "starlette", specifier = ">=0.41.0" },