from __future__ import annotations import asyncio import re import time from datetime import datetime, timezone from typing import Optional, List, Dict, Any from ...utils.logging import save_debug_artifact # Export options constants DEFAULT_HISTORY_URL = "https://client.schwab.com/app/accounts/history/#/" async def goto_history(page, context=None, debug: bool = False) -> None: if context: from ...browser.navigation import goto_with_auth_check auth_success = await goto_with_auth_check(page, context, DEFAULT_HISTORY_URL, debug=debug) if not auth_success: raise Exception("Authentication failed during navigation to history page") else: # Fallback for cases where context isn't available await page.goto(DEFAULT_HISTORY_URL, timeout=60000) await page.wait_for_load_state('domcontentloaded') # Wait for one of the known panels in history page to ensure full UI ready try: await page.wait_for_selector('.sdps-page-header__account-selector, #account-selector', timeout=15000) except Exception: # Fallback wait await page.wait_for_timeout(5000) if debug: try: png = await page.screenshot(full_page=True) save_debug_artifact("debug_export_history_loaded.png", png) except Exception: pass async def open_export_panel(page, debug: bool = False) -> None: # Close any obstructing overlay dialogs first (e.g., What's changed) try: overlays = page.locator("div[role='dialog']").filter(has_text="What's changed") if await overlays.count() > 0 and await overlays.first.is_visible(): if debug: print("DEBUG: Closing 'What's changed' overlay before export") close_btn = overlays.first.locator("button[aria-label='Close'], button:has-text('Close')").first try: await close_btn.click() except Exception: await page.keyboard.press('Escape') await page.wait_for_timeout(500) except Exception: pass if debug: print("DEBUG: Clicking top-level Export button to open options panel") # Use aria-label selector to target the visible Export button (not the hidden one in dialogs) export_button = 
# A trailing ".ext" counts as a file extension only when it contains a letter,
# so the dots of an ellipsis in front of digits ("...604") are never stripped.
_FILE_EXTENSION_RE = re.compile(r"\.(?=[0-9]*[A-Za-z])[A-Za-z0-9]{1,8}$")


async def select_time_period(page, time_period: Optional[str], container=None, debug: bool = False) -> None:
    """Best-effort click on the element whose text is ``time_period``.

    Args:
        page: Page object; used for the post-click pause and as the default scope.
        time_period: Visible option text to click. Falsy values make this a no-op.
        container: Optional locator scope searched instead of the whole page.
        debug: When true, the outcome (including a swallowed error) is printed.

    Any error is swallowed so that the dialog simply keeps its default period.
    """
    if not time_period:
        return
    try:
        scope = container or page
        period_selector = scope.locator(f'text={time_period}').first
        if await period_selector.is_visible():
            await period_selector.click()
            await page.wait_for_timeout(1000)
            if debug:
                print(f"DEBUG: Selected time period: {time_period}")
    except Exception as exc:
        # Non-fatal; keep defaults
        if debug:
            print(f"DEBUG: Could not select time period {time_period!r}: {exc}")


async def ensure_csv_format(page, container=None, debug: bool = False) -> None:
    """Best-effort click on the first element whose text is ``CSV``.

    Args:
        page: Page object; used for the post-click pause and as the default scope.
        container: Optional locator scope searched instead of the whole page.
        debug: When true, the outcome (including a swallowed error) is printed.
    """
    try:
        scope = container or page
        csv_option = scope.locator('text=CSV').first
        if await csv_option.is_visible():
            await csv_option.click()
            await page.wait_for_timeout(1000)
            if debug:
                print("DEBUG: Ensured CSV format is selected")
    except Exception as exc:
        # Non-fatal; the dialog keeps whatever format it had selected.
        if debug:
            print(f"DEBUG: Could not select CSV format: {exc}")


def parse_suggested_filename(filename: str) -> Dict[str, str]:
    """Parse Schwab's suggested filename into an account label and timestamp.

    Robustly handles extra underscores, composite account names, and suffixes.

    Args:
        filename: Suggested download name, e.g.
            ``"Joint_XXX604_Transactions_20240101-120000.csv"``.

    Returns:
        ``{"label": ..., "ts": ...}``. ``label`` is a normalized label such as
        ``"Joint_XXX604"`` (``"Account"`` when nothing usable remains); ``ts`` is
        the ``YYYYMMDD-HHMMSS`` stamp found in the name, or the current UTC time
        in that format when the name carries none.
    """
    # Timestamp
    ts_match = re.search(r"(\d{8}-\d{6})", filename)
    ts = ts_match.group(1) if ts_match else datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')

    stem = _FILE_EXTENSION_RE.sub("", filename)
    # Remove trailing _Transactions_<timestamp> if present
    stem_wo_suffix = re.sub(r"_Transactions_\d{8}-\d{6}$", "", stem)

    # Try direct XXX pattern
    m = re.search(r"XXX(\d{3,4})", stem_wo_suffix)
    if m:
        ending = m.group(1)
        prefix = stem_wo_suffix.split(f"XXX{ending}")[0].rstrip('_')
        # Sanitize prefix to create label
        prefix = re.sub(r"[^A-Za-z0-9]+", "_", prefix).strip('_') or "Account"
        return {"label": f"{prefix}_XXX{ending}", "ts": ts}

    # Try '… 604' or '... 604' or 'ending in 6 0 4'
    m2 = re.search(r"(?:…|\.\.\.|ending in)\s*([0-9\s]{3,8})", stem_wo_suffix, flags=re.IGNORECASE)
    if m2:
        digits = re.sub(r"\s+", "", m2.group(1))
        ending = digits[-3:]
        # Prefix is text before ellipsis/ending in phrase
        prefix = re.split(r"(?:…|\.\.\.|ending in)", stem_wo_suffix, flags=re.IGNORECASE)[0].rstrip('_ ')
        prefix = re.sub(r"[^A-Za-z0-9]+", "_", prefix).strip('_') or "Account"
        return {"label": f"{prefix}_XXX{ending}", "ts": ts}

    # Fallback: sanitized stem, never an empty label
    safe = re.sub(r"[^A-Za-z0-9]+", "_", stem_wo_suffix).strip('_') or "Account"
    return {"label": safe, "ts": ts}
def _label_matches_account_query(account_query: Optional[str], label: str) -> bool:
    """Decide whether a parsed filename ``label`` belongs to ``account_query``.

    The query may be a full label ("PLA_Assets_XXX674"), an ending ("604"),
    a masked suffix ("XXX604") or a type substring ("Joint"/"PLA"). This is used
    to verify that a downloaded file corresponds to the intended account.

    Args:
        account_query: Requested account; a falsy value matches every label.
        label: Label parsed from the filename, e.g. ``"Joint_XXX604"``.

    Returns:
        True when the label satisfies the query, False otherwise (including
        when a query is given but ``label`` is None).
    """
    if not account_query:
        return True
    if label is None:
        return False

    query = str(account_query).strip()

    # Exact label match
    if query == label:
        return True

    # Match by ending digits in the label (from _XXX####)
    ending_match = re.search(r"XXX(\d{3,4})$", label)
    if ending_match:
        ending = ending_match.group(1)
        # A digit-only query matches when the label's ending ends with it
        if query.isdigit() and ending.endswith(query):
            return True
        # Also allow matching on suffix-only query like 'XXX604'
        if query.upper() == f"XXX{ending}":
            return True

    # Substring in label (match by type/name), case-insensitive
    return query.lower() in label.lower()


def _compose_account_label(raw_prefix: str, ending: str) -> str:
    """Build ``<Type>_XXX<ending>`` from the raw text preceding the digits."""
    prefix = raw_prefix.strip(" -•")
    # Menu entries can repeat the type back-to-back ("JointJoint…604");
    # keep a single copy so the label reads "Joint_XXX604".
    doubled = re.fullmatch(r"(.{3,}?)\1", prefix)
    if doubled:
        prefix = doubled.group(1)
    prefix = re.sub(r"[^A-Za-z0-9]+", "_", prefix).strip('_') or "Account"
    return f"{prefix}_XXX{ending}"


def _normalize_label_from_text(text: str) -> Optional[str]:
    """Create a normalized account label (Type_XXX###) from raw menu text.

    Args:
        text: Raw menu text such as ``"Joint XXX604"`` or
            ``"JointJoint…604Account ending in 6 0 4"``.

    Returns:
        The label using the last three digits found, or None when ``text`` is
        empty or contains no recognizable account ending.
    """
    if not text:
        return None
    # Collapse whitespace
    t = re.sub(r"\s+", " ", text).strip()

    # Try XXX123 pattern
    m = re.search(r"XXX(\d{3,4})", t)
    if m:
        return _compose_account_label(t.split(f"XXX{m.group(1)}")[0], m.group(1)[-3:])

    # Try 'ending in' or ellipsis with digits
    m2 = re.search(r"(?:ending in|…|\.\.\.)\s*([0-9\s]{3,8})", t, flags=re.IGNORECASE)
    if m2:
        ending = re.sub(r"\s+", "", m2.group(1))[-3:]
        raw_prefix = re.split(r"(?:ending in|…|\.\.\.)", t, flags=re.IGNORECASE)[0]
        return _compose_account_label(raw_prefix, ending)
    return None
# Substrings that mark a line as naming an account type.
_ACCOUNT_TYPE_KEYWORDS = ('joint', 'ira', 'individual', 'bogle', 'roth', 'general', 'pla', 'checking')

# Lines that are accepted verbatim as an account type (multi-line dropdown text).
_KNOWN_ACCOUNT_TYPES = frozenset({
    'Joint Account', 'Joint', 'IRA Account', 'IRA', 'Individual Account', 'Individual',
    'Bogle', 'IRA Rachel', 'Roth IRA Rachel', 'PLA Assets', 'Roth IRA', 'ROTH IRA',
    'General Checking', 'PLA Line', 'Roth', 'Traditional IRA',
})


def _account_info(account_type, account_ending):
    """Build the result dict; the label and ending always use the last three digits."""
    normalized_type = account_type.replace(' ', '_').replace('-', '_')
    short_ending = account_ending[-3:]
    return {
        'label': f'{normalized_type}_XXX{short_ending}',
        'type': account_type,
        'ending': short_ending,
    }


