All checks were successful
Build and Push Docker Image / build (push) Successful in 34s
102 lines
3.2 KiB
Python
102 lines
3.2 KiB
Python
import asyncio
|
|
|
|
from . import unified_api
|
|
from .browser.session import get_session_info as _session_info
|
|
|
|
|
|
def get_morningstar_data(ticker: str, debug: bool = False):
|
|
"""Synchronous wrapper for `unified_api.get_morningstar_data`"""
|
|
return asyncio.run(unified_api.get_morningstar_data(ticker, debug=debug))
|
|
|
|
|
|
def get_transaction_history(account=None, start_date=None, end_date=None, time_period=None, debug=False):
|
|
"""Synchronous wrapper for `unified_api.get_transaction_history`"""
|
|
return asyncio.run(
|
|
unified_api.get_transaction_history(
|
|
account=account,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
time_period=time_period,
|
|
debug=debug,
|
|
)
|
|
)
|
|
|
|
|
|
def get_transaction_history_enhanced(account=None, start_date=None, end_date=None, time_period=None, debug=False):
|
|
"""Synchronous wrapper for enhanced transaction history."""
|
|
return asyncio.run(
|
|
unified_api.get_transaction_history_enhanced(
|
|
account=account,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
time_period=time_period,
|
|
debug=debug,
|
|
)
|
|
)
|
|
|
|
|
|
def list_accounts(debug: bool = False):
|
|
"""Synchronous wrapper for account discovery."""
|
|
return asyncio.run(unified_api.list_accounts(debug=debug))
|
|
|
|
|
|
def get_account_overview(account=None, debug: bool = False):
|
|
return asyncio.run(unified_api.get_account_overview(account=account, debug=debug))
|
|
|
|
|
|
def get_positions(account=None, include_non_equity: bool = False, debug: bool = False):
|
|
return asyncio.run(
|
|
unified_api.get_positions(
|
|
account=account,
|
|
include_non_equity=include_non_equity,
|
|
debug=debug,
|
|
)
|
|
)
|
|
|
|
|
|
def get_portfolio_snapshot(account=None, aggregate_by_symbol: bool = True, include_non_equity: bool = False, debug: bool = False):
|
|
return asyncio.run(
|
|
unified_api.get_portfolio_snapshot(
|
|
account=account,
|
|
aggregate_by_symbol=aggregate_by_symbol,
|
|
include_non_equity=include_non_equity,
|
|
debug=debug,
|
|
)
|
|
)
|
|
|
|
|
|
def refresh_session(debug: bool = False):
|
|
return asyncio.run(unified_api.refresh_session(debug=debug))
|
|
|
|
|
|
def check_session_health(debug: bool = False):
|
|
envelope = asyncio.run(unified_api.get_session_status(debug=debug))
|
|
return envelope["success"]
|
|
|
|
|
|
def get_session_status(debug: bool = False):
|
|
return asyncio.run(unified_api.get_session_status(debug=debug))
|
|
|
|
|
|
def get_session_info(debug: bool = False):
|
|
return _session_info()
|
|
|
|
|
|
def ensure_valid_session(debug: bool = False):
|
|
envelope = asyncio.run(unified_api.refresh_session(debug=debug))
|
|
return envelope["success"]
|
|
|
|
|
|
def export_cookies(cookies_path: str, debug: bool = False):
|
|
"""Synchronous wrapper for exporting cookies."""
|
|
return asyncio.run(unified_api.export_cookies(cookies_path, debug=debug))
|
|
|
|
|
|
def set_cookies(cookies_path: str, debug: bool = False):
|
|
"""Synchronous wrapper for setting cookies."""
|
|
return asyncio.run(unified_api.set_cookies(cookies_path, debug=debug))
|
|
|
|
|
|
def list_available_accounts(debug: bool = False):
|
|
"""Synchronous wrapper for listing available transaction accounts."""
|
|
return asyncio.run(unified_api.list_available_accounts(debug=debug)) |