from __future__ import annotations

import re
from decimal import Decimal, InvalidOperation
from typing import Any, Optional, Sequence

from ...browser.auth import ensure_cookies
from ...browser.client import connect, new_context, new_page
from ...browser.navigation import goto_with_auth_check
from ...core import AccountSummary, Envelope, ErrorType, Lot, Position, fail, ok
from ...core.config import get_playwright_url, load_config
from ...utils.logging import save_debug_artifact

POSITIONS_URL = "https://client.schwab.com/app/accounts/positions/#/"
SUMMARY_URL = "https://client.schwab.com/app/accounts/summary/#/"

# Pseudo-symbols used for cash rows. Note that "CASH" is also a real listed
# ticker, so classify_asset() only trusts these when the description agrees.
_CASH_SYMBOLS = frozenset({"CASH", "MMDA", "SWEEP"})
_CASH_KEYWORDS = ("CASH", "SWEEP", "MONEY MARKET", "DEPOSIT")

# 1-5 letter ticker, optionally followed by a share-class suffix (BRK/B, BRK.B).
_TICKER_RE = re.compile(r"[A-Z]{1,5}(?:[./-][A-Z]{1,2})?")
# "CD" must be a whole word, otherwise any description containing the letters
# "CD" would be classified as a bond.
_CD_WORD_RE = re.compile(r"\bCDS?\b")

_POSITION_ROW_SELECTOR = 'tr.position-row'
_LOT_EXPANDER_SELECTOR = 'sdps-button[sdps-id="costBasisTBD"] button'
_LOT_MODAL_SELECTOR = 'app-lot sdps-modal[sdps-id="open-lot-overlay"]'
_MODAL_CLOSE_SELECTOR = 'button.sdps-modal__close'

# Clicks the navigation link of the first summary-page row whose text contains
# the query string. Resolves to true when a link was clicked.
_CLICK_ACCOUNT_ROW_JS = """
(query) => {
    const rows = Array.from(document.querySelectorAll('sdps-table-row, tr'));
    const targetRow = rows.find(r => r.innerText.includes(query) || r.textContent.includes(query));
    if (targetRow) {
        const link = targetRow.querySelector('a.acctNavigate-button-link');
        if (link) {
            link.click();
            return true;
        }
    }
    return false;
}
"""

# Returns the normalized (first line, lower-cased) header texts of the
# positions table, or an empty array when the table is missing.
_HEADERS_JS = """
() => {
    const table = document.querySelector('#positionsDetails');
    if (!table) return [];
    return Array.from(table.querySelectorAll('thead tr th')).map(th => {
        const btn = th.querySelector('button, .sdps-tables__header-text');
        const text = (btn ? (btn.innerText || btn.textContent) : (th.innerText || th.textContent)) || '';
        return text.trim().replace(/\\u00a0/g, ' ').replace(/\\u200b/g, '').split('\\n')[0].trim().toLowerCase();
    });
}
"""

# Returns {symbol, cells, index} for every position row so that later steps
# work from plain data instead of element handles that may go stale.
_POSITION_ROWS_JS = """
() => {
    const rows = Array.from(document.querySelectorAll('tr.position-row'));
    return rows.map((row, index) => {
        const symbol = row.getAttribute('data-symbol') || '';
        const cells = Array.from(row.querySelectorAll('td')).map((cell) => {
            const btn = cell.querySelector('button, a, .sdps-button');
            if (btn) {
                const clone = btn.cloneNode(true);
                clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
                let txt = clone.innerText.trim();
                if (!txt && btn.hasAttribute('title')) txt = btn.getAttribute('title').trim();
                return txt;
            }
            const titledSpan = cell.querySelector('span[title]');
            const clone = cell.cloneNode(true);
            clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
            let txt = clone.innerText.trim();
            // If no direct text but has a title with a number, use that
            if (!txt && titledSpan && titledSpan.getAttribute('title')) {
                const t = titledSpan.getAttribute('title').trim();
                if (t.includes('$') || /^[+-]?[\\d,.]+$/.test(t)) return t;
            }
            return txt;
        });
        return { symbol, cells, index };
    });
}
"""

