Files
schwab-mcp-custom/schwab_scraper/features/accounts_positions/positions_scraper.py
b3nw a05ba3b8a8
All checks were successful
Build and Push Docker Image / build (push) Successful in 36s
fix(positions): sync latest scraper fixes from main repository
2026-04-24 21:34:38 +00:00

663 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import re
from decimal import Decimal, InvalidOperation
from typing import Any, Optional, Sequence
from ...browser.auth import ensure_cookies
from ...browser.client import connect, new_context, new_page
from ...browser.navigation import goto_with_auth_check
from ...core import AccountSummary, Envelope, ErrorType, Lot, Position, fail, ok
from ...core.config import get_playwright_url, load_config
from ...utils.logging import save_debug_artifact
POSITIONS_URL = "https://client.schwab.com/app/accounts/positions/#/"
def _parse_decimal(value: str | None) -> Optional[Decimal]:
if not value:
return None
cleaned = value.strip()
if not cleaned or cleaned in {"-", "--"}:
return None
negative = False
if cleaned.startswith("(") and cleaned.endswith(")"):
negative = True
cleaned = (
cleaned.replace("$", "")
.replace(",", "")
.replace("(", "")
.replace(")", "")
.replace("", "-")
.replace("%", "")
.strip()
)
if not cleaned:
return None
try:
parsed = Decimal(cleaned)
if negative or parsed < 0:
parsed = -abs(parsed)
return parsed
except InvalidOperation:
return None
def _parse_float(value: str | None) -> Optional[float]:
    """Parse a formatted numeric cell into a ``float``.

    Delegates the text clean-up to ``_parse_decimal`` and converts the result.

    Args:
        value: Raw cell text, or ``None``.

    Returns:
        The value as a float, or ``None`` when ``_parse_decimal`` yields
        ``None`` or the conversion raises ``ValueError``/``InvalidOperation``.
    """
    as_decimal = _parse_decimal(value)
    try:
        return None if as_decimal is None else float(as_decimal)
    except (ValueError, InvalidOperation):
        return None
def _normalize_account_label(label: str) -> AccountSummary:
    """Build an ``AccountSummary`` from a free-form account label.

    Args:
        label: Account label text; runs of whitespace are collapsed.

    Returns:
        An ``AccountSummary`` whose ``label`` is the collapsed text, whose
        ``type`` is the leading run of letters/``&``/``'``/``-``/spaces with
        whitespace replaced by underscores (``"Account"`` if there is none),
        whose ``last4`` is the first 3-4 digit group found in the label with
        spaces removed (or ``None``), whose ``id`` is ``"<type>-<last4>"``
        (just the type when no digits were found), and whose ``is_margin``
        is true when the label contains "margin" in any letter case.
    """
    collapsed = re.sub(r"\s+", " ", label).strip()

    # Digits are searched with spaces removed so "12 34" still counts as one group.
    digits = re.search(r"(\d{3,4})\b", collapsed.replace(" ", ""))
    if digits:
        last4 = digits.group(1)[-4:]
    else:
        last4 = None

    leading_words = re.search(r"^[A-Za-z&'\- ]+", collapsed)
    if leading_words:
        account_type = re.sub(r"\s+", "_", leading_words.group(0).strip())
    else:
        account_type = "Account"

    if last4:
        account_id = f"{account_type}-{last4}"
    else:
        account_id = account_type

    return AccountSummary(
        id=account_id,
        label=collapsed,
        type=account_type,
        last4=last4,
        is_margin="margin" in collapsed.lower(),
    )