def parse_account_text(text):
    """Parse account dropdown text into structured account info.

    Args:
        text: Raw dropdown text, possibly multi-line, e.g.
            ``"JointJoint…604Account ending in 6 0 4"`` or ``"Joint 604"``.

    Returns:
        ``{'label': 'Type_XXX###', 'type': <type text>, 'ending': '###'}``, or
        None when ``text`` is empty/None or no type + ending pair can be found.
    """
    if not text:
        return None
    text = text.strip()
    lines = [line.strip() for line in text.split('\n') if line.strip()]

    account_type = None
    account_ending = None

    # First, check for the live Schwab format: "TypeType…XXXAccount ending in X Y Z"
    live_format_match = re.search(r'^([A-Za-z\s]+)\1…(\d{3,4})Account ending in ([\d\s]+)', text)
    if live_format_match:
        account_type = live_format_match.group(1).strip()
        account_ending = live_format_match.group(2)
        # Accept only when the compact and the spaced digits agree; drop every
        # whitespace character because the spaced group may end in a newline.
        spaced_ending = re.sub(r'\s+', '', live_format_match.group(3))
        if account_ending == spaced_ending and account_type:
            return _account_info(account_type, account_ending)

    # Parse line by line for other formats
    for line in lines:
        # Strategy 1: 'Account ending in X Y Z' (three or four spaced digits, so
        # a four-digit ending keeps its LAST three digits after truncation)
        ending_match = re.search(r'Account ending in (\d(?: \d){2,3})', line)
        if ending_match:
            account_ending = ending_match.group(1).replace(' ', '')
            continue

        # Strategy 2: 'Account ending in XXX' format (without spaces)
        ending_match_no_space = re.search(r'Account ending in (\d{3,4})', line)
        if ending_match_no_space:
            account_ending = ending_match_no_space.group(1)
            continue

        # Strategy 3: Account type with …XXX or ...XXX pattern
        type_match = re.search(r'^([A-Za-z\s]+)\s*[…\.]{1,3}(\d{3,4})', line)
        if type_match:
            account_type = type_match.group(1).strip()
            account_ending = type_match.group(2)
            continue

        # Strategy 4: Account type with XXX pattern
        type_match_xxx = re.search(r'^([A-Za-z\s]+)\s*XXX(\d{3,4})', line)
        if type_match_xxx:
            account_type = type_match_xxx.group(1).strip()
            account_ending = type_match_xxx.group(2)
            continue

        # Strategy 5: Direct account type and ending pattern (e.g., "Joint 604")
        direct_match = re.search(r'^([A-Za-z\s]+?)\s+(\d{3,4})\s*$', line)
        if direct_match:
            candidate_type = direct_match.group(1).strip()
            # Only accept if it looks like a known account type
            if any(keyword in candidate_type.lower() for keyword in _ACCOUNT_TYPE_KEYWORDS):
                account_type = candidate_type
                account_ending = direct_match.group(2)
                continue

        if account_type:
            continue

        # Strategy 6: Just account type name (for multi-line parsing)
        if line in _KNOWN_ACCOUNT_TYPES:
            account_type = line
            continue

        # Strategy 7: Partial matches for compound account types; the original
        # line text is kept so the exact wording is preserved.
        lowered = line.lower()
        if 2 < len(line) < 50 and any(keyword in lowered for keyword in _ACCOUNT_TYPE_KEYWORDS):
            account_type = line

    # Final validation: a type plus an ending of at least three digits
    if account_type and account_ending and len(account_ending) >= 3:
        return _account_info(account_type, account_ending)

    # Fallback - promising text that the strategies above could not parse
    lowered_text = text.lower()
    if any(keyword in lowered_text for keyword in ('joint', 'ira', 'individual', 'bogle', 'account')):
        # Extract any 3-4 digit number as potential account ending
        digit_match = re.search(r'\b(\d{3,4})\b', text)
        if digit_match:
            # Try to extract account type from context
            for keyword in ('joint', 'ira', 'individual', 'bogle', 'roth', 'general', 'pla'):
                if keyword in lowered_text:
                    return _account_info(keyword.title(), digit_match.group(1))

    return None
async def discover_accounts_with_numbers(page, debug: bool = False) -> List[Dict[str, str]]:
    """Discover accounts including their actual account numbers for API switching.

    The basic account list comes from ``discover_accounts_from_page``. The
    account-selector dropdown is then opened and scanned for elements whose
    data attributes, ``onclick`` handler or ``href`` contain a number shaped
    like ``1234-5678``.

    Args:
        page: Page object already showing the transaction-history page.
        debug: When true, progress messages are printed.

    Returns:
        One dict per discovered account with:
        - label: Normalized label like "PLA_Assets_XXX674"
        - type: Account type like "PLA Assets"
        - ending: Last 3 digits like "674"
        - account_number: Full account number like "7485-7674", or None when
          no number could be matched (also for every account if scanning fails)
    """
    if debug:
        print("DEBUG: Discovering accounts with account numbers...")

    # First get basic account info
    basic_accounts = await discover_accounts_from_page(page, debug=debug)
    if not basic_accounts:
        # Nothing to enrich, so there is no reason to open the dropdown again.
        return []

    # Now try to get account numbers by examining dropdown elements more closely
    try:
        # Click account selector to open dropdown
        await page.locator('.sdps-page-header__account-selector, #account-selector').first.click()
        await page.wait_for_timeout(2000)

        # Look for elements with account numbers
        account_elements = await page.evaluate('''
            () => {
                const elements = Array.from(document.querySelectorAll('button, a, [data-account], [data-number]'));
                return elements.map(el => {
                    const text = (el.textContent || el.innerText || '').trim();
                    const dataAccount = el.getAttribute('data-account');
                    const dataNumber = el.getAttribute('data-number');
                    const onclick = el.onclick ? el.onclick.toString() : '';
                    const href = el.href || '';

                    // Look for account numbers in various attributes
                    let accountNumber = null;

                    // Check data attributes
                    if (dataAccount && dataAccount.includes('-')) {
                        accountNumber = dataAccount;
                    } else if (dataNumber && dataNumber.includes('-')) {
                        accountNumber = dataNumber;
                    }

                    // Check onclick handlers for account numbers
                    const numberMatch = onclick.match(/(\\d{4}-\\d{3,4})/);
                    if (numberMatch) {
                        accountNumber = numberMatch[1];
                    }

                    // Check href for account numbers
                    const hrefMatch = href.match(/(\\d{4}-\\d{3,4})/);
                    if (hrefMatch) {
                        accountNumber = hrefMatch[1];
                    }

                    return {
                        text: text.substring(0, 100),
                        accountNumber: accountNumber,
                        element: el.tagName + (el.id ? '#' + el.id : '') + (el.className ? '.' + el.className.split(' ')[0] : '')
                    };
                }).filter(item => item.text.includes('ending in') || item.accountNumber);
            }
        ''')

        if debug:
            print(f"DEBUG: Found {len(account_elements)} elements with potential account numbers")
            for elem in account_elements[:5]:
                print(f"DEBUG: - {elem['text'][:50]} -> {elem['accountNumber']} ({elem['element']})")

        # Match account numbers to basic account info: the first element whose
        # text mentions the account's ending and that carries a number wins.
        enhanced_accounts = []
        for account in basic_accounts:
            enhanced_account = account.copy()
            account_ending = account['ending']
            enhanced_account['account_number'] = next(
                (
                    elem['accountNumber']
                    for elem in account_elements
                    if account_ending in elem['text'] and elem['accountNumber']
                ),
                None,
            )
            enhanced_accounts.append(enhanced_account)

        if debug:
            print("DEBUG: Enhanced accounts with numbers:")
            for acc in enhanced_accounts:
                print(f"DEBUG: - {acc['label']} -> {acc.get('account_number', 'NO_NUMBER')}")

        return enhanced_accounts

    except Exception as e:
        if debug:
            print(f"DEBUG: Error discovering account numbers: {e}")
        # Fall back to basic accounts without numbers
        return [dict(acc, account_number=None) for acc in basic_accounts]

    finally:
        # Close dropdown on success and on failure alike (best-effort) so a
        # failed scan does not leave the selector open over the page.
        try:
            await page.keyboard.press('Escape')
            await page.wait_for_timeout(500)
        except Exception:
            pass