# Returns the cell texts of every data row in the lot table of the modal with
# the given modal-id, or an empty array when the modal/table is missing.
_LOT_ROWS_JS = """
(mId) => {
    const modal = document.querySelector(`app-lot sdps-modal[modal-id="${mId}"]`);
    if (!modal) return [];
    const lotTable = modal.querySelector('#responsiveLotTable');
    if (!lotTable) return [];
    const lotRows = Array.from(lotTable.querySelectorAll('tbody tr.data-row'));
    return lotRows.map(r => {
        return Array.from(r.querySelectorAll('th, td')).map(c => {
            const clone = c.cloneNode(true);
            clone.querySelectorAll('sup, .sdps-sr-only, .transactionCostColor').forEach(el => el.remove());
            return clone.innerText.trim();
        });
    });
}
"""


def _parse_decimal(value: str | None) -> Optional[Decimal]:
    """Parse a displayed money/percent/quantity string into a ``Decimal``.

    Strips ``$``, ``,``, ``%`` and parentheses, maps the Unicode minus sign to
    ``-`` and treats a fully parenthesized value as negative.

    Args:
        value: Raw cell text, or ``None``.

    Returns:
        The parsed finite ``Decimal``, or ``None`` for empty input, dash
        placeholders (``-``/``--``), unparseable text, and non-finite values
        such as ``NaN`` or ``Infinity``.
    """
    if not value:
        return None
    cleaned = value.strip()
    if not cleaned or cleaned in {"-", "--"}:
        return None
    # Accounting notation: "(1,234.56)" means -1234.56.
    negative = cleaned.startswith("(") and cleaned.endswith(")")
    cleaned = (
        cleaned.replace("$", "")
        .replace(",", "")
        .replace("(", "")
        .replace(")", "")
        .replace("\u2212", "-")  # Unicode MINUS SIGN
        .replace("%", "")
        .strip()
    )
    if not cleaned:
        return None
    try:
        parsed = Decimal(cleaned)
    except InvalidOperation:
        return None
    # Decimal() happily accepts "NaN" / "Infinity"; neither is a usable amount.
    if not parsed.is_finite():
        return None
    if negative:
        parsed = -abs(parsed)
    return parsed


def _parse_float(value: str | None) -> Optional[float]:
    """Parse a displayed numeric string into a ``float``.

    Uses the same cleaning rules as ``_parse_decimal``.

    Returns:
        The parsed value, or ``None`` when the text cannot be parsed.
    """
    decimal_value = _parse_decimal(value)
    if decimal_value is None:
        return None
    try:
        return float(decimal_value)
    except (ValueError, InvalidOperation):
        return None


def _normalize_account_label(label: str) -> AccountSummary:
    """Build an ``AccountSummary`` from a free-form account label.

    Whitespace is collapsed, the first 3-4 digit run (ignoring spaces) becomes
    ``last4``, and the leading run of letters/``&``/``'``/``-``/spaces becomes
    the account type (spaces replaced by underscores, ``"Account"`` when the
    label does not start with such characters). The id is ``"<type>-<last4>"``
    when digits were found, otherwise just the type.
    """
    normalized = re.sub(r"\s+", " ", label).strip()

    last4_match = re.search(r"(\d{3,4})\b", normalized.replace(" ", ""))
    last4 = last4_match.group(1)[-4:] if last4_match else None

    type_match = re.search(r"^[A-Za-z&'\- ]+", normalized)
    account_type = re.sub(r"\s+", "_", type_match.group(0).strip()) if type_match else "Account"

    account_id = f"{account_type}-{last4}" if last4 else account_type
    return AccountSummary(
        id=account_id,
        label=normalized,
        type=account_type,
        last4=last4,
        is_margin="margin" in normalized.lower(),
    )


