Files
schwab-mcp-custom/scripts/upload_cookies.py
b3nw ad4d7317f4
All checks were successful
Build and Push Docker Image / build (push) Successful in 34s
Downgrade playwright to 1.50.0 to fix version mismatch with browserless server
2026-04-24 04:48:07 +00:00

88 lines
2.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
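Helper script to upload cookies.json to the Schwab MCP Server.
Usage: python3 upload_cookies.py <path_to_cookies.json> [mcp_url]
If mcp_url is omitted, the SCHWAB_MCP_URL environment variable is used,
falling back to https://schwab-mcp.ext.ben.io.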
"""
import sys
import os
import json
import httpx
def main():
    """Upload a cookies JSON file to the Schwab MCP server.

    Command line: ``upload_cookies.py <path_to_cookies.json> [mcp_url]``.
    When ``mcp_url`` is omitted, the ``SCHWAB_MCP_URL`` environment variable
    is used, and after that the built-in default URL.

    The file content is sent verbatim as the ``cookies_json`` argument of a
    JSON-RPC ``tools/call`` request for the ``upload_cookies`` tool, POSTed
    to ``<mcp_url>/mcp``.

    Raises:
        SystemExit: with code 1 on bad usage, an unreadable or invalid
            cookies file, a transport failure, a non-200 HTTP status, an
            undecodable reply, a JSON-RPC error reply, or a tool result
            flagged with ``isError``. Returns normally (exit status 0) only
            when the server accepted the upload.
    """
    if len(sys.argv) < 2:
        print("Usage: python3 upload_cookies.py <path_to_cookies.json> [mcp_url]")
        print("\nDefault mcp_url is https://schwab-mcp.ext.ben.io")
        sys.exit(1)

    cookies_path = sys.argv[1]
    # Precedence: explicit CLI argument, then environment, then default.
    if len(sys.argv) > 2:
        mcp_base_url = sys.argv[2]
    else:
        mcp_base_url = os.getenv("SCHWAB_MCP_URL", "https://schwab-mcp.ext.ben.io")
    mcp_endpoint = f"{mcp_base_url.rstrip('/')}/mcp"

    try:
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(cookies_path, "r", encoding="utf-8") as f:
            cookies_json = f.read()
        # Validate only; the raw text (not the parsed object) is what is sent.
        json.loads(cookies_json)
    except FileNotFoundError:
        print(f"Error: File not found: {cookies_path}")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: {cookies_path} is not valid JSON.")
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading file: {e}")
        sys.exit(1)

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "upload_cookies",
            "arguments": {
                "cookies_json": cookies_json,
            },
        },
    }
    # Both reply framings are advertised, so both are decoded below.
    headers = {
        "Accept": "application/json, text/event-stream"
    }

    print(f"Sending cookies to {mcp_endpoint}...")
    try:
        # NOTE(review): this is a single direct POST with no prior
        # "initialize" handshake; it assumes the server accepts that.
        response = httpx.post(mcp_endpoint, json=payload, headers=headers, timeout=60.0)
    except Exception as e:  # top-level CLI boundary: report and fail
        print(f"Error communicating with MCP server: {e}")
        sys.exit(1)

    if response.status_code != 200:
        print(f"Request failed with status {response.status_code}")
        print(response.text)
        sys.exit(1)

    try:
        reply = _decode_jsonrpc_body(response.text)
    except ValueError as e:
        print(f"Error communicating with MCP server: {e}")
        sys.exit(1)

    if not isinstance(reply, dict):
        print(f"Error communicating with MCP server: unexpected reply {reply!r}")
        sys.exit(1)

    if "error" in reply:
        print(f"Server returned an error: {reply['error']}")
        sys.exit(1)

    tool_result = reply.get("result")
    if not isinstance(tool_result, dict):
        tool_result = {}
    # A tool can fail while the JSON-RPC call itself succeeds; the result
    # then carries isError=true alongside the explanatory content.
    tool_failed = bool(tool_result.get("isError"))

    content = tool_result.get("content")
    if content:
        first = content[0]
        text = first.get("text") if isinstance(first, dict) else first
        print(f"Response: {text}")
    elif not tool_failed:
        print("Cookies uploaded successfully (no content in response).")

    if tool_failed:
        print("Server reported that the upload_cookies tool call failed.")
        sys.exit(1)


def _iter_sse_data(body):
    """Yield the joined ``data:`` payload of each event in an SSE stream body."""
    chunks = []
    for line in body.splitlines():
        if not line:
            # A blank line terminates the current event.
            if chunks:
                yield "\n".join(chunks)
                chunks = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is part of the framing.
            chunks.append(value[1:] if value.startswith(" ") else value)
    if chunks:
        # Be lenient about a final event that lacks its trailing blank line.
        yield "\n".join(chunks)


def _decode_jsonrpc_body(body):
    """Return the JSON-RPC reply in *body*, given as plain JSON or SSE-framed.

    Raises:
        ValueError: if the body is neither valid JSON nor an event stream
            containing a JSON-RPC message with a ``result`` or ``error``.
    """
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        pass  # not plain JSON; fall through to SSE decoding
    for data in _iter_sse_data(body):
        try:
            message = json.loads(data)
        except json.JSONDecodeError:
            continue
        # Skip notifications/requests; only a response answers our call.
        if isinstance(message, dict) and ("result" in message or "error" in message):
            return message
    raise ValueError("response body did not contain a JSON-RPC reply")
# Script entry point: run the upload only when this file is executed
# directly, so importing the module has no side effects.
if __name__ == "__main__":
    main()