async def discover_accounts_from_page(page, debug: bool = False) -> List[Dict[str, str]]:
    """Discover account entries from the page-level selector dropdown with enhanced reliability.

    Steps: (1) click the account selector (up to three attempts), (2) locate the
    visible container whose text looks like an account list (two attempts),
    (3) run ``parse_account_text`` on every candidate element inside it, keeping
    the first entry per account ending, (4) close the dropdown again.

    Args:
        page: Page object; assumed to already show the transaction history page.
        debug: When true, progress messages are printed and screenshots are
            stored through ``save_debug_artifact``.

    Returns:
        A list of dicts as produced by ``parse_account_text`` (keys ``label``,
        ``type``, ``ending``); empty when the selector could not be clicked or
        no dropdown was found.
    """
    # Note: This function assumes the page is already on the transaction history page
    # NOTE(review): the nesting of the screenshot blocks under `if debug:` is
    # reconstructed from whitespace-collapsed source — confirm against the original.

    if debug:
        print("DEBUG: Starting enhanced account discovery...")
        # Take initial screenshot
        try:
            png = await page.screenshot(full_page=True)
            save_debug_artifact("debug_account_discovery_start.png", png)
        except Exception:
            pass

    # Enhanced account selector strategy with multiple attempts
    click_success = False
    max_attempts = 3

    for attempt in range(max_attempts):
        if debug:
            print(f"DEBUG: Attempt {attempt + 1}/{max_attempts} - Searching for account selector...")

        # Enhanced selector discovery with more patterns.
        # The result is only used for the debug listing below; the click itself
        # runs its own, shorter selector list.
        account_selector_candidates = await page.evaluate('''
            () => {
                const selectors = [
                    '#account-selector',
                    '.sdps-page-header__account-selector',
                    '[id*="account-selector"]',
                    '[class*="account-selector"]',
                    'button[aria-label*="Account"]',
                    'button[title*="Account"]',
                    '[data-testid*="account"]',
                    'button:has-text("Account")',
                    '[class*="account"][class*="dropdown"]',
                    '[class*="account"][class*="button"]'
                ];

                const results = [];
                for (const selector of selectors) {
                    try {
                        const elements = document.querySelectorAll(selector);
                        elements.forEach((el, i) => {
                            if (el.offsetParent !== null && el.offsetWidth > 0 && el.offsetHeight > 0) {
                                const text = (el.textContent || el.innerText || '').trim();
                                results.push({
                                    selector: selector,
                                    index: i,
                                    id: el.id,
                                    className: el.className,
                                    text: text.substring(0, 100),
                                    tagName: el.tagName.toLowerCase(),
                                    isVisible: el.offsetParent !== null,
                                    hasAccountText: text.toLowerCase().includes('account') || text.match(/\\d{3}/) !== null
                                });
                            }
                        });
                    } catch (e) {
                        // Skip selectors that cause errors
                    }
                }
                return results.sort((a, b) => (b.hasAccountText ? 1 : 0) - (a.hasAccountText ? 1 : 0));
            }
        ''')

        if debug and len(account_selector_candidates) > 0:
            print(f"DEBUG: Found {len(account_selector_candidates)} potential account selector elements:")
            for candidate in account_selector_candidates[:5]:  # Show top candidates
                className = candidate.get('className', '')[:50] if candidate.get('className') else ''
                print(f"DEBUG: - {candidate['tagName']} {candidate['selector']}#{candidate['id']}.{className} text: '{candidate['text'][:50]}' hasAccountText: {candidate.get('hasAccountText')}")

        # Try clicking with enhanced strategy: the first visible element that
        # matches any selector (in list order) is scrolled into view and clicked.
        clicked = await page.evaluate('''
            () => {
                const selectors = [
                    '.sdps-page-header__account-selector',
                    '#account-selector',
                    '[id*="account-selector"]',
                    '[class*="account-selector"]',
                    'button[aria-label*="Account"]'
                ];

                for (const selector of selectors) {
                    const elements = document.querySelectorAll(selector);
                    for (const button of elements) {
                        if (button.offsetParent !== null && button.offsetWidth > 0 && button.offsetHeight > 0) {
                            try {
                                button.scrollIntoView({ behavior: 'smooth', block: 'center' });
                                button.click();
                                return { success: true, selector: selector, text: (button.textContent || '').trim().substring(0, 50) };
                            } catch (e) {
                                continue;
                            }
                        }
                    }
                }
                return { success: false };
            }
        ''')

        if debug:
            print(f"DEBUG: Account selector click result: {clicked}")

        if clicked.get('success'):
            click_success = True
            break

        # Wait before retry
        if attempt < max_attempts - 1:
            if debug:
                print(f"DEBUG: Click attempt {attempt + 1} failed, waiting before retry...")
            await page.wait_for_timeout(2000)

    if not click_success:
        if debug:
            print("DEBUG: All account selector click attempts failed")
            # Take failure screenshot
            try:
                png = await page.screenshot(full_page=True)
                save_debug_artifact("debug_account_selector_click_failed.png", png)
            except Exception:
                pass
        return []

    # Wait longer for dropdown to appear after successful click
    await page.wait_for_timeout(4000)

    # Enhanced dropdown discovery with better pattern matching
    dropdown = None
    dropdown_search_attempts = 2

    for search_attempt in range(dropdown_search_attempts):
        if debug:
            print(f"DEBUG: Dropdown search attempt {search_attempt + 1}/{dropdown_search_attempts}")

        # Enhanced dropdown selector strategy: collect visible containers whose
        # text contains an account-like pattern. Each candidate records its
        # selector and its index within that selector's matches.
        dropdown_candidates = await page.evaluate('''
            () => {
                const selectors = [
                    '[role="menu"]',
                    '[role="listbox"]',
                    '[role="dialog"]',
                    '[class*="dropdown"]',
                    '[class*="menu"]',
                    '[class*="overlay"]',
                    '[class*="modal"]',
                    '[class*="account"]',
                    '[class*="selector"]',
                    'div[style*="position: absolute"]',
                    'div[style*="z-index"]'
                ];

                const candidates = [];

                for (const selector of selectors) {
                    try {
                        const elements = document.querySelectorAll(selector);
                        elements.forEach((elem, i) => {
                            if (elem.offsetParent !== null && elem.offsetWidth > 0 && elem.offsetHeight > 0) {
                                const text = (elem.textContent || elem.innerText || '').trim();
                                const hasAccountPattern = (
                                    text.includes('ending in') ||
                                    /…\\d{3,4}|XXX\\d{3,4}|\\.\\.\\.\\d{3,4}/.test(text) ||
                                    (/joint|ira|individual|bogle|account/i.test(text) && /\\d{3}/.test(text))
                                );

                                if (text.length > 10 && hasAccountPattern) {
                                    candidates.push({
                                        selector: selector,
                                        index: i,
                                        element: elem,
                                        text: text.substring(0, 200),
                                        score: hasAccountPattern ? 1 : 0,
                                        className: elem.className
                                    });
                                }
                            }
                        });
                    } catch (e) {
                        // Skip problematic selectors
                    }
                }

                return candidates.sort((a, b) => b.score - a.score);
            }
        ''')

        if debug:
            print(f"DEBUG: Found {len(dropdown_candidates)} dropdown candidates")
            for candidate in dropdown_candidates[:3]:  # Show top candidates
                preview = candidate.get('text', '').replace('\n', ' ')[:100]
                print(f"DEBUG: - {candidate['selector']} (score: {candidate.get('score')}) text: {preview}")

        # Select best candidate: re-query the same selector from Python and
        # pick the element at the index reported by the page-side scan.
        if dropdown_candidates:
            dropdown = await page.query_selector_all(dropdown_candidates[0]['selector'])
            if dropdown:
                dropdown = dropdown[dropdown_candidates[0]['index']]
                if debug:
                    print(f"DEBUG: Selected dropdown with selector: {dropdown_candidates[0]['selector']}")
                break

        # If no dropdown found, wait and try again
        if search_attempt < dropdown_search_attempts - 1:
            if debug:
                print("DEBUG: No suitable dropdown found, waiting and retrying...")
            await page.wait_for_timeout(2000)

            # Try clicking again in case dropdown closed
            await page.evaluate('''
                () => {
                    const button = document.querySelector('.sdps-page-header__account-selector, #account-selector');
                    if (button) button.click();
                }
            ''')
            await page.wait_for_timeout(2000)

    if not dropdown:
        # Close any open dropdowns and return empty
        await page.click('body')
        if debug:
            print("DEBUG: No suitable account dropdown found after all attempts")
            # Take failure screenshot for debugging
            try:
                png = await page.screenshot(full_page=True)
                save_debug_artifact("debug_account_dropdown_not_found.png", png)
            except Exception:
                pass
        return []

    # Enhanced account parsing with better error handling
    if debug:
        # Take screenshot of dropdown for debugging
        try:
            png = await page.screenshot(full_page=True)
            save_debug_artifact("debug_account_dropdown_opened.png", png)
        except Exception:
            pass

    # Get all potential account elements with enhanced selection
    account_elements = await dropdown.query_selector_all('button, a, [role="option"], li, div, span')
    accounts = []
    # Endings already reported; later elements with the same ending are ignored.
    seen_endings = set()

    if debug:
        print(f"DEBUG: Found {len(account_elements)} potential account elements in dropdown")

    # Enhanced parsing with multiple strategies
    for elem in account_elements:
        try:
            text = await elem.inner_text()
            if not text or len(text.strip()) < 3:
                continue

            # Enhanced pattern matching for account detection
            has_account_pattern = (
                'ending in' in text or
                re.search(r'\d \d \d', text) or
                re.search(r'…\d{3,4}|XXX\d{3,4}|\.\.\.\d{3,4}', text) or
                (any(keyword in text.lower() for keyword in ['joint', 'ira', 'individual', 'bogle', 'roth', 'general', 'pla']) and
                 re.search(r'\d{3}', text))
            )

            if not has_account_pattern:
                continue

            # Skip navigation and header elements
            skip_phrases = [
                'Edit Account Nicknames & Groups',
                'Other Accounts',
                'Brokerage Accounts',
                'Schwab Bank Accounts',
                'Select an account',
                'Account selector',
                'Choose account',
                'Switch account'
            ]
            if any(skip_phrase in text for skip_phrase in skip_phrases):
                continue

            parsed = parse_account_text(text)
            if parsed and parsed['ending'] not in seen_endings:
                seen_endings.add(parsed['ending'])
                accounts.append(parsed)
                if debug:
                    print(f"DEBUG: Successfully parsed account: {parsed['type']} ending in {parsed['ending']} (label: {parsed['label']})")
            elif debug and text.strip():
                # Reached for unparseable text and for duplicates of a seen ending.
                print(f"DEBUG: Failed to parse account text: '{text[:100]}'")

        except Exception as e:
            if debug:
                print(f"DEBUG: Error processing account element: {e}")
            continue

    # Close dropdown with enhanced cleanup
    try:
        await page.keyboard.press('Escape')  # Try escape first
        await page.wait_for_timeout(500)
        await page.click('body')  # Fallback click
        await page.wait_for_timeout(1000)
    except Exception:
        pass

    if debug:
        print(f"DEBUG: Successfully discovered {len(accounts)} accounts from dropdown")
        if accounts:
            for account in accounts:
                print(f"DEBUG: - {account['label']} ({account['type']} ending {account['ending']})")

    return accounts
async def _resolve_export_dialog(page, debug: bool = False):
    """Pick the locator most likely to be the export-transactions dialog.

    Candidates are tried in order and the first one that exists and is visible
    wins: a dialog whose ``aria-labelledby`` contains ``export-transactions``,
    then the last dialog containing a CSV option, then the last dialog
    containing an Export button. When none qualifies, the last dialog on the
    page is returned unchecked.

    Args:
        page: Page object exposing ``locator``.
        debug: When true, the strategy that matched is printed.

    Returns:
        A locator for the chosen dialog.
    """
    all_dialogs = page.locator("div[role='dialog']")
    strategies = (
        # Strategy 1: aria-labelledby hint
        (
            page.locator("div[role='dialog'][aria-labelledby*='export-transactions']").last,
            "DEBUG: Found export dialog via aria-labelledby contains 'export-transactions'",
        ),
        # Strategy 2: visible dialog that contains CSV option
        (
            all_dialogs.filter(has=page.locator("text=CSV")).last,
            "DEBUG: Found export dialog via presence of CSV option",
        ),
        # Strategy 3: visible dialog that contains an Export button
        (
            all_dialogs.filter(has=page.locator("button:has-text('Export')")).last,
            "DEBUG: Found export dialog via presence of dialog Export button",
        ),
    )

    for dialog, found_message in strategies:
        if await dialog.count() > 0 and await dialog.is_visible():
            if debug:
                print(found_message)
            return dialog

    # Strategy 4: fallback to last dialog
    if debug:
        print("DEBUG: Falling back to last dialog; may be incorrect")
    return all_dialogs.last