def _match_account(candidate: AccountSummary, requested: AccountSummary | str | None) -> bool:
    """Return whether ``candidate`` satisfies the ``requested`` account.

    ``None`` matches everything. Otherwise the comparison is case-insensitive
    and succeeds when the candidate's id, label or last4 equals the requested
    string (or any of the requested summary's id, label or last4).
    """
    if requested is None:
        return True

    if isinstance(requested, AccountSummary):
        requested_values = {requested.id.lower(), requested.label.lower()}
        if requested.last4:
            requested_values.add(requested.last4.lower())
    else:
        requested_values = {requested.strip().lower()}

    candidate_values = {candidate.id.lower(), candidate.label.lower()}
    if candidate.last4:
        candidate_values.add(candidate.last4.lower())

    return bool(candidate_values & requested_values)


def classify_asset(symbol: str | None, description: str | None) -> str:
    """Classify a position into a coarse asset type.

    Args:
        symbol: Ticker or pseudo-symbol; may be empty or ``None``.
        description: Human-readable description; may be empty or ``None``.

    Returns:
        One of ``"EQUITY"``, ``"ETF"``, ``"MUTUAL_FUND"``, ``"OPTION"``,
        ``"BOND"``, ``"CASH"`` or ``"OTHER"``.
    """
    sym = symbol.strip().upper() if symbol else ""
    desc = (description or "").strip().upper()

    # Cash pseudo-symbols look like ordinary tickers, so they must be checked
    # before the ticker pattern. Because "CASH" is also a listed ticker, only
    # treat the row as cash when the description is empty or cash-like.
    if sym in _CASH_SYMBOLS and (not desc or any(kw in desc for kw in _CASH_KEYWORDS)):
        return "CASH"

    if sym and _TICKER_RE.fullmatch(sym):
        if "ETF" in desc:
            return "ETF"
        if "FUND" in desc or "MUTUAL" in desc:
            return "MUTUAL_FUND"
        return "EQUITY"

    # Long symbols containing digits are treated as option contracts.
    if sym and len(sym) > 5 and re.search(r"\d", sym):
        return "OPTION"

    if "BOND" in desc or "TREASURY" in desc or _CD_WORD_RE.search(desc):
        return "BOND"
    if "CASH" in desc:
        return "CASH"
    if "ETF" in desc:
        return "ETF"
    if "FUND" in desc:
        return "MUTUAL_FUND"
    return "OTHER"