def _match_account(candidate: AccountSummary, requested: AccountSummary | str | None) -> bool:
if requested is None:
return True
if isinstance(requested, AccountSummary):
requested_values = {
requested.id.lower(),
requested.label.lower(),
}
if requested.last4:
requested_values.add(requested.last4.lower())
else:
lookup = requested.strip().lower()
requested_values = {lookup}
candidate_values = {candidate.id.lower(), candidate.label.lower()}
if candidate.last4:
candidate_values.add(candidate.last4.lower())
return bool(candidate_values & requested_values)
def classify_asset(symbol: str | None, description: str | None) -> str:
if symbol:
sym = symbol.strip().upper()
else:
sym = ""
desc = (description or "").strip().upper()
if sym and re.fullmatch(r"[A-Z]{1,5}", sym):
if "ETF" in desc:
return "ETF"
if any(kw in desc for kw in ["FUND", "MUTUAL"]):
return "MUTUAL_FUND"
return "EQUITY"
if sym and re.search(r"\d", sym) and len(sym) > 5:
return "OPTION"
if any(kw in desc for kw in ["BOND", "CD", "TREASURY"]):
return "BOND"
if sym in {"CASH", "MMDA", "SWEEP"} or "CASH" in desc:
return "CASH"
if "ETF" in desc:
return "ETF"
if "FUND" in desc:
return "MUTUAL_FUND"
return "OTHER"
async def _evaluate_table(page) -> dict[str, Any] | None:
    """Scrape the ``#positionsDetails`` table in the browser page.

    Runs a single in-page script that walks every ``tbody tr``:

    * account header rows update the "current account" name and are skipped;
    * rows whose class contains ``position-row`` start a new position entry
      (``type``, ``cells``, ``lots``, ``symbol`` from ``data-symbol``,
      ``account``);
    * rows that look like lot/child rows are appended to the most recent
      position's ``lots`` with their first two cells dropped.

    Cell text prefers a numeric-looking ``span[title]``, then a button/link
    title or text, then the cell's own text (``sup`` and ``.sdps-sr-only``
    nodes removed).

    Args:
        page: Browser page object exposing an awaitable ``evaluate``.

    Returns:
        ``{"headers": [...], "rows": [...]}`` as produced by the script, or
        ``None`` when the table element is not present.
    """
    # The script text is runtime data and is kept exactly as-is.
    extraction_script = """
() => {
const table = document.querySelector('#positionsDetails');
if (!table) {
return null;
}
const headers = Array.from(table.querySelectorAll('thead tr th')).map((th) => {
const btn = th.querySelector('button, .sdps-tables__header-text');
if (btn) {
return (btn.innerText || btn.textContent || '').trim();
}
return (th.innerText || th.textContent || '').trim();
});
const rowElements = Array.from(table.querySelectorAll('tbody tr'));
const rows = [];
let current = null;
let currentAccount = null;
const isLotRow = (row) => {
const klass = (row.className || '').toLowerCase();
const tagName = (row.tagName || '').toLowerCase();
return klass.includes('lot') || klass.includes('child') || tagName.includes('app-lot');
};
const isPositionRow = (row) => {
const klass = (row.className || '').toLowerCase();
return klass.includes('position-row');
};
const isAccountHeader = (row) => {
const klass = (row.className || '').toLowerCase();
const text = (row.textContent || '').trim();
return !klass.includes('position-row') &&
(klass.includes('highlight-row') || klass.includes('border-top-dark')) &&
text.includes('account panel');
};
for (const row of rowElements) {
// Check if this is an account header row
if (isAccountHeader(row)) {
const text = row.textContent.trim();
// Extract account name from account panel text
const match = text.match(/account panel[\\s\\n]+([^\\n]+)/);
if (match) {
currentAccount = match[1].trim();
}
continue;
}
const cells = Array.from(row.querySelectorAll('td')).map((cell) => {
// 1. Try to find a title attribute on a span (often has more precise value)
const titledSpan = cell.querySelector('span[title]');
if (titledSpan && titledSpan.getAttribute('title').trim().length > 0) {
const title = titledSpan.getAttribute('title').trim();
if (title.includes('$') || /^[+-]?[\\d,.]+$/.test(title) || title.includes('%')) {
return title;
}
}
// 2. Try to find text directly or within common button/link wrappers
const btn = cell.querySelector('button, a, .sdps-button');
if (btn) {
// Check button title too
if (btn.hasAttribute('title') && btn.getAttribute('title').trim().length > 0) {
return btn.getAttribute('title').trim();
}
// Ignore some internal elements like superscripts if present
const clone = btn.cloneNode(true);
clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
return (clone.innerText || clone.textContent || '').trim();
}
// 3. Just clean up the cell text
const clone = cell.cloneNode(true);
clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
return (clone.innerText || clone.textContent || '').trim();
});
if (!cells.length) {
continue;
}
if (isLotRow(row)) {
if (current) {
// For lots, we typically skip the first two columns (empty/checkbox)
current.lots.push(cells.slice(2));
}
} else if (isPositionRow(row)) {
// Extract symbol from data-symbol attribute
const symbol = row.getAttribute('data-symbol') || '';
current = {
type: 'position',
cells: cells,
lots: [],
symbol: symbol,
account: currentAccount
};
rows.push(current);
}
}
return { headers, rows };
}
"""
    return await page.evaluate(extraction_script)