async def _ensure_account_in_export_dialog(page, dialog, account_query: Optional[str], debug: bool = False) -> bool:
    """Ensure the export dialog, if it contains its own account selector, is set to the requested account.

    Args:
        page: Page object (used for pauses and for scrolling the option into view).
        dialog: Locator of the export dialog to inspect.
        account_query: Requested account (label, ending, or type substring).
            A falsy value means "nothing to enforce".
        debug: When true, progress messages are printed.

    Returns:
        True if no dialog-level account selector exists, if the dialog already
        references the account, or if the option was clicked (even when the
        follow-up verification is inconclusive). True is also returned when an
        unexpected exception occurs. False only if a dialog-level selector
        exists but no matching option was found or it could not be clicked.
    """
    if not account_query:
        return True

    try:
        # Try to detect a dialog-level account indicator
        current_in_dialog = await dialog.evaluate('''(root) => {
            const text = (root.textContent || '').trim();
            return text ? text.substring(0, 300) : '';
        }''')
        if debug:
            print(f"DEBUG: Export dialog initial text preview: {current_in_dialog[:120]}…")

        # If dialog text already contains our target pattern, consider it set
        def _to_match_str(q: str) -> str:
            # "PLA_Assets_XXX674" -> "PLA Assets ending in 674"
            return q.replace('_XXX', ' ending in ').replace('_', ' ')

        if current_in_dialog and _to_match_str(account_query) in current_in_dialog:
            if debug:
                print("DEBUG: Dialog appears to already reference the target account")
            return True

        # Try to find a dialog-level account selector trigger (combobox/button)
        selector_candidates = [
            '[role="combobox"]',
            'button:has-text("Account")',
            'button[aria-haspopup="listbox"]',
            '[aria-controls*="account"], [id*="account"], [class*="account"]'
        ]

        found_trigger = None
        for sel in selector_candidates:
            try:
                loc = dialog.locator(sel).first
                if await loc.count() > 0 and await loc.is_visible():
                    found_trigger = loc
                    break
            except Exception:
                continue

        if not found_trigger:
            # No obvious dialog-level selector; assume page-level selection applies
            if debug:
                print("DEBUG: No dialog-level account selector found; relying on page-level selection")
            return True

        # Open the dialog-level account dropdown
        try:
            await found_trigger.scroll_into_view_if_needed()
            await found_trigger.click()
            await page.wait_for_timeout(500)
        except Exception:
            pass

        # Find options container within dialog
        options_container = None
        option_container_selectors = [
            '[role="listbox"]',
            '[role="menu"]',
            '[class*="menu"]',
            '[class*="list"]',
            '[class*="dropdown"]'
        ]
        for sel in option_container_selectors:
            try:
                el = await dialog.query_selector(sel)
                if el:
                    options_container = el
                    break
            except Exception:
                continue

        if not options_container:
            # No dedicated container found: search the whole dialog instead
            options_container = dialog

        # Collect option-like elements and try to match
        option_elements = await options_container.query_selector_all('button, a, [role="option"], li, div, span')
        if debug:
            print(f"DEBUG: Found {len(option_elements)} dialog option elements")

        # First option whose parsed account matches the query wins;
        # stored as an (element, parsed-info) pair.
        target = None
        for elem in option_elements:
            try:
                text = await elem.inner_text()
            except Exception:
                continue
            if not text or len(text.strip()) < 3:
                continue
            parsed = parse_account_text(text)
            if not parsed:
                continue
            if (account_query == parsed['label'] or
                account_query == parsed['ending'] or
                account_query.lower() in parsed['label'].lower() or
                account_query.lower() in parsed['type'].lower()):
                target = (elem, parsed)
                break

        if not target:
            if debug:
                print("DEBUG: No matching account option found in dialog-level selector")
            return False

        elem, parsed = target
        # Click the matching option
        try:
            await page.evaluate('(el) => el.scrollIntoView({behavior: "smooth", block: "center"})', elem)
        except Exception:
            pass

        # Up to three forced clicks, pausing briefly between failures.
        click_ok = False
        for _ in range(3):
            try:
                await elem.click(force=True)
                click_ok = True
                break
            except Exception:
                await page.wait_for_timeout(150)
                continue
        if not click_ok:
            if debug:
                print("DEBUG: Failed to click dialog-level account option")
            return False

        await page.wait_for_timeout(500)

        # Verify the dialog now references target account
        try:
            after_text = await dialog.evaluate('(root) => (root.textContent || "").trim().substring(0, 300)')
        except Exception:
            after_text = None
        if after_text and _to_match_str(account_query) in after_text:
            if debug:
                print("DEBUG: Dialog-level account selection verified")
            return True

        # The click succeeded but the text check was inconclusive: treated as success.
        if debug:
            print("DEBUG: Dialog-level account selection not verified; proceeding anyway")
        return True

    except Exception as e:
        if debug:
            print(f"DEBUG: Exception in _ensure_account_in_export_dialog: {e}")
        return True
def _parse_switch_target(account_query: str):
    """Split an account query into ``(target_type, target_ending)``; either may be None."""
    if "_XXX" in account_query:
        type_part, ending_part = account_query.split("_XXX")[:2]
        return type_part.replace("_", " "), ending_part[-3:]
    # Masked suffix without a type, e.g. "XXX604"
    masked = re.fullmatch(r"XXX(\d{3,4})", account_query, flags=re.IGNORECASE)
    if masked:
        return None, masked.group(1)[-3:]
    if account_query.isdigit() and len(account_query) >= 3:
        return None, account_query[-3:]
    # Assume it's a type string like "PLA Assets"
    return account_query, None


def _selector_text_matches(selector_text: str, target_type, target_ending):
    """Compare the account selector's text with the target.

    Returns ``(keywords_match, ending_match, on_target)``. ``on_target`` requires
    every criterion that is set to match and is False when none is set.
    """
    lowered = selector_text.lower()
    keywords_match = False
    ending_match = False
    if target_type:
        # Check for all parts of target type (e.g., "PLA" AND "Assets")
        keywords_match = all(part in lowered for part in target_type.lower().split())
    if target_ending:
        # The selector shows the digits spaced out: "ending in 6 7 4"
        ending_match = f"ending in {' '.join(target_ending)}" in lowered

    if target_type and target_ending:
        on_target = keywords_match and ending_match
    elif target_type:
        on_target = keywords_match
    elif target_ending:
        on_target = ending_match
    else:
        on_target = False
    return keywords_match, ending_match, on_target


async def _read_account_selector_text(page) -> str:
    """Return the trimmed text of ``#account-selector`` ('' when absent)."""
    return await page.evaluate('''
        () => {
            const button = document.querySelector('#account-selector');
            if (button) {
                return button.textContent.trim();
            }
            return '';
        }
    ''')


async def switch_account_with_verification(page, account_query: str, debug: bool = False) -> bool:
    """Switch the page-level account selector to ``account_query`` and verify it.

    Args:
        page: Playwright page object
        account_query: Account identifier: ending digits ("674"), masked suffix
            ("XXX674"), type ("PLA Assets"), or full label ("PLA_Assets_XXX674")
        debug: Enable debug output

    Returns:
        True if the target account is already selected or the switch was
        performed and verified after a page reload. False for an empty query,
        when no matching option could be clicked, when verification fails, or
        when any exception occurs.
    """
    if not account_query:
        return False

    try:
        if debug:
            print(f"DEBUG: Starting enhanced account switch for: {account_query}")

        target_type, target_ending = _parse_switch_target(account_query)
        if debug:
            print(f"DEBUG: Parsed target - type: '{target_type}', ending: '{target_ending}'")

        # Check current account selection first
        current_account = await _read_account_selector_text(page)
        if debug:
            print(f"DEBUG: Current account: {current_account}")

        has_target_keywords, has_correct_ending, is_on_target = _selector_text_matches(
            current_account, target_type, target_ending
        )
        if debug:
            print(f"DEBUG: Keywords match: {has_target_keywords}, Ending match: {has_correct_ending}")
            print(f"DEBUG: Already on target account: {is_on_target}")

        if is_on_target:
            if debug:
                print("DEBUG: Already on correct account, no switch needed")
            return True

        # Need to switch - open account selector dropdown
        if debug:
            print("DEBUG: Opening account selector dropdown...")
        await page.locator('.sdps-page-header__account-selector, #account-selector').first.click()
        await page.wait_for_timeout(2000)

        # Find all account options in dropdown
        all_account_links = await page.query_selector_all('a[id*="account-selector-header"]')
        if debug:
            print(f"DEBUG: Found {len(all_account_links)} account options in dropdown")

        type_parts = target_type.lower().split() if target_type else []

        # Look for target account option
        clicked_target = False
        for i, link in enumerate(all_account_links):
            link_text = await link.inner_text()
            if debug:
                print(f"DEBUG: Option {i+1}: {link_text}")

            # Option text is matched on the compact ending (e.g. "674"), not
            # the spaced form used by the selector button.
            text_lower = link_text.lower()
            has_type = all(part in text_lower for part in type_parts)
            if target_type and target_ending:
                is_match = has_type and target_ending in link_text
            elif target_type:
                is_match = has_type
            elif target_ending:
                is_match = target_ending in link_text
            else:
                is_match = False

            if not is_match:
                continue

            if debug:
                print(f"DEBUG: ✓ Found target account option: {link_text}")
            try:
                # Try force click first
                await link.click(force=True)
                clicked_target = True
                if debug:
                    print("DEBUG: ✓ Clicked account option (force)")
                break
            except Exception as e1:
                if debug:
                    print(f"DEBUG: Force click failed: {e1}")
                try:
                    # Try JavaScript click as fallback
                    await link.evaluate("element => element.click()")
                    clicked_target = True
                    if debug:
                        print("DEBUG: ✓ Clicked account option (JS)")
                    break
                except Exception as e2:
                    if debug:
                        print(f"DEBUG: JS click also failed: {e2}")
                    continue

        if not clicked_target:
            if debug:
                print("DEBUG: ❌ Could not find or click target account option")
            # Best-effort: do not leave the dropdown open over the page.
            try:
                await page.keyboard.press('Escape')
                await page.wait_for_timeout(500)
            except Exception:
                pass
            return False

        # Wait for page to update after account switch
        if debug:
            print("DEBUG: Waiting for page to update after account switch...")
        await page.wait_for_timeout(3000)

        # Reload page to get fresh data for the new account
        if debug:
            print("DEBUG: Reloading page to get fresh data for selected account...")
        await page.reload()
        await page.wait_for_load_state('domcontentloaded')
        await page.wait_for_timeout(2000)

        # Verify the account switch was successful
        if debug:
            print("DEBUG: Verifying account switch...")
        final_account = await _read_account_selector_text(page)
        if debug:
            print(f"DEBUG: Final account: {final_account}")

        _, _, final_is_on_target = _selector_text_matches(final_account, target_type, target_ending)
        if final_is_on_target:
            if debug:
                print("DEBUG: ✅ Account switch verification successful!")
            return True

        if debug:
            print(f"DEBUG: ❌ Account switch verification failed!")
            print(f"DEBUG: Expected type '{target_type}' ending '{target_ending}'")
            print(f"DEBUG: Got: {final_account}")
        return False

    except Exception as e:
        if debug:
            print(f"DEBUG: Exception in switch_account_with_verification: {e}")
        return False
async def switch_account_via_api(page, account_number: str, debug: bool = False) -> bool:
    """Switch account by POSTing to Schwab's SwitchAccount endpoint from the page.

    The request is issued with ``fetch`` inside the page so it runs in the
    page's own browser context.

    Args:
        page: Playwright page object
        account_number: Account number in format "1234-5678"
        debug: Enable debug output

    Returns:
        True if the endpoint answered with an OK response and HTTP status 200,
        False otherwise (including an empty ``account_number`` or any
        exception raised while issuing the request). The header text is NOT
        used to decide the result; it is only printed when ``debug`` is set.
    """
    # Nothing to switch to; do not send an empty account number to the endpoint.
    if not account_number:
        if debug:
            print("DEBUG: No account number supplied for API switch")
        return False

    try:
        if debug:
            print(f"DEBUG: Switching to account {account_number} via API...")

        # Make POST request to SwitchAccount endpoint
        response = await page.evaluate('''
            async (accountNumber) => {
                try {
                    const response = await fetch('/Areas/MvcGlobal/SwitchAccount', {
                        method: 'POST',
                        headers: {
                            'Content-Type': 'application/json',
                        },
                        body: JSON.stringify({
                            selectionType: 'S',
                            accountNumber: accountNumber
                        })
                    });
                    const data = await response.json();
                    return {
                        success: response.ok,
                        status: response.status,
                        data: data
                    };
                } catch (error) {
                    return {
                        success: false,
                        error: error.message
                    };
                }
            }
        ''', account_number)

        if debug:
            print(f"DEBUG: SwitchAccount API response: {response}")

        switched = (
            isinstance(response, dict)
            and bool(response.get('success'))
            and response.get('status') == 200
        )
        if not switched:
            if debug:
                print(f"DEBUG: SwitchAccount API failed: {response}")
            return False

        # Wait for page to reflect the account change
        await page.wait_for_timeout(2000)

        # The header probe is purely diagnostic. Keep it isolated so that a
        # failure while reading the header cannot turn a successful switch
        # into a reported failure.
        if debug:
            try:
                current_account = await page.evaluate('''
                    () => {
                        const header = document.querySelector('.sdps-page-header__account-selector, #account-selector');
                        return header ? (header.textContent || '').trim() : '';
                    }
                ''')
                print(f"DEBUG: Account after API switch: {(current_account or '')[:100]}")
            except Exception as probe_err:
                print(f"DEBUG: Could not read account header after API switch: {probe_err}")

        return True

    except Exception as e:
        if debug:
            print(f"DEBUG: Exception in switch_account_via_api: {e}")
        return False