async def _evaluate_table(page) -> dict[str, Any] | None:
    """Snapshot the ``#positionsDetails`` table in a single page evaluation.

    Returns:
        ``None`` when the table is absent; otherwise a dict with ``headers``
        (list of header texts) and ``rows``, where each row has ``type``,
        ``cells``, ``lots`` (cell lists of the lot rows that follow it),
        ``symbol`` (from ``data-symbol``) and ``account`` (text captured from
        the preceding account header row, if any).
    """
    return await page.evaluate(
        """
        () => {
            const table = document.querySelector('#positionsDetails');
            if (!table) {
                return null;
            }
            const headers = Array.from(table.querySelectorAll('thead tr th')).map((th) => {
                const btn = th.querySelector('button, .sdps-tables__header-text');
                if (btn) {
                    return (btn.innerText || btn.textContent || '').trim();
                }
                return (th.innerText || th.textContent || '').trim();
            });
            const rowElements = Array.from(table.querySelectorAll('tbody tr'));
            const rows = [];
            let current = null;
            let currentAccount = null;

            const isLotRow = (row) => {
                const klass = (row.className || '').toLowerCase();
                const tagName = (row.tagName || '').toLowerCase();
                return klass.includes('lot') || klass.includes('child') || tagName.includes('app-lot');
            };
            const isPositionRow = (row) => {
                const klass = (row.className || '').toLowerCase();
                return klass.includes('position-row');
            };
            const isAccountHeader = (row) => {
                const klass = (row.className || '').toLowerCase();
                const text = (row.textContent || '').trim();
                return !klass.includes('position-row') &&
                    (klass.includes('highlight-row') || klass.includes('border-top-dark')) &&
                    text.includes('account panel');
            };

            for (const row of rowElements) {
                // Check if this is an account header row
                if (isAccountHeader(row)) {
                    const text = row.textContent.trim();
                    // Extract account name from account panel text
                    const match = text.match(/account panel[\\s\\n]+([^\\n]+)/);
                    if (match) {
                        currentAccount = match[1].trim();
                    }
                    continue;
                }

                const cells = Array.from(row.querySelectorAll('td')).map((cell) => {
                    // 1. Try to find a title attribute on a span (often has more precise value)
                    const titledSpan = cell.querySelector('span[title]');
                    if (titledSpan && titledSpan.getAttribute('title').trim().length > 0) {
                        const title = titledSpan.getAttribute('title').trim();
                        if (title.includes('$') || /^[+-]?[\\d,.]+$/.test(title) || title.includes('%')) {
                            return title;
                        }
                    }
                    // 2. Try to find text directly or within common button/link wrappers
                    const btn = cell.querySelector('button, a, .sdps-button');
                    if (btn) {
                        // Check button title too
                        if (btn.hasAttribute('title') && btn.getAttribute('title').trim().length > 0) {
                            return btn.getAttribute('title').trim();
                        }
                        // Ignore some internal elements like superscripts if present
                        const clone = btn.cloneNode(true);
                        clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
                        return (clone.innerText || clone.textContent || '').trim();
                    }
                    // 3. Just clean up the cell text
                    const clone = cell.cloneNode(true);
                    clone.querySelectorAll('sup, .sdps-sr-only').forEach(el => el.remove());
                    return (clone.innerText || clone.textContent || '').trim();
                });

                if (!cells.length) {
                    continue;
                }

                if (isLotRow(row)) {
                    if (current) {
                        // For lots, we typically skip the first two columns (empty/checkbox)
                        current.lots.push(cells.slice(2));
                    }
                } else if (isPositionRow(row)) {
                    // Extract symbol from data-symbol attribute
                    const symbol = row.getAttribute('data-symbol') || '';
                    current = { type: 'position', cells: cells, lots: [], symbol: symbol, account: currentAccount };
                    rows.push(current);
                }
            }
            return { headers, rows };
        }
        """
    )


def _map_row(headers: Sequence[str], cells: Sequence[str]) -> dict[str, str]:
    """Pair the cell texts of a position row with normalized header names.

    Header names are reduced to their first line, stripped and lower-cased;
    blank headers become ``empty_<position>`` placeholders. Cells are aligned
    so that the first cell falls under the ``description``/``name`` header,
    because the leading header columns have no matching ``<td>`` cell.

    Args:
        headers: Header texts in table order.
        cells: Cell texts of one position row.

    Returns:
        Mapping of header name to cell text. Cells beyond the last header are
        stored as ``extra_<i>``; when no description header exists, cells map
        by plain index and overflow is stored as ``column_<i>``.
    """
    data_headers: list[str] = []
    for header in headers:
        # Replace non-breaking spaces and drop zero-width spaces before trimming.
        cleaned = header.replace('\u00a0', ' ').replace('\u200b', '').strip()
        name = cleaned.split('\n')[0].strip().lower()
        data_headers.append(name or f"empty_{len(data_headers)}")

    start_idx = next(
        (i for i, name in enumerate(data_headers) if name in {'description', 'name'}),
        -1,
    )

    result: dict[str, str] = {}
    if start_idx == -1:
        # Fallback to simple index mapping if we can't find description.
        for i, cell in enumerate(cells):
            key = data_headers[i] if i < len(data_headers) else f"column_{i}"
            result[key] = cell
        return result

    # Map cells starting from the description column.
    for i, cell in enumerate(cells):
        header_idx = start_idx + i
        if header_idx < len(data_headers):
            result[data_headers[header_idx]] = cell
        else:
            result[f"extra_{i}"] = cell
    return result