def _map_row(headers: Sequence[str], cells: Sequence[str]) -> dict[str, str]:
result: dict[str, str] = {}
# Filter out empty headers to get the list of "real" data columns
data_headers = []
for h in headers:
# Replace non-breaking spaces and other special whitespace with regular spaces
h_clean = h.replace('\u00a0', ' ').replace('\u200b', '').strip()
name = h_clean.split('\n')[0].strip().lower()
if name:
data_headers.append(name)
else:
data_headers.append(f"empty_{len(data_headers)}")
# We skip headers that definitely don't have cells (checkbox, symbol is usually in data-symbol)
# Looking at debug output, 'description' is the first cell.
# So we find where 'description' or 'name' is in our data_headers.
start_idx = -1
for i, h in enumerate(data_headers):
if h in {'description', 'name'}:
start_idx = i
break
if start_idx == -1:
# Fallback to simple index mapping if we can't find description
for i, cell in enumerate(cells):
key = data_headers[i] if i < len(data_headers) else f"column_{i}"
result[key] = cell
return result
# Map cells starting from description
for i, cell in enumerate(cells):
header_idx = start_idx + i
if header_idx < len(data_headers):
key = data_headers[header_idx]
result[key] = cell
else:
result[f"extra_{i}"] = cell
return result
def _parse_lots(lot_rows: Sequence[Sequence[str]]) -> list[Lot]:
    """Convert scraped lot-table rows into ``Lot`` objects.

    Expected column layout of the lot modal table, per the original author's
    notes: 0 Open Date, 1 Quantity, 2 Price, 3 Cost/Share, 4 Market Value,
    5 Cost Basis. Column 8 is used as ``lot_id`` (presumably the holding
    period; the table has no explicit lot id -- verify against the page).

    Args:
        lot_rows: One sequence of cell texts per lot row; empty rows are skipped.

    Returns:
        One ``Lot`` per non-empty row. A missing or unparseable quantity
        becomes ``0.0``; blank date / lot id become ``None``.
    """

    def cell_at(row: Sequence[str], index: int) -> str | None:
        # Rows may be shorter than the full layout.
        return row[index] if len(row) > index else None

    parsed: list[Lot] = []
    for row in lot_rows:
        if not row:
            continue
        opened_on = row[0].strip()
        shares = _parse_float(cell_at(row, 1))
        # Column 5 is the lot's total cost basis; column 3 is only cost per share.
        total_cost = _parse_decimal(cell_at(row, 5))
        identifier = row[8].strip() if len(row) > 8 else None
        parsed.append(
            Lot(
                acquired_date=opened_on or None,
                quantity=shares or 0.0,
                cost_basis=total_cost,
                lot_id=identifier or None,
            )
        )
    return parsed
def _row_to_position(row_map: dict[str, str], lots_rows: Sequence[Sequence[str]], symbol: str = "") -> Position:
    """Assemble a ``Position`` from one mapped table row and its lot rows.

    Each field is read from the first header key that holds a non-empty
    value. The ``empty_<n>`` keys are positional fallbacks for columns whose
    header text was blank (per the original author's debugging notes, the
    price column sits under a blank header between "Qty" and "Price Chng $").

    Args:
        row_map: Header-key to cell-text mapping, as produced by ``_map_row``.
        lots_rows: Raw lot rows for this position, passed to ``_parse_lots``.
        symbol: Ticker taken from the row's ``data-symbol`` attribute.

    Returns:
        The populated ``Position``; unparseable numeric fields are ``None``.
    """

    def first_present(*keys: str) -> str | None:
        # Same fall-through as ``a or b or c``: first truthy lookup wins,
        # otherwise the result of the last lookup is returned.
        found = None
        for key in keys:
            found = row_map.get(key)
            if found:
                return found
        return found

    description = first_present("description", "name", "column_1") or ""

    market_price = _parse_decimal(
        first_present("price", "market price", "last price", "empty_4", "empty_5")
    )
    quantity = _parse_float(first_present("qty", "quantity"))
    market_value = _parse_decimal(first_present("mkt val", "market value"))
    cost_basis_total = _parse_decimal(first_present("cost basis", "total cost"))
    unrealized_gain = _parse_decimal(
        first_present("gain/loss $", "unrealized gain", "empty_11")
    )
    unrealized_gain_pct = _parse_float(
        first_present("gain/loss %", "unrealized gain %", "empty_12")
    )

    return Position(
        symbol=symbol or "",
        description=description or None,
        asset_type=classify_asset(symbol, description),
        quantity=quantity,
        market_price=market_price,
        market_value=market_value,
        cost_basis_total=cost_basis_total,
        unrealized_gain=unrealized_gain,
        unrealized_gain_pct=unrealized_gain_pct,
        lots=_parse_lots(lots_rows),
    )