(header.textContent || '').trim() : ''; } ''') if debug: print(f"DEBUG: Account after API switch: {current_account[:100]}") return True else: if debug: print(f"DEBUG: SwitchAccount API failed: {response}") return False except Exception as e: if debug: print(f"DEBUG: Exception in switch_account_via_api: {e}") return False async def switch_account_on_page(page, account_query: Optional[str], context=None, debug: bool = False) -> bool: """Attempt to switch account using the page-level selector given a query like '604' or 'Joint'.""" if not account_query: return False try: # ENHANCED DEBUGGING: Add detailed logging for production troubleshooting if debug: print(f"DEBUG: === ACCOUNT SWITCH DEBUG START ===") print(f"DEBUG: Requested account: {account_query}") print(f"DEBUG: Current URL: {page.url}") # Ensure on the history page if 'accounts/history' not in page.url: if debug: print("DEBUG: Not on history page, navigating...") await goto_history(page, context=context, debug=debug) # ENHANCED DEBUGGING: Take screenshot before attempting switch if debug: try: png = await page.screenshot(full_page=True) save_debug_artifact(f"debug_before_account_switch_{account_query}.png", png) print("DEBUG: Screenshot saved before account switch attempt") except Exception as e: print(f"DEBUG: Failed to take screenshot: {e}") # Use enhanced selector discovery like discover_accounts_from_page click_success = False max_attempts = 3 for attempt in range(max_attempts): if debug: print(f"DEBUG: Account switch attempt {attempt + 1}/{max_attempts} for query: {account_query}") # ENHANCED DEBUGGING: Log current page state if debug: current_text = await page.evaluate('() => document.body.innerText.substring(0, 200)') print(f"DEBUG: Current page text preview: {current_text}") # Enhanced selector discovery with multiple patterns clicked = await page.evaluate(''' () => { const selectors = [ '.sdps-page-header__account-selector', '#account-selector', '[id*="account-selector"]', 
'[class*="account-selector"]', 'button[aria-label*="Account"]', 'button[title*="Account"]', '[data-testid*="account"]', 'button', // Generic button selector '[class*="account"][class*="dropdown"]', '[class*="account"][class*="button"]' ]; for (const selector of selectors) { const elements = document.querySelectorAll(selector); for (const button of elements) { if (button.offsetParent !== null && button.offsetWidth > 0 && button.offsetHeight > 0) { try { button.scrollIntoView({ behavior: 'auto', block: 'center' }); // Use a slight delay before clicking to avoid context destruction issues setTimeout(() => { try { button.click(); } catch(e) {} }, 10); return { success: true, selector: selector, text: (button.textContent || '').trim().substring(0, 50) }; } catch (e) { continue; } } } } return { success: false }; } ''') if debug: print(f"DEBUG: Account selector click result: {clicked}") if clicked.get('success'): click_success = True break # Wait before retry if attempt < max_attempts - 1: if debug: print(f"DEBUG: Click attempt {attempt + 1} failed, waiting before retry...") await page.wait_for_timeout(2000) if not click_success: if debug: print("DEBUG: All account selector click attempts failed") return False # ENHANCED DEBUGGING: Take screenshot after clicking selector if debug: try: png = await page.screenshot(full_page=True) save_debug_artifact(f"debug_after_selector_click_{account_query}.png", png) print("DEBUG: Screenshot saved after selector click") except Exception as e: print(f"DEBUG: Failed to take screenshot: {e}") # Wait for dropdown to appear await page.wait_for_timeout(300) # QUICK PATH: Try direct locator-based selection that pierces shadow DOM try: import re as _re # Build robust name regex: match type and ending in compact or spaced form q = str(account_query) target_type = None target_ending = None if '_XXX' in q: parts = q.split('_XXX') target_type = parts[0].replace('_', ' ') target_ending = parts[1][-3:] elif q.isdigit() and len(q) in (3, 4): 
target_ending = q[-3:] name_regex = None if target_type and target_ending: spaced = ' '.join(list(target_ending)) name_regex = _re.compile(rf"{_re.escape(target_type)}.*({_re.escape(target_ending)}|{_re.escape(spaced)}|XXX{_re.escape(target_ending)})", _re.I) elif target_ending: spaced = ' '.join(list(target_ending)) name_regex = _re.compile(rf"({_re.escape(target_ending)}|{_re.escape(spaced)}|XXX{_re.escape(target_ending)})", _re.I) else: name_regex = _re.compile(_re.escape(q), _re.I) # Try ARIA-controlled listbox via header button first try: btn_loc = page.locator('#account-selector').first controls_id = None try: controls_id = await btn_loc.get_attribute('aria-controls') except Exception: controls_id = None if controls_id: listbox = page.locator(f'#{controls_id}') if await listbox.count() > 0 and await listbox.is_visible(): # focus listbox and use get_by_role within it try: await listbox.focus() except Exception: pass target_loc = None # prefer role=option inside listbox try: opt = listbox.get_by_role('option', name=name_regex) if await opt.count() > 0: target_loc = opt.first except Exception: target_loc = None if not target_loc: # fallback to text filter for css in ['[role="option"]', 'button', 'a', 'div', 'span', 'li']: try: cand = listbox.locator(css).filter(has_text=name_regex) if await cand.count() > 0: target_loc = cand.first break except Exception: continue if target_loc is not None: try: await target_loc.scroll_into_view_if_needed() except Exception: pass try: async with page.expect_navigation(wait_until='domcontentloaded', timeout=10000): await target_loc.click() except Exception: await target_loc.click(force=True) try: await page.wait_for_load_state('domcontentloaded', timeout=5000) except Exception: pass # Verify header reflects change try: header_now = await page.evaluate('''() => { const sel = document.querySelector('.sdps-page-header__account-selector, #account-selector'); return sel ? 
(sel.textContent || '').trim() : ''; }''') except Exception: header_now = '' if debug and header_now: print(f"DEBUG: Header after listbox-controlled click: {header_now[:120]}...") ok = False if target_ending: spaced = ' '.join(list(target_ending)) if header_now and (target_ending in header_now or spaced in header_now): ok = True if target_type and ok: ok = target_type.lower() in (header_now or '').lower() if ok: if debug: print("DEBUG: Listbox-controlled selection verified") return True except Exception as e: if debug: print(f"DEBUG: aria-controls listbox path failed: {e}") # Try common roles first for sel in [ ('role=menuitem', page.get_by_role('menuitem', name=name_regex)), ('role=option', page.get_by_role('option', name=name_regex)), ('button', page.locator('button').filter(has_text=name_regex)), ('a', page.locator('a').filter(has_text=name_regex)), ('div', page.locator('div').filter(has_text=name_regex)), ('span', page.locator('span').filter(has_text=name_regex)), ]: label, locator = sel try: count = await locator.count() except Exception: count = 0 if count and count > 0: target_loc = locator.first try: await target_loc.scroll_into_view_if_needed() except Exception: pass try: async with page.expect_navigation(wait_until='domcontentloaded', timeout=10000): await target_loc.click() except Exception: await target_loc.click(force=True) try: await page.wait_for_load_state('domcontentloaded', timeout=5000) except Exception: pass # Verify header reflects change try: header_now = await page.evaluate('''() => { const sel = document.querySelector('.sdps-page-header__account-selector, #account-selector'); return sel ? 
(sel.textContent || '').trim() : ''; }''') except Exception: header_now = '' if debug and header_now: print(f"DEBUG: Header after locator-based click ({label}): {header_now[:120]}...") ok = False if target_ending: spaced = ' '.join(list(target_ending)) if header_now and (target_ending in header_now or spaced in header_now): ok = True if target_type and ok: ok = target_type.lower() in (header_now or '').lower() if ok: if debug: print("DEBUG: Locator-based selection verified") return True if debug: print("DEBUG: Locator-based selection did not find a clickable element; falling back") except Exception as e: if debug: print(f"DEBUG: Locator-based selection failed: {e}") # ENHANCED DEBUGGING: Check what's actually visible after dropdown click if debug: visible_elements = await page.evaluate(''' () => { const elements = document.querySelectorAll('[role="menu"], [role="listbox"], [role="dialog"], [class*="dropdown"], [class*="menu"], [class*="overlay"], [class*="modal"], [class*="account"], [class*="selector"], div[style*="position: absolute"], div[style*="z-index"]'); return Array.from(elements).slice(0, 5).map(el => ({ tag: el.tagName, class: el.className, id: el.id, text: (el.textContent || el.innerText || '').trim().substring(0, 100), visible: el.offsetParent !== null && el.offsetWidth > 0 && el.offsetHeight > 0 })); } ''') print(f"DEBUG: Visible dropdown elements: {visible_elements}") # Discover available accounts from the dropdown accounts = await discover_accounts_from_page(page, debug=debug) if not accounts: if debug: print("DEBUG: No accounts discovered from dropdown") return False if debug: print(f"DEBUG: Discovered {len(accounts)} accounts from dropdown") for acc in accounts: print(f"DEBUG: - {acc['label']} ({acc['type']} ending {acc['ending']})") # ENHANCED DEBUGGING: Verify the account we're looking for exists if debug: matching_accounts = [acc for acc in accounts if account_query == acc['label'] or account_query == acc['ending'] or account_query.lower() in 
acc['label'].lower() or account_query.lower() in acc['type'].lower()] print(f"DEBUG: Accounts matching query '{account_query}': {matching_accounts}") # Find matching account using robust matching logic target_account = None # Try multiple matching strategies for account in accounts: # Strategy 1: Exact label match (e.g., "PLA_Assets_XXX674") if account_query == account['label']: target_account = account break # Strategy 2: Match by ending digits (e.g., "674") if account_query == account['ending']: target_account = account break # Strategy 3: Case-insensitive substring match in label if account_query.lower() in account['label'].lower(): target_account = account break # Strategy 4: Match by account type (e.g., "PLA" in "PLA_Assets_XXX674") if account_query.lower() in account['type'].lower(): target_account = account break if not target_account: if debug: print(f"DEBUG: No matching account found for query: {account_query}") print(f"DEBUG: Available accounts: {[acc['label'] for acc in accounts]}") return False if debug: print(f"DEBUG: Found target account: {target_account['label']}") # ENHANCED DEBUGGING: Take screenshot before clicking target account if debug: try: png = await page.screenshot(full_page=True) save_debug_artifact(f"debug_before_target_click_{account_query}.png", png) print("DEBUG: Screenshot saved before target account click") except Exception as e: print(f"DEBUG: Failed to take screenshot: {e}") # Try a direct ARIA role-based click first for reliability try: ending = target_account['ending'] spaced = ' '.join(list(ending)) acc_type = target_account['type'] # Build a tolerant regex: type followed by either compact or spaced ending or XXX### import re as _re name_regex = _re.compile(rf"{_re.escape(acc_type)}.*({_re.escape(ending)}|{_re.escape(spaced)}|XXX{_re.escape(ending)})", _re.I) # Prefer within a visible listbox if present option_locator = page.locator('[role="listbox"] [role="option"]').filter(has_text=name_regex) count = await 
option_locator.count() if count == 0: # Fallback to any role=option in document option_locator = page.locator('[role="option"]').filter(has_text=name_regex) count = await option_locator.count() if count > 0: target_opt = option_locator.first try: await target_opt.scroll_into_view_if_needed() except Exception: pass try: async with page.expect_navigation(wait_until='domcontentloaded', timeout=15000): await target_opt.click() except Exception: await target_opt.click(force=True) try: await page.wait_for_load_state('domcontentloaded', timeout=8000) except Exception: pass # Verify header reflects new selection try: header_after = await page.evaluate('''() => { const sel = document.querySelector('.sdps-page-header__account-selector, #account-selector'); return sel ? (sel.textContent || '').trim() : ''; }''') except Exception: header_after = '' if header_after and acc_type.lower() in header_after.lower() and (ending in header_after or spaced in header_after): if debug: print("DEBUG: Role=option click succeeded; account appears selected") account_clicked = True # Close dropdown best-effort try: await page.keyboard.press('Escape') await page.wait_for_timeout(200) except Exception: pass # short settle await page.wait_for_timeout(300) else: if debug: print("DEBUG: Role=option click did not verify selection; falling back to element strategies") except Exception as e: if debug: print(f"DEBUG: Role=option strategy failed: {e}") # Try to find and click the target account option # Get all potential account elements dropdown_candidates = await page.evaluate(''' () => { const selectors = [ '[role="menu"]', '[role="listbox"]', '[role="dialog"]', '[class*="dropdown"]', '[class*="menu"]', '[class*="overlay"]', '[class*="modal"]', '[class*="account"]', '[class*="selector"]', 'div[style*="position: absolute"]', 'div[style*="z-index"]' ]; const candidates = []; for (const selector of selectors) { try { const elements = document.querySelectorAll(selector); elements.forEach((elem, i) => { if 
(elem.offsetParent !== null && elem.offsetWidth > 0 && elem.offsetHeight > 0) { const text = (elem.textContent || elem.innerText || '').trim(); const hasAccountPattern = ( text.includes('ending in') || /…\\d{3,4}|XXX\\d{3,4}|\\.\\.\\.\\d{3,4}/.test(text) || (/joint|ira|individual|bogle|account/i.test(text) && /\\d{3}/.test(text)) ); if (text.length > 10 && hasAccountPattern) { candidates.push({ selector: selector, index: i, text: text.substring(0, 200), score: hasAccountPattern ? 1 : 0 }); } } }); } catch (e) { // Skip problematic selectors } } return candidates.sort((a, b) => b.score - a.score); } ''') if not dropdown_candidates: if debug: print("DEBUG: No dropdown candidates found") return False # Use the first candidate which actually contains account text chosen = None for cand in dropdown_candidates: try: els = await page.query_selector_all(cand['selector']) if not els or len(els) <= cand['index']: continue el = els[cand['index']] txt = await el.text_content() if txt and ('ending in' in txt or re.search(r'\d \d \d', txt) or re.search(r'XXX\d{3,4}', txt) or 'Account Selector' in txt): chosen = cand break except Exception: continue if not chosen: chosen = dropdown_candidates[0] dropdown_selector = chosen['selector'] dropdown_index = chosen['index'] if debug: print(f"DEBUG: Using dropdown selector: {dropdown_selector}, index: {dropdown_index}") dropdown = await page.query_selector_all(dropdown_selector) if not dropdown or len(dropdown) <= dropdown_index: if debug: print("DEBUG: Dropdown element not found") return False dropdown = dropdown[dropdown_index] # Get all account elements in the dropdown account_elements = await dropdown.query_selector_all('button, a, [role="option"], li, div, span') if debug: print(f"DEBUG: Found {len(account_elements)} account elements in dropdown") # ENHANCED DEBUGGING: Log all account elements and their text if debug: for i, elem in enumerate(account_elements): try: text = await elem.inner_text() if text and len(text.strip()) >= 3: 
print(f"DEBUG: Account element {i}: '{text[:100]}'") except Exception as e: print(f"DEBUG: Error getting text from element {i}: {e}") account_clicked = False for elem in account_elements: try: text = await elem.inner_text() if not text or len(text.strip()) < 3: continue # Parse the account text parsed = parse_account_text(text) if not parsed: continue # ENHANCED DEBUGGING: Only log the target account match if (parsed['label'] == target_account['label'] or parsed['ending'] == target_account['ending']): if debug: print(f"DEBUG: Found target account: {parsed['label']}") # ENHANCED DEBUGGING: Take screenshot before clicking if debug: try: png = await page.screenshot(full_page=True) save_debug_artifact(f"debug_before_account_click_{account_query}_target_{target_account['label']}.png", png) except Exception as e: pass # Prefer clicking a truly clickable ancestor (button/a/role=option/menuitem) try: clickable = await page.evaluate_handle('''(el) => { let e = el; for (let i = 0; i < 6 && e; i++) { const role = (e.getAttribute && e.getAttribute('role')) || ''; const tag = (e.tagName || '').toUpperCase(); if (tag === 'BUTTON' || tag === 'A' || role === 'option' || role === 'menuitem') return e; e = e.parentElement; } return el; }''', elem) except Exception: clickable = elem # If there's an anchor ancestor with href, navigate directly as a first-class strategy if not account_clicked: try: # Try to find a nearest anchor and interact depending on href anchor_handle = await page.evaluate_handle('''(el) => { function findAnchor(node){ let e = node; for (let i = 0; i < 6 && e; i++) { if (e.tagName && e.tagName.toUpperCase() === 'A' && (e.href || e.getAttribute('href'))) return e; e = e.parentElement; } return null; } return findAnchor(el); }''', elem) if anchor_handle: try: href = await page.evaluate('(a) => a.getAttribute("href") || a.href || ""', anchor_handle) except Exception: href = '' if href and isinstance(href, str) and not href.lower().startswith('javascript'): if debug: 
print(f"DEBUG: Navigating directly to account URL: {href}") try: await page.goto(href, timeout=30000) await page.wait_for_selector('.sdps-page-header__account-selector, #account-selector', timeout=15000) account_clicked = True except Exception as e: if debug: print(f"DEBUG: Direct navigation failed: {e}") else: # Fallback: simulate a native click on the anchor to trigger SPA handler if debug: print("DEBUG: Clicking javascript: anchor to trigger SPA selection") try: await page.evaluate('(a) => { a.click(); }', anchor_handle) # Brief wait to allow SPA to process await page.wait_for_timeout(500) except Exception as e: if debug: print(f"DEBUG: Anchor click via JS failed: {e}") except Exception as e: if debug: print(f"DEBUG: Anchor search/click failed: {e}") # ENHANCED FIX: Try multiple click strategies for visibility issues click_success = False async def _click_with_nav(action_desc: str, click_fn): nonlocal click_success try: # Many times selecting an account triggers a navigation/reload. # Set up the navigation expectation BEFORE triggering the click. 
try: async with page.expect_navigation(wait_until='domcontentloaded', timeout=15000): await click_fn() click_success = True if debug: print(f"DEBUG: Click with navigation succeeded ({action_desc})") return except Exception as nav_err: # No navigation captured; fallback to plain click and wait if debug: print(f"DEBUG: No navigation captured ({action_desc}): {nav_err}") await click_fn() try: await page.wait_for_load_state('domcontentloaded', timeout=2000) except Exception: pass await page.wait_for_timeout(400) click_success = True if debug: print(f"DEBUG: Click without navigation succeeded ({action_desc})") except Exception as e: if debug: print(f"DEBUG: Click attempt failed ({action_desc}): {e}") # Strategy 1: Enhanced scroll and force click try: # Pre-scroll to element then click with navigation capture await page.evaluate('(element) => element.scrollIntoView({behavior: "smooth", block: "center"})', clickable) await page.wait_for_timeout(200) await _click_with_nav("scroll+force", lambda: clickable.click(force=True)) except Exception as e: if debug: print(f"DEBUG: Enhanced scroll + force click failed: {e}") # Strategy 2: Multiple scroll strategies if not click_success: try: # Try different scroll positions await page.evaluate('(element) => { const rect = element.getBoundingClientRect(); window.scrollTo(rect.left, rect.top - 100); }', clickable) await page.wait_for_timeout(200) await _click_with_nav("pos-scroll+force", lambda: clickable.click(force=True)) if click_success and debug: print(f"DEBUG: Click successful with position scroll") except Exception as e: if debug: print(f"DEBUG: Position scroll + click failed: {e}") # Strategy 3: Make element visible then click if not click_success: try: await page.evaluate('(element) => { element.style.visibility = "visible"; element.style.display = "block"; element.style.opacity = "1"; element.scrollIntoView({block: "center"}); }', clickable) await page.wait_for_timeout(200) await _click_with_nav("make-visible+force", lambda: 
clickable.click(force=True)) if click_success and debug: print(f"DEBUG: Click successful after making visible") except Exception as e: if debug: print(f"DEBUG: Make visible + click failed: {e}") # Strategy 4: JavaScript click with enhanced parameters if not click_success: try: await _click_with_nav( "dispatchEvent(MouseEvent)", lambda: page.evaluate('''(element) => { element.dispatchEvent(new MouseEvent("click", {bubbles: true, cancelable: true, view: window})); }''', clickable) ) if click_success and debug: print(f"DEBUG: Click successful with MouseEvent") except Exception as e: if debug: print(f"DEBUG: MouseEvent click failed: {e}") # Strategy 5: Hover then multiple click attempts if not click_success: try: await clickable.hover(timeout=2000) await page.wait_for_timeout(150) # Try multiple rapid clicks with nav capture on first try: await _click_with_nav("hover+rapid-1", lambda: clickable.click(force=True)) except Exception: pass if not click_success: for attempt in range(2): try: await clickable.click(force=True) await page.wait_for_timeout(100) click_success = True break except: continue if click_success and debug: print(f"DEBUG: Click successful with hover + rapid clicks") except Exception as e: if debug: print(f"DEBUG: Hover + rapid clicks failed: {e}") # Strategy 6: Coordinate click on element's bounding box within its scrollable container if not click_success: try: # Try to scroll the nearest scrollable ancestor to reveal element try: await page.evaluate('''(el) => { function findScrollable(node){ let e = node; for (let i=0; i<8 && e; i++){ const style = getComputedStyle(e); if (/(auto|scroll)/.test(style.overflowY)) return e; e = e.parentElement; } return null; } const sc = findScrollable(el) || document.scrollingElement || document.body; const r = el.getBoundingClientRect(); const scRect = sc.getBoundingClientRect ? 
sc.getBoundingClientRect() : {top:0,left:0,height:window.innerHeight}; const targetY = r.top + (r.height/2) - (scRect.height/2); try { sc.scrollBy({ top: targetY, behavior: 'auto' }); } catch(_) { sc.scrollTop += targetY; } }''', clickable) except Exception: pass # Compute viewport coordinates and click bbox = await clickable.bounding_box() if not bbox: # Fallback to DOM rect rect = await page.evaluate('(el) => { const r = el.getBoundingClientRect(); return {x:r.left, y:r.top, width:r.width, height:r.height}; }', clickable) bbox = rect if bbox and bbox['width'] > 2 and bbox['height'] > 2: x = bbox['x'] + bbox['width']/2 y = bbox['y'] + bbox['height']/2 await page.mouse.move(x, y) await page.mouse.click(x, y) await page.wait_for_timeout(600) click_success = True if debug: print("DEBUG: Coordinate click attempted on target element") except Exception as e: if debug: print(f"DEBUG: Coordinate click failed: {e}") if click_success: # Mark as clicked BEFORE waiting, since navigation may occur account_clicked = True if debug: print(f"DEBUG: Account option clicked; waiting for potential navigation/reload") try: # Try to catch a navigation if it occurs try: async with page.expect_navigation(timeout=5000): pass # If a navigation was already triggered by the click, this may catch it except Exception: # No navigation event captured; proceed with load-state wait pass try: await page.wait_for_load_state('domcontentloaded', timeout=8000) except Exception: pass await page.wait_for_timeout(500) except Exception as e: if debug: print(f"DEBUG: Post-click wait encountered exception: {e}") if debug: print(f"DEBUG: Click sequence complete for {target_account['label']}") break else: if debug: print(f"DEBUG: All click strategies failed for account: {parsed['label']}") except Exception as e: if debug: print(f"DEBUG: Error processing account element: {e}") continue if not account_clicked: if debug: print(f"DEBUG: Could not click target via element strategies; attempting keyboard navigation") 
# Attempt keyboard navigation on the account selector try: # Re-open selector to ensure focus is on the dropdown await page.evaluate('''() => { const btn = document.querySelector('.sdps-page-header__account-selector, #account-selector'); if (btn && btn.click) btn.click(); }''') await page.wait_for_timeout(500) except Exception: pass # Determine current selection from header try: header_text = await page.evaluate('''() => { const sel = document.querySelector('.sdps-page-header__account-selector, #account-selector'); return sel ? (sel.textContent || '').trim() : ''; }''') except Exception: header_text = '' current_parsed = parse_account_text(header_text) if header_text else None current_label = current_parsed['label'] if current_parsed else None # Compute index positions def _find_index(label: str) -> int: for i, acc in enumerate(accounts): if label == acc['label']: return i # fallback by ending if label and 'XXX' in label: ending = label.split('XXX')[-1] for i, acc in enumerate(accounts): if acc['ending'] == ending: return i return -1 current_index = _find_index(current_label) if current_label else -1 target_index = _find_index(target_account['label']) if debug: print(f"DEBUG: Keyboard nav indices - current: {current_index}, target: {target_index}") try: # Focus the account selector button btn = page.locator('#account-selector').first if await btn.count() == 0: btn = page.locator('.sdps-page-header__account-selector').first try: await btn.focus() except Exception: pass # Open dropdown via keyboard if needed try: await page.keyboard.press('Enter') await page.wait_for_timeout(200) except Exception: pass # If indices are known, compute steps; else scan downwards up to N max_steps = max(len(accounts) + 5, 10) if current_index >= 0 and target_index >= 0: steps = target_index - current_index key = 'ArrowDown' if steps >= 0 else 'ArrowUp' for _ in range(abs(steps)): await page.keyboard.press(key) await page.wait_for_timeout(120) else: # Blind scan for _ in 
range(max_steps): await page.keyboard.press('ArrowDown') await page.wait_for_timeout(80) # Confirm selection await page.keyboard.press('Enter') await page.wait_for_timeout(300) # Verify header updated try: await page.wait_for_load_state('domcontentloaded', timeout=5000) except Exception: pass try: new_header_text = await page.evaluate('''() => { const sel = document.querySelector('.sdps-page-header__account-selector, #account-selector'); return sel ? (sel.textContent || '').trim() : ''; }''') except Exception: new_header_text = '' if new_header_text and target_account['type'].lower() in new_header_text.lower() and target_account['ending'] in new_header_text: if debug: print("DEBUG: Keyboard navigation succeeded; account appears selected") account_clicked = True else: if debug: print("DEBUG: Keyboard navigation did not confirm selection") except Exception as e: if debug: print(f"DEBUG: Keyboard navigation failed: {e}") if not account_clicked: if debug: print(f"DEBUG: All primary switch methods failed for {account_query}, attempting Summary page fallback...") try: # Go to summary page if not already there if "accounts/summary" not in page.url: await page.goto("https://client.schwab.com/app/accounts/summary/#/") await page.wait_for_timeout(5000) # Find the row for this account in the summary table and click its link clicked_summary = await page.evaluate(""" (query) => { const rows = Array.from(document.querySelectorAll('sdps-table-row, tr')); const targetRow = rows.find(r => r.innerText.includes(query) || r.textContent.includes(query)); if (targetRow) { const link = targetRow.querySelector('a.acctNavigate-button-link'); if (link) { link.click(); return true; } } return false; } """, account_query) if clicked_summary: if debug: print(f"DEBUG: Successfully clicked account {account_query} on summary page") await page.wait_for_timeout(5000) return True except Exception as summary_err: if debug: print(f"DEBUG: Summary page fallback failed: {summary_err}") if debug: 
print(f"DEBUG: Could not find and click/select target account: {target_account['label']}") print(f"DEBUG: Target account details: {target_account}") # Close dropdown (best-effort) try: if not page.is_closed(): await page.keyboard.press('Escape') await page.wait_for_timeout(300) await page.click('body') await page.wait_for_timeout(500) except Exception: pass # CRITICAL: Verify the account switch actually worked using the same logic as the working test if account_clicked: if debug: print("DEBUG: Verifying account switch actually worked...") # Wait for UI to update try: await page.wait_for_load_state('domcontentloaded', timeout=8000) except Exception: pass await page.wait_for_timeout(500) # Get the current active account using the same method as the working test try: current_active_account = await page.evaluate(''' () => { const selector = document.querySelector('.sdps-page-header__account-selector'); return selector ? selector.textContent?.trim() : null; } ''') except Exception: current_active_account = None if debug and current_active_account: print(f"DEBUG: Current active account after switch: {current_active_account[:100]}...") # Use the SAME verification logic as the working test script # Check if the account text actually contains our target account_switch_verified = ( target_account['type'].lower() in current_active_account.lower() and target_account['ending'] in current_active_account ) if account_switch_verified: if debug: print("✅ SUCCESS: Account switch VERIFIED - target account is now active") print(f"✅ Found {target_account['type']} and {target_account['ending']} in account text") return True else: if debug: print("❌ FAILURE: Account switch failed verification - target account not active") print(f"❌ Expected: {target_account['type']} and {target_account['ending']}") print(f"❌ Got: {current_active_account[:100]}...") return False else: if debug: print("❌ FAILURE: Could not verify account switch - no active account found") return False else: if debug: 
# CSS selectors tried, in order, to locate the confirm/export button inside the dialog.
_ENHANCED_EXPORT_BUTTON_SELECTORS = (
    "button:has-text('Export')",
    "button[aria-label*='export']",
    "button[aria-label*='Export']",
    "input[type='submit'][value*='Export']",
    "button:has-text('Download')",
    ".export-button",
    "[data-testid*='export']",
)


async def _enhanced_find_export_dialog(page, debug: bool = False):
    """Return the first `div[role='dialog']` that looks like the export dialog, or None.

    A dialog qualifies when its `aria-labelledby` / `aria-describedby` attribute
    contains 'export-transactions', or when its visible text mentions one of a
    few export-related keywords.
    """
    keywords = ('export transactions', 'csv', 'download')
    for dialog in await page.query_selector_all("div[role='dialog']"):
        # Preferred signal: the ARIA ids attached to the dialog.
        try:
            labelled_by = await dialog.get_attribute('aria-labelledby')
            described_by = await dialog.get_attribute('aria-describedby')
            if (labelled_by and 'export-transactions' in labelled_by) or \
                    (described_by and 'export-transactions' in described_by):
                if debug:
                    print("DEBUG: Found export transactions dialog by ID")
                return dialog
        except Exception:
            # Element may have detached; fall through to the content check.
            pass

        # Secondary signal: the dialog's text content.
        try:
            dialog_text = (await dialog.inner_text()).lower()
            if any(keyword in dialog_text for keyword in keywords):
                if debug:
                    print("DEBUG: Found export dialog by content")
                return dialog
        except Exception:
            pass
    return None


async def _enhanced_find_export_button(export_dialog, debug: bool = False):
    """Return the first visible element in the dialog matching a known export selector, or None."""
    for selector in _ENHANCED_EXPORT_BUTTON_SELECTORS:
        try:
            candidate = await export_dialog.query_selector(selector)
            if candidate and await candidate.is_visible():
                if debug:
                    print(f"DEBUG: Found export button with selector: {selector}")
                return candidate
        except Exception:
            continue
    return None


async def _enhanced_click_export_button(export_btn, debug: bool = False) -> bool:
    """Click the export button (forced click, then a JS click); return True on success."""
    try:
        await export_btn.click(force=True)
    except Exception as force_err:
        if debug:
            print(f"DEBUG: Force click failed: {force_err}")
    else:
        if debug:
            print("DEBUG: Export button clicked (force)")
        return True

    try:
        await export_btn.evaluate("element => element.click()")
    except Exception as js_err:
        if debug:
            print(f"DEBUG: JS click also failed: {js_err}")
        return False
    if debug:
        print("DEBUG: Export button clicked (JS)")
    return True


async def perform_export_download_enhanced(page, time_period: Optional[str] = None, account: Optional[str] = None, debug: bool = False, context=None, preserve_filename: bool = True) -> Dict[str, Any]:
    """Export transactions from the history page and save the downloaded file.

    The function never raises for ordinary failures; they are reported through
    the returned dict instead.

    Args:
        page: Playwright page object.
        time_period: Time period for export (e.g., "Current Month"). Skipped
            when falsy.
        account: Account identifier to switch to before export. When given and
            the switch cannot be verified, the export is aborted.
        debug: Enable debug output on stdout.
        context: Browser context, forwarded to `goto_history`.
        preserve_filename: If True, save under the filename suggested by the
            download; otherwise (or when no name is suggested) save under a
            timestamped `export_*.csv` name. Files are written to the current
            working directory.

    Returns:
        On success: a dict with "success" (True), "filename", "saved_path",
        "file_size", "account_info", "time_period" and "account_requested".
        On failure: a dict with "success" (False) and "error" (plus
        "account_requested" when the account switch failed).
    """
    if debug:
        print("DEBUG: Starting enhanced export download...")

    try:
        # Ensure we're on the history page
        await goto_history(page, context=context, debug=debug)

        # Switch to target account if specified
        if account:
            if debug:
                print(f"DEBUG: Attempting to switch to account: {account}")
            switched = await switch_account_with_verification(page, account, debug=debug)
            if not switched:
                error_msg = f"Failed to switch to account '{account}'. Please manually select the correct account and retry."
                if debug:
                    print(f"DEBUG: {error_msg}")
                return {
                    "error": error_msg,
                    "account_requested": account,
                    "success": False
                }

        # Open export panel
        if debug:
            print("DEBUG: Opening export panel...")
        await open_export_panel(page, debug=debug)

        # Give the export dialog time to render before searching for it
        await page.wait_for_timeout(2000)

        export_dialog = await _enhanced_find_export_dialog(page, debug=debug)
        if not export_dialog:
            return {
                "error": "Could not find export dialog",
                "success": False
            }

        # Configure export settings (both steps are best-effort)
        if time_period:
            try:
                await select_time_period(page, time_period, container=export_dialog, debug=debug)
            except Exception as e:
                if debug:
                    print(f"DEBUG: Time period selection failed: {e}")

        try:
            await ensure_csv_format(page, container=export_dialog, debug=debug)
        except Exception as e:
            if debug:
                print(f"DEBUG: CSV format selection failed: {e}")

        export_btn = await _enhanced_find_export_button(export_dialog, debug=debug)
        if not export_btn:
            return {
                "error": "Could not find export button in dialog",
                "success": False
            }

        # The download listener must be registered BEFORE the click. In the
        # async API `page.wait_for_event(...)` only creates a coroutine that
        # does nothing until awaited, so the event could fire unobserved;
        # `expect_download` starts listening as soon as the block is entered.
        click_failed = False
        try:
            async with page.expect_download(timeout=30000) as download_info:
                if not await _enhanced_click_export_button(export_btn, debug=debug):
                    click_failed = True
                    # Leave the block via an exception so the pending wait is
                    # cancelled instead of blocking until the timeout.
                    raise RuntimeError("Failed to click export button")
            download = await download_info.value

            # Save the download
            suggested_filename = download.suggested_filename
            if preserve_filename and suggested_filename:
                download_path = f"./{suggested_filename}"
            else:
                # Timestamp-based filename (also used when no name was suggested)
                download_path = f"./export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

            await download.save_as(download_path)

            # Get file info
            import os
            file_size = os.path.getsize(download_path)

            # Parse filename info
            filename_info = parse_suggested_filename(suggested_filename) if suggested_filename else {}

            result = {
                "success": True,
                "filename": suggested_filename,
                "saved_path": download_path,
                "file_size": file_size,
                "account_info": filename_info,
                "time_period": time_period,
                "account_requested": account
            }

            if debug:
                print("DEBUG: ✅ Export successful!")
                print(f"DEBUG: Filename: {suggested_filename}")
                print(f"DEBUG: Saved to: {download_path}")
                print(f"DEBUG: File size: {file_size:,} bytes")

            return result

        except Exception as e:
            if click_failed:
                return {
                    "error": "Failed to click export button",
                    "success": False
                }
            # Playwright raises its own TimeoutError class; it is matched by
            # name so this module does not need to import Playwright here.
            if isinstance(e, asyncio.TimeoutError) or type(e).__name__ == "TimeoutError":
                return {
                    "error": "Download timeout - export may have failed",
                    "success": False
                }
            return {
                "error": f"Download failed: {str(e)}",
                "success": False
            }

    except Exception as e:
        if debug:
            print(f"DEBUG: Exception in perform_export_download_enhanced: {e}")
        return {
            "error": f"Export failed: {str(e)}",
            "success": False
        }
async def _export_recover_page(page, context):
    """Return `page` if it is still open, otherwise a new page created from `context`.

    Raises:
        Exception: if the page is closed (or its state cannot be queried) and
            no context was provided to open a replacement.
    """
    try:
        closed = page.is_closed()
    except Exception:
        # If the page cannot even report its state, treat it as unusable.
        closed = True
    if not closed:
        return page
    if context is None:
        raise Exception("Playwright page is closed and no context provided to recover")
    from ...browser.client import new_page
    return await new_page(context)


async def _export_debug_current_account(page, account: str) -> None:
    """Print whether the header's selected account appears to match `account` (diagnostics only)."""
    try:
        current_account = await page.evaluate('''
            () => {
                const header = document.querySelector('.sdps-page-header__account-selector, #account-selector');
                return header ? (header.textContent || '').trim() : '';
            }
        ''')
        print(f"DEBUG: Current account text: {current_account[:200]}")

        # Derive the expected type/ending from the query without switching accounts
        target_ending = account[-3:] if len(account) >= 3 else account
        account_type = None
        if "_XXX" in account:
            account_type = account.split("_XXX")[0].replace("_", " ")

        # Only the text before the "Selected" marker describes the active account
        account_matched = False
        if "Selected" in current_account:
            selected_portion = current_account.split("Selected")[0]
            print(f"DEBUG: Currently selected portion: '{selected_portion}'")
            lowered = selected_portion.lower()
            # The ending may appear verbatim or spelled out digit by digit
            ending_match = (target_ending in selected_portion
                            or f"ending in {' '.join(target_ending)}" in lowered)
            if account_type:
                account_matched = account_type.lower() in lowered and ending_match
            else:
                account_matched = ending_match

        if account_matched:
            print(f"DEBUG: ✅ Current account matches target {account}")
        else:
            print(f"DEBUG: ⚠️ Current account does NOT match target {account}")
            print(f"DEBUG: Target type: '{account_type}', ending: '{target_ending}'")
            print("DEBUG: IMPORTANT: Account switching via UI causes browser crashes.")
            print("DEBUG: The export will proceed and verify by filename. If wrong account,")
            print("DEBUG: user will get clear instructions to manually select the correct account.")
    except Exception as e:
        print(f"DEBUG: Could not check current account: {e}")
        print("DEBUG: Will proceed with export and verify by filename")


async def _export_click_and_download(page, dialog):
    """Click the dialog's Export button and return the resulting download (retrying once with force)."""
    export_button = dialog.locator("button:has-text('Export')").first
    try:
        async with page.expect_download(timeout=60000) as download_info:
            await export_button.click()
        return await download_info.value
    except Exception:
        # Fallback: same button, but bypass actionability checks
        async with page.expect_download(timeout=60000) as download_info:
            await export_button.click(force=True)
        return await download_info.value


async def perform_export_download(page, time_period: Optional[str] = None, account: Optional[str] = None, debug: bool = False, context=None) -> Dict[str, Any]:
    """Export transactions as a file and return its content, verifying the account by filename.

    No account switching is attempted here. Instead, the downloaded file's
    suggested name is parsed and compared with `account`; on mismatch the
    export is retried (up to three attempts in total) after reloading the
    history page.

    Args:
        page: Playwright page object. If it is closed, a new page is opened
            from `context`.
        time_period: Time period to select in the export dialog, if any.
        account: Account query the exported file must correspond to. When
            falsy, the first download is accepted.
        debug: Enable debug output and debug screenshots.
        context: Browser context used for navigation and page recovery.

    Returns:
        Dict with "content" (file bytes), "filename", "path" (where the file
        was saved), merged with the fields parsed from the filename.

    Raises:
        Exception: if the page is closed and no context is available, or if
            every attempt produced a file for a different account.
    """
    if debug:
        print("DEBUG: Navigating to history page…")

    # If the page was closed due to prior actions, reopen it
    page = await _export_recover_page(page, context)
    await goto_history(page, context=context, debug=debug)

    # Check current account but DO NOT attempt switching to avoid context closure
    if account and debug:
        await _export_debug_current_account(page, account)

    # Perform export with verification and retry if filename doesn't match target account
    max_export_attempts = 3
    last_meta = None
    for export_attempt in range(max_export_attempts):
        if debug:
            print(f"DEBUG: Export attempt {export_attempt + 1}/{max_export_attempts}…")

        # Ensure page still alive before continuing
        live_page = await _export_recover_page(page, context)
        if live_page is not page:
            page = live_page
            await goto_history(page, context=context, debug=debug)

        # NOTE: We don't re-attempt account switching here anymore
        await open_export_panel(page, debug=debug)

        # Scope to the export dialog for subsequent interactions
        if debug:
            print("DEBUG: Resolving export dialog…")
        dialog = await _resolve_export_dialog(page, debug=debug)
        if debug:
            try:
                png = await page.screenshot(full_page=True)
                save_debug_artifact("debug_export_dialog_open.png", png)
            except Exception:
                pass

        # Ensure any dialog-level account selector also targets requested account
        await _ensure_account_in_export_dialog(page, dialog, account, debug=debug)
        await select_time_period(page, time_period, container=dialog, debug=debug)
        await ensure_csv_format(page, container=dialog, debug=debug)

        # Re-verify account before download (header or dialog). This is purely
        # diagnostic, so a failure here must not abort the export.
        if account:
            if debug:
                print("DEBUG: Final account verification before download…")
            try:
                pre_download_account = await page.evaluate('''
                    () => {
                        const header = document.querySelector('.sdps-page-header__account-selector, #account-selector');
                        const headerText = header ? (header.textContent || '').trim() : '';
                        return headerText;
                    }
                ''')
                if debug:
                    print(f"DEBUG: Header account before download: {pre_download_account}")
            except Exception as header_err:
                if debug:
                    print(f"DEBUG: Could not check current account: {header_err}")
            # Try dialog scope too
            try:
                dialog_text = await dialog.evaluate('(root) => (root.textContent || "").trim().substring(0, 300)')
            except Exception:
                dialog_text = None
            if debug and dialog_text:
                print(f"DEBUG: Dialog account preview before download: {dialog_text[:120]}…")

        # Trigger download via the Export button inside the dialog
        download = await _export_click_and_download(page, dialog)

        suggested = download.suggested_filename
        meta = parse_suggested_filename(suggested)
        last_meta = meta

        if debug:
            print("DEBUG: Download verification:")
            print(f"DEBUG: Requested account: {account}")
            print(f"DEBUG: Downloaded filename: {suggested}")
            print(f"DEBUG: Parsed account from filename: {meta.get('label', 'Unknown')}")

        # Verify the downloaded filename corresponds to the requested account
        if not account or _label_matches_account_query(account, meta.get('label', '')):
            # Accept this download
            temp_path = f"/tmp/{suggested}"
            await download.save_as(temp_path)
            with open(temp_path, 'rb') as f:
                csv_content = f.read()
            if debug:
                print(f"DEBUG: Download complete: {suggested} -> {temp_path}")
            return {"content": csv_content, "filename": suggested, "path": temp_path, **meta}

        # Mismatch: close dialog and retry
        if debug:
            print("⚠️ WARNING: Downloaded filename doesn't match requested account; retrying export")
        try:
            await page.keyboard.press('Escape')
            await page.wait_for_timeout(300)
            await page.click('body')
        except Exception:
            pass

        # NOTE: We no longer attempt to switch accounts here as it causes page closure.
        # Just give UI time to settle before retry
        await page.wait_for_timeout(1500)

        # As a last resort before next attempt, reload the history page
        try:
            await goto_history(page, context=context, debug=debug)
        except Exception:
            pass

    # If we reach here, all export attempts produced mismatched account files
    current_label = (last_meta or {}).get('label', 'Unknown')

    # Enhanced error message with clear resolution steps
    error_msg = f"""🚨 ACCOUNT MISMATCH: Wrong account transactions exported

REQUESTED: {account}
EXPORTED: {current_label}

🔧 SOLUTION - Manual Account Selection Required:
Due to Schwab's website design, automatic account switching causes browser crashes.
Please follow these steps:

1. 🌐 Open Schwab website manually: https://client.schwab.com
2. 📋 Navigate to: Accounts → History → Transactions
3. 🎯 Click the account selector dropdown (top of page)
4. ✅ Select the account: {account}
5. 🔄 Re-run the scraper (it will use the manually selected account)

💡 WHY THIS HAPPENS:
- Schwab's account switching triggers complete page reloads
- This closes the browser automation session
- Manual selection before running scraper works perfectly

📖 ALTERNATIVE: Use the account that's currently selected ({current_label})"""

    raise Exception(error_msg)