def _parse_lots(lot_rows: Sequence[Sequence[str]]) -> list[Lot]:
    """Convert raw lot-table rows into ``Lot`` objects.

    Columns are read by position: 0 = open date, 1 = quantity, 5 = cost basis
    (total), 8 = used as the lot id when present. Empty rows are skipped and a
    missing or unparseable quantity becomes ``0.0``.
    """
    lots: list[Lot] = []
    for cells in lot_rows:
        if not cells:
            continue
        # Modal table columns:
        # 0: Open Date, 1: Quantity, 2: Price, 3: Cost/Share, 4: Market Value, 5: Cost Basis, ...
        acquired_date = cells[0].strip()
        quantity = _parse_float(cells[1] if len(cells) > 1 else None)
        # Index 5 is the total cost basis; index 3 is cost per share.
        cost_basis = _parse_decimal(cells[5] if len(cells) > 5 else None)
        # The table has no explicit lot id column; column 8 is used when present.
        lot_id = cells[8].strip() if len(cells) > 8 else None
        lots.append(
            Lot(
                acquired_date=acquired_date or None,
                quantity=quantity or 0.0,
                cost_basis=cost_basis,
                lot_id=lot_id or None,
            )
        )
    return lots


def _row_to_position(row_map: dict[str, str], lots_rows: Sequence[Sequence[str]], symbol: str = "") -> Position:
    """Build a ``Position`` from a mapped row and its raw lot rows.

    Args:
        row_map: Header-name to cell-text mapping as produced by ``_map_row``.
        lots_rows: Raw lot-table rows for this position (may be empty).
        symbol: Symbol taken from the row's ``data-symbol`` attribute.

    Returns:
        The populated ``Position``; fields whose column is missing or
        unparseable are ``None``.
    """
    description = row_map.get('description') or row_map.get('name') or row_map.get('column_1') or ""

    # The price column may have a blank header, in which case _map_row labels
    # it "empty_<n>"; the positional fallbacks cover that layout.
    market_price = _parse_decimal(
        row_map.get('price')
        or row_map.get('market price')
        or row_map.get('last price')
        or row_map.get('empty_4')
        or row_map.get('empty_5')
    )
    quantity = _parse_float(row_map.get('qty') or row_map.get('quantity'))
    market_value = _parse_decimal(row_map.get('mkt val') or row_map.get('market value'))
    cost_basis_total = _parse_decimal(row_map.get('cost basis') or row_map.get('total cost'))
    unrealized_gain = _parse_decimal(
        row_map.get('gain/loss $')
        or row_map.get('unrealized gain')
        or row_map.get('empty_11')
    )
    unrealized_gain_pct = _parse_float(
        row_map.get('gain/loss %')
        or row_map.get('unrealized gain %')
        or row_map.get('empty_12')
    )

    return Position(
        symbol=symbol or "",
        description=description or None,
        asset_type=classify_asset(symbol, description),
        quantity=quantity,
        market_price=market_price,
        market_value=market_value,
        cost_basis_total=cost_basis_total,
        unrealized_gain=unrealized_gain,
        unrealized_gain_pct=unrealized_gain_pct,
        lots=_parse_lots(lots_rows),
    )


async def _switch_account(page: Any, context: Any, requested_id: str, *, debug: bool) -> None:
    """Select ``requested_id`` via the summary page, falling back to the dropdown."""
    if debug:
        print(f"DEBUG: Attempting to switch to account: {requested_id} via Summary page")

    # Go to summary page to switch (much more stable than dropdown).
    await goto_with_auth_check(page, context, SUMMARY_URL, debug=debug)
    await page.wait_for_timeout(3000)

    clicked = await page.evaluate(_CLICK_ACCOUNT_ROW_JS, requested_id)
    if clicked:
        if debug:
            print(f"DEBUG: Clicked account {requested_id} on summary page")
        await page.wait_for_timeout(5000)
        return

    if debug:
        print(f"DEBUG: Failed to find account {requested_id} on summary page, trying dropdown as fallback...")
    # Imported lazily, as in the original code (kept local to this code path).
    from ..transactions.scraper import switch_account_on_page

    await switch_account_on_page(page, requested_id, context=context, debug=debug)