async def get_positions(
    account: AccountSummary | str | None = None,
    *,
    include_non_equity: bool = False,
    debug: bool = False,
) -> Envelope[list[Position]]:
    """Scrape the positions table (and per-position lots) from the Schwab site.

    Flow: ensure session cookies, connect to the configured Playwright
    endpoint, open the positions page, optionally switch account, read the
    table headers and every ``tr.position-row``, then open each row's
    cost-basis modal in turn to collect its lots.

    Args:
        account: Account to switch to before scraping. An ``AccountSummary``
            contributes its ``id``; a string is used as-is. The value is
            matched as a substring of the row text on the summary page.
            Falsy values skip the switch.
        include_non_equity: When false, positions whose ``asset_type`` is not
            ``"EQUITY"`` or ``"ETF"`` are dropped.
        debug: When true, prints progress messages and saves HTML/PNG debug
            artifacts via ``save_debug_artifact``.

    Returns:
        ``ok(positions)`` on success. ``fail(...)`` with ``AUTHENTICATION``
        when cookies or the initial page load fail, ``PARSING`` when no table
        headers are found, ``VALIDATION`` when no position survives the
        filter, and ``UNKNOWN`` (retryable) for any other exception.
        The page, context, browser and Playwright handle are always closed.

    NOTE(review): the cost-basis buttons are clicked twice -- once in bulk
    right after the table loads and once per row in the main loop, with a
    modal clean-up in between. Confirm the bulk click is still needed.
    NOTE(review): SOURCE was received without indentation; the nesting of the
    post-switch URL check and of the short waits after closing modals was
    reconstructed from context -- confirm against the repository copy.
    """
    cookies = await ensure_cookies()
    if not cookies:
        return fail("Unable to establish Schwab session.", ErrorType.AUTHENTICATION, retryable=False)
    config = load_config()
    playwright_url = get_playwright_url(config)
    # Pre-bind every handle so the ``finally`` block can close whatever was opened.
    playwright = browser = context = page = None
    try:
        playwright, browser = await connect(playwright_url)
        context = await new_context(browser, cookies=cookies)
        page = await new_page(context)
        if not await goto_with_auth_check(page, context, POSITIONS_URL, debug=debug):
            return fail("Failed to load Schwab positions page.", ErrorType.AUTHENTICATION, retryable=True)
        if account:
            requested_id = account.id if isinstance(account, AccountSummary) else account
            if debug:
                print(f"DEBUG: Attempting to switch to account: {requested_id} via Summary page")
            # Go to summary page to switch (much more stable than dropdown).
            # The result of this navigation is not checked.
            await goto_with_auth_check(page, context, "https://client.schwab.com/app/accounts/summary/#/", debug=debug)
            await page.wait_for_timeout(3000)
            # Find the first row whose text contains the requested id and click its account link.
            clicked = await page.evaluate("""
(query) => {
const rows = Array.from(document.querySelectorAll('sdps-table-row, tr'));
const targetRow = rows.find(r => r.innerText.includes(query) || r.textContent.includes(query));
if (targetRow) {
const link = targetRow.querySelector('a.acctNavigate-button-link');
if (link) {
link.click();
return true;
}
}
return false;
}
""", requested_id)
            if clicked:
                if debug:
                    print(f"DEBUG: Clicked account {requested_id} on summary page")
                await page.wait_for_timeout(5000)
            else:
                if debug:
                    print(f"DEBUG: Failed to find account {requested_id} on summary page, trying dropdown as fallback...")
                # Imported here rather than at module level (presumably to avoid a circular import -- confirm).
                from ..transactions.scraper import switch_account_on_page
                await switch_account_on_page(page, requested_id, context=context, debug=debug)
            # Ensure we are on positions page for the selected account
            if "/accounts/positions" not in page.url:
                await goto_with_auth_check(page, context, POSITIONS_URL, debug=debug)
        if debug:
            html = await page.content()
            save_debug_artifact("positions_page_initial.html", html)
        await page.wait_for_selector('#positionsDetails', timeout=45000)
        await page.wait_for_timeout(1000)
        # Bulk-click every cost-basis button; failures here are non-fatal.
        try:
            expanded_count = await page.evaluate("""
() => {
const buttons = Array.from(document.querySelectorAll('tr.position-row sdps-button[sdps-id="costBasisTBD"] button'));
let count = 0;
buttons.forEach(btn => {
// Check if already expanded (usually has a different icon or state, but clicking again often toggles)
// For now we just click them all.
btn.click();
count++;
});
return count;
}
""")
            if debug:
                print(f"Clicked {expanded_count} potential lot expanders")
            if expanded_count > 0:
                await page.wait_for_timeout(2000)  # Wait for expansion
        except Exception as e:
            if debug:
                print(f"Error expanding lots: {e}")
        # Scroll to the bottom (presumably to trigger lazily rendered rows -- confirm).
        await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
        await page.wait_for_timeout(1500)
        if debug:
            html = await page.content()
            save_debug_artifact("positions_page_scrolled.html", html)
            png = await page.screenshot(full_page=True)
            save_debug_artifact("positions_page.png", png)
        # 1. Get headers once (already trimmed, first line only, lower-cased by the script)
        headers = await page.evaluate("""
() => {
const table = document.querySelector('#positionsDetails');
if (!table) return [];
return Array.from(table.querySelectorAll('thead tr th')).map(th => {
const btn = th.querySelector('button, .sdps-tables__header-text');
const text = (btn ? (btn.innerText || btn.textContent) : (th.innerText || th.textContent)) || '';
return text.trim().replace(/\\u00a0/g, ' ').replace(/\\u200b/g, '').split('\\n')[0].trim().toLowerCase();
});
}
""")
        if not headers:
            return fail("Positions table headers not found.", ErrorType.PARSING, retryable=True)
        # 1.5 Pre-cleanup: close any lot modals left open (e.g. by the bulk click above).
        # Best effort: any error here is deliberately ignored.
        try:
            open_modals = await page.query_selector_all('app-lot sdps-modal[sdps-id="open-lot-overlay"].sdps-modal--open')
            for m in open_modals:
                close = await m.query_selector('button.sdps-modal__close')
                if close:
                    await close.click(force=True)
                    await page.wait_for_timeout(500)
        except Exception:
            pass
        # 2. Get all position rows metadata first to avoid stale handle issues.
        # Each entry carries the row's data-symbol, its cell texts and its DOM index.
        position_metadata = await page.evaluate("""
() => {
const rows = Array.from(document.querySelectorAll('tr.position-row'));
return rows.map((row, index) => {
const symbol = row.getAttribute('data-symbol') || '';
const cells = Array.from(row.querySelectorAll('td')).map((cell) => {
const btn = cell.querySelector('button, a, .sdps-button');
if (btn) {
const clone = btn.cloneNode(true);
clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
let txt = clone.innerText.trim();
if (!txt && btn.hasAttribute('title')) txt = btn.getAttribute('title').trim();
return txt;
}
const titledSpan = cell.querySelector('span[title]');
const clone = cell.cloneNode(true);
clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
let txt = clone.innerText.trim();
// If no direct text but has a title with a number, use that
if (!txt && titledSpan && titledSpan.getAttribute('title')) {
const t = titledSpan.getAttribute('title').trim();
if (t.includes('$') || /^[+-]?[\\d,.]+$/.test(t)) return t;
}
return txt;
});
return { symbol, cells, index };
});
}
""")
        if debug:
            print(f"Found {len(position_metadata)} positions to process")
        all_positions: list[Position] = []
        for meta in position_metadata:
            symbol = meta['symbol']
            idx = meta['index']
            # Lots stay empty if the modal cannot be opened or scraped.
            lots_data = []
            try:
                # Re-query the rows on every iteration instead of reusing handles from an earlier pass.
                rows = await page.query_selector_all('tr.position-row')
                if idx < len(rows):
                    row = rows[idx]
                    expander = await row.query_selector('sdps-button[sdps-id="costBasisTBD"] button')
                    if expander:
                        await expander.scroll_into_view_if_needed()
                        # Use force=True because sometimes modals/overlays block the click in Schwab's UI
                        await expander.click(force=True)
                        # Wait for modal to appear
                        await page.wait_for_timeout(1000)
                        # Find the active modal: the first one that is neither aria-hidden nor inert
                        modal_handle = None
                        modals = await page.query_selector_all('app-lot sdps-modal[sdps-id="open-lot-overlay"]')
                        for m in modals:
                            is_hidden = await m.evaluate('el => el.getAttribute("aria-hidden") === "true" || el.hasAttribute("inert")')
                            if not is_hidden:
                                modal_handle = m
                                break
                        if not modal_handle and modals:
                            modal_handle = modals[-1]  # Fallback to last one
                        if modal_handle:
                            modal_id = await modal_handle.get_attribute('modal-id')
                            if debug:
                                print(f"Processing modal {modal_id} for {symbol}")
                            # Wait for table to be populated; a timeout just means no lot rows are read.
                            try:
                                await modal_handle.wait_for_selector('#responsiveLotTable tbody tr.data-row', timeout=3000)
                            except Exception:
                                pass
                            # Extract lots from this specific modal, looked up again by its modal-id.
                            # The f-string only unescapes the doubled braces; the id itself is passed as an argument.
                            lots_data = await page.evaluate(f"""
(mId) => {{
const modal = document.querySelector(`app-lot sdps-modal[modal-id="${{mId}}"]`);
if (!modal) return [];
const lotTable = modal.querySelector('#responsiveLotTable');
if (!lotTable) return [];
const lotRows = Array.from(lotTable.querySelectorAll('tbody tr.data-row'));
return lotRows.map(r => {{
return Array.from(r.querySelectorAll('th, td')).map(c => {{
const clone = c.cloneNode(true);
clone.querySelectorAll('sup, .sdps-sr-only, .transactionCostColor').forEach(el => el.remove());
return clone.innerText.trim();
}});
}});
}}
""", modal_id)
                            # Close this specific modal before moving to the next row
                            close_btn = await modal_handle.query_selector('button.sdps-modal__close')
                            if close_btn:
                                await close_btn.click(force=True)
                                await page.wait_for_timeout(1000)
            except Exception as e:
                # A lot-scraping failure must not drop the position itself.
                if debug:
                    print(f"Error expanding lots for {symbol}: {e}")
            row_map = _map_row(headers, meta['cells'])
            position = _row_to_position(row_map, lots_data, symbol=symbol)
            if not include_non_equity and position.asset_type not in {"EQUITY", "ETF"}:
                continue
            all_positions.append(position)
        if not all_positions:
            return fail("No positions matched the requested criteria.", ErrorType.VALIDATION, retryable=False)
        return ok(all_positions)
    except Exception as exc:
        # Top-level boundary: report any unexpected error as a retryable failure.
        if debug:
            import traceback
            traceback.print_exc()
        return fail(str(exc), ErrorType.UNKNOWN, retryable=True)
    finally:
        # Tear down in reverse order of creation; each helper tolerates None and swallows errors.
        await _safe_close_page(page)
        await _safe_close_context(context)
        await _safe_close_browser(browser)
        await _safe_stop_playwright(playwright)
async def _safe_close_page(page) -> None:
    """Close *page* if one was opened, ignoring any error raised by ``close()``."""
    if page is not None:
        try:
            await page.close()
        except Exception:
            # Best-effort cleanup: a failed close must not mask the caller's result.
            pass
async def _safe_close_context(context) -> None:
    """Close *context* if one was created, ignoring any error raised by ``close()``."""
    if context is not None:
        try:
            await context.close()
        except Exception:
            # Best-effort cleanup: a failed close must not mask the caller's result.
            pass
async def _safe_close_browser(browser) -> None:
    """Close *browser* if one was connected, ignoring any error raised by ``close()``."""
    if browser is not None:
        try:
            await browser.close()
        except Exception:
            # Best-effort cleanup: a failed close must not mask the caller's result.
            pass
async def _safe_stop_playwright(playwright) -> None:
    """Stop *playwright* if it was started, ignoring any error raised by ``stop()``."""
    if playwright is not None:
        try:
            await playwright.stop()
        except Exception:
            # Best-effort cleanup: a failed stop must not mask the caller's result.
            pass