async def _close_stale_lot_modals(page: Any, *, debug: bool) -> None:
    """Best-effort close of lot modals that are already open on the page."""
    try:
        open_modals = await page.query_selector_all(f'{_LOT_MODAL_SELECTOR}.sdps-modal--open')
        for modal in open_modals:
            close_btn = await modal.query_selector(_MODAL_CLOSE_SELECTOR)
            if close_btn:
                await close_btn.click(force=True)
                await page.wait_for_timeout(500)
    except Exception as exc:
        # Cleanup is optional; scraping proceeds regardless.
        if debug:
            print(f"DEBUG: Ignoring error while closing stale lot modals: {exc}")


async def _scrape_lot_rows(page: Any, row_index: int, symbol: str, *, debug: bool) -> list:
    """Open the lot modal of one position row and return its raw lot rows.

    Any failure is swallowed (reported only in debug mode) so that a broken
    lot modal never prevents the position itself from being returned.

    Returns:
        A list of cell-text lists, one per lot row; empty when the row has no
        expander, no matching modal appears, or an error occurs before the
        rows are read.
    """
    lots_data: list = []
    try:
        # Re-query the rows each time; handles captured earlier may be stale.
        rows = await page.query_selector_all(_POSITION_ROW_SELECTOR)
        if row_index >= len(rows):
            return lots_data
        expander = await rows[row_index].query_selector(_LOT_EXPANDER_SELECTOR)
        if not expander:
            return lots_data

        await expander.scroll_into_view_if_needed()
        # force=True because modals/overlays sometimes block the click.
        await expander.click(force=True)
        await page.wait_for_timeout(1000)

        # Find the active modal (not inert, visible, and matching our symbol).
        modal_handle = None
        for modal in await page.query_selector_all(_LOT_MODAL_SELECTOR):
            is_hidden = await modal.evaluate(
                'el => el.getAttribute("aria-hidden") === "true" || el.hasAttribute("inert")'
            )
            if is_hidden:
                continue
            # Verify title matches symbol to avoid stale modal data.
            title_text = await modal.evaluate(
                'el => el.querySelector(".sdps-modal__title")?.innerText || ""'
            )
            if symbol.upper() in title_text.upper():
                modal_handle = modal
                break

        if not modal_handle:
            if debug:
                print(f"DEBUG: Could not find matching visible modal for {symbol}")
            return lots_data

        modal_id = await modal_handle.get_attribute('modal-id')
        if debug:
            print(f"Processing modal {modal_id} for {symbol}")

        try:
            await modal_handle.wait_for_selector('#responsiveLotTable tbody tr.data-row', timeout=3000)
        except Exception:
            # The table may legitimately stay empty; read whatever is there.
            pass

        lots_data = await page.evaluate(_LOT_ROWS_JS, modal_id)

        close_btn = await modal_handle.query_selector(_MODAL_CLOSE_SELECTOR)
        if close_btn:
            await close_btn.click(force=True)
            try:
                await page.wait_for_selector(
                    f'app-lot sdps-modal[modal-id="{modal_id}"]', state='hidden', timeout=3000
                )
            except Exception:
                # Best effort: stale modals are skipped by the visibility check
                # the next time a modal is looked up.
                pass
    except Exception as e:
        if debug:
            print(f"Error expanding lots for {symbol}: {e}")
    return lots_data


async def get_positions(
    account: AccountSummary | str | None = None,
    *,
    include_non_equity: bool = False,
    debug: bool = False,
) -> Envelope[list[Position]]:
    """Scrape the Schwab positions page and return the parsed positions.

    Args:
        account: Account to switch to before scraping; an ``AccountSummary``
            (its ``id`` is used) or a string. ``None`` scrapes whatever
            account the positions page shows by default.
        include_non_equity: When false, only positions classified as
            ``EQUITY`` or ``ETF`` are returned.
        debug: Print progress messages and save page artifacts.

    Returns:
        ``ok(positions)`` on success, otherwise a ``fail`` envelope:
        ``AUTHENTICATION`` when no session or page load is possible,
        ``PARSING`` when the table headers are missing, ``VALIDATION`` when no
        position passes the filter, and ``UNKNOWN`` for any other exception.
    """
    cookies = await ensure_cookies()
    if not cookies:
        return fail("Unable to establish Schwab session.", ErrorType.AUTHENTICATION, retryable=False)

    config = load_config()
    playwright_url = get_playwright_url(config)
    playwright = browser = context = page = None
    try:
        playwright, browser = await connect(playwright_url)
        context = await new_context(browser, cookies=cookies)
        page = await new_page(context)

        if not await goto_with_auth_check(page, context, POSITIONS_URL, debug=debug):
            return fail("Failed to load Schwab positions page.", ErrorType.AUTHENTICATION, retryable=True)

        if account:
            requested_id = account.id if isinstance(account, AccountSummary) else account
            await _switch_account(page, context, requested_id, debug=debug)
            # Ensure we are on the positions page for the selected account.
            # A failed navigation is reported right away instead of surfacing
            # later as a selector timeout.
            if "/accounts/positions" not in page.url:
                if not await goto_with_auth_check(page, context, POSITIONS_URL, debug=debug):
                    return fail(
                        "Failed to load Schwab positions page.",
                        ErrorType.AUTHENTICATION,
                        retryable=True,
                    )

        if debug:
            html = await page.content()
            save_debug_artifact("positions_page_initial.html", html)

        await page.wait_for_selector('#positionsDetails', timeout=45000)
        await page.wait_for_timeout(1000)
        # Scroll to the bottom before reading rows; presumably this triggers
        # rendering of rows further down the table - TODO confirm.
        await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
        await page.wait_for_timeout(1500)

        if debug:
            html = await page.content()
            save_debug_artifact("positions_page_scrolled.html", html)
            png = await page.screenshot(full_page=True)
            save_debug_artifact("positions_page.png", png)

        # 1. Get headers once.
        headers = await page.evaluate(_HEADERS_JS)
        if not headers:
            return fail("Positions table headers not found.", ErrorType.PARSING, retryable=True)

        # 1.5 Pre-cleanup: close any accidentally opened modals.
        await _close_stale_lot_modals(page, debug=debug)

        # 2. Get all position rows metadata first to avoid stale handle issues.
        position_metadata = await page.evaluate(_POSITION_ROWS_JS)
        if debug:
            print(f"Found {len(position_metadata)} positions to process")

        all_positions: list[Position] = []
        for meta in position_metadata:
            symbol = meta['symbol']
            lots_data = await _scrape_lot_rows(page, meta['index'], symbol, debug=debug)
            row_map = _map_row(headers, meta['cells'])
            position = _row_to_position(row_map, lots_data, symbol=symbol)
            if not include_non_equity and position.asset_type not in {"EQUITY", "ETF"}:
                continue
            all_positions.append(position)

        if not all_positions:
            return fail("No positions matched the requested criteria.", ErrorType.VALIDATION, retryable=False)
        return ok(all_positions)
    except Exception as exc:
        if debug:
            import traceback

            traceback.print_exc()
        return fail(str(exc), ErrorType.UNKNOWN, retryable=True)
    finally:
        await _safe_close_page(page)
        await _safe_close_context(context)
        await _safe_close_browser(browser)
        await _safe_stop_playwright(playwright)


async def _safe_close_page(page) -> None:
    """Close ``page`` if it exists, ignoring any error raised while closing."""
    if page is None:
        return
    try:
        await page.close()
    except Exception:
        pass


async def _safe_close_context(context) -> None:
    """Close ``context`` if it exists, ignoring any error raised while closing."""
    if context is None:
        return
    try:
        await context.close()
    except Exception:
        pass


async def _safe_close_browser(browser) -> None:
    """Close ``browser`` if it exists, ignoring any error raised while closing."""
    if browser is None:
        return
    try:
        await browser.close()
    except Exception:
        pass


async def _safe_stop_playwright(playwright) -> None:
    """Stop ``playwright`` if it exists, ignoring any error raised while stopping."""
    if playwright is None:
        return
    try:
        await playwright.stop()
    except Exception:
        pass