feat: implement headless LS authentication via state sync
Reverse-engineered the UnifiedStateSyncUpdate protocol: - initial_state field is bytes (not string), contains serialized Topic proto - Map key for OAuth is 'oauthTokenInfoSentinelKey' - Row.value is base64-encoded OAuthTokenInfo protobuf - OAuthTokenInfo includes access_token, token_type, expiry (Timestamp) - Set far-future expiry (2099) to prevent token expiry errors Also fixed: - PushUnifiedStateSyncUpdate returns proper empty proto response - Stream keep-alive avoids sending empty envelopes (LS rejects nil updates) - uss-enterprisePreferences topic handled (empty initial state)
This commit is contained in:
28
src/main.rs
28
src/main.rs
@@ -49,6 +49,11 @@ struct Cli {
|
||||
/// Disable standalone LS — attach to the real running LS instead
|
||||
#[arg(long)]
|
||||
no_standalone: bool,
|
||||
|
||||
/// Headless mode — no running Antigravity app required.
|
||||
/// Generates its own CSRF, disables extension server, uses HTTPS_PROXY for MITM.
|
||||
#[arg(long)]
|
||||
headless: bool,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -104,13 +109,20 @@ async fn main() {
|
||||
};
|
||||
|
||||
// ── Step 2: Backend discovery (or standalone LS spawn) ─────────────────────
|
||||
let standalone_ls = if !cli.no_standalone {
|
||||
// Standalone mode: discover main LS config, spawn our own
|
||||
let main_config = match standalone::discover_main_ls_config() {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
eprintln!("Fatal: {e}");
|
||||
std::process::exit(1);
|
||||
// --headless implies standalone mode
|
||||
let standalone_ls = if cli.headless || !cli.no_standalone {
|
||||
// Get LS config: headless generates its own, normal steals from running LS
|
||||
let main_config = if cli.headless {
|
||||
info!("Headless mode: generating self-contained config");
|
||||
standalone::generate_standalone_config()
|
||||
} else {
|
||||
match standalone::discover_main_ls_config() {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
eprintln!("Fatal: {e}");
|
||||
eprintln!("Hint: start Antigravity first, or use --headless for full independence");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
};
|
||||
// Build MITM config if MITM is enabled
|
||||
@@ -127,7 +139,7 @@ async fn main() {
|
||||
None
|
||||
};
|
||||
|
||||
let ls = match standalone::StandaloneLS::spawn(&main_config, mitm_cfg.as_ref()) {
|
||||
let mut ls = match standalone::StandaloneLS::spawn(&main_config, mitm_cfg.as_ref(), cli.headless) {
|
||||
Ok(ls) => ls,
|
||||
Err(e) => {
|
||||
eprintln!("Fatal: failed to spawn standalone LS: {e}");
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
//! Standalone Language Server — spawn and lifecycle management.
|
||||
//!
|
||||
//! Launches an isolated LS instance as a child process that the proxy fully owns.
|
||||
//! The standalone LS shares auth via the main extension server but has its own
|
||||
//! HTTPS port, data directory, and cascade space. This means the real LS (the
|
||||
//! one powering the user's coding session) is never touched.
|
||||
//! In **headless** mode, the LS runs completely independently — no running
|
||||
//! Antigravity app required. Extension server is disabled (`port=0`), CSRF is
|
||||
//! self-generated, and MITM uses `HTTPS_PROXY` instead of iptables.
|
||||
|
||||
use crate::constants;
|
||||
use crate::proto;
|
||||
@@ -12,6 +12,7 @@ use std::net::TcpListener;
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use tracing::{debug, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Default path to the LS binary.
|
||||
const LS_BINARY_PATH: &str =
|
||||
@@ -39,12 +40,28 @@ pub struct StandaloneLS {
|
||||
pub csrf: String,
|
||||
}
|
||||
|
||||
/// Config needed from the real (main) LS to bootstrap the standalone one.
|
||||
/// Config needed to bootstrap the standalone LS.
|
||||
///
|
||||
/// In normal mode, discovered from the running main LS.
|
||||
/// In headless mode, generated entirely by the proxy.
|
||||
pub struct MainLSConfig {
|
||||
pub extension_server_port: String,
|
||||
pub csrf: String,
|
||||
}
|
||||
|
||||
/// Generate a fully self-contained config for headless mode.
|
||||
///
|
||||
/// No running Antigravity instance needed — extension server is disabled
|
||||
/// and CSRF is a random UUID.
|
||||
pub fn generate_standalone_config() -> MainLSConfig {
|
||||
let csrf = Uuid::new_v4().to_string();
|
||||
info!(csrf_len = csrf.len(), "Generated standalone config (headless)");
|
||||
MainLSConfig {
|
||||
extension_server_port: "0".to_string(), // disables extension server
|
||||
csrf,
|
||||
}
|
||||
}
|
||||
|
||||
/// Optional MITM proxy config for the standalone LS.
|
||||
pub struct StandaloneMitmConfig {
|
||||
pub proxy_addr: String, // e.g. "http://127.0.0.1:8742"
|
||||
@@ -62,6 +79,7 @@ impl StandaloneLS {
|
||||
pub fn spawn(
|
||||
main_config: &MainLSConfig,
|
||||
mitm_config: Option<&StandaloneMitmConfig>,
|
||||
headless: bool,
|
||||
) -> Result<Self, String> {
|
||||
// Kill any orphaned LS processes from previous runs
|
||||
cleanup_orphaned_ls();
|
||||
@@ -93,11 +111,79 @@ impl StandaloneLS {
|
||||
let _ = std::fs::set_permissions(&gemini_dir, std::fs::Permissions::from_mode(0o1777));
|
||||
}
|
||||
|
||||
// LS args — mirrors standalone-ls.sh but with correct params
|
||||
// In headless mode, spawn a stub TCP listener to serve as the extension server.
|
||||
// The LS connects to this port and calls LanguageServerStarted — without it,
|
||||
// the LS never fully initializes and won't accept connections on its server_port.
|
||||
let _stub_listener = if headless {
|
||||
let stub_port: u16 = main_config
|
||||
.extension_server_port
|
||||
.parse()
|
||||
.unwrap_or(0);
|
||||
if stub_port == 0 {
|
||||
// Create a real listener so the LS can connect
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.map_err(|e| format!("Failed to bind stub extension server: {e}"))?;
|
||||
let actual_port = listener
|
||||
.local_addr()
|
||||
.map_err(|e| format!("Failed to get stub port: {e}"))?
|
||||
.port();
|
||||
info!(port = actual_port, "Stub extension server listening (headless)");
|
||||
// Read OAuth token for serving via stub (same sources as backend)
|
||||
let oauth_token = std::env::var("ANTIGRAVITY_OAUTH_TOKEN")
|
||||
.ok()
|
||||
.filter(|s| !s.is_empty())
|
||||
.or_else(|| {
|
||||
let home = std::env::var("HOME").unwrap_or_default();
|
||||
let path = format!("{home}/.config/antigravity-proxy-token");
|
||||
std::fs::read_to_string(&path)
|
||||
.ok()
|
||||
.map(|s| s.trim().to_string())
|
||||
.filter(|s| !s.is_empty())
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let oauth_arc = std::sync::Arc::new(oauth_token);
|
||||
// Spawn a thread to accept connections (just hold them open)
|
||||
let listener_clone = listener
|
||||
.try_clone()
|
||||
.map_err(|e| format!("Failed to clone stub listener: {e}"))?;
|
||||
std::thread::spawn(move || {
|
||||
for stream in listener_clone.incoming() {
|
||||
match stream {
|
||||
Ok(conn) => {
|
||||
let token = std::sync::Arc::clone(&oauth_arc);
|
||||
// Handle each connection in its own thread
|
||||
std::thread::spawn(move || {
|
||||
stub_handle_connection(conn, &token);
|
||||
});
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
// Update the extension_server_port to the stub's port
|
||||
// (we need to use this in args below)
|
||||
Some((listener, actual_port))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Determine the actual extension_server_port to use
|
||||
let ext_port = if let Some((_, stub_port)) = &_stub_listener {
|
||||
stub_port.to_string()
|
||||
} else {
|
||||
main_config.extension_server_port.clone()
|
||||
};
|
||||
|
||||
// LS args — NO -standalone flag (it disables TCP listeners entirely)
|
||||
let args = vec![
|
||||
"-enable_lsp".to_string(),
|
||||
"-random_port".to_string(),
|
||||
"-lsp_port=0".to_string(),
|
||||
"-extension_server_port".to_string(),
|
||||
main_config.extension_server_port.clone(),
|
||||
ext_port,
|
||||
"-csrf_token".to_string(),
|
||||
main_config.csrf.clone(),
|
||||
"-server_port".to_string(),
|
||||
@@ -151,25 +237,24 @@ impl StandaloneLS {
|
||||
env_vars.push(("SSL_CERT_FILE".into(), combined_ca_path));
|
||||
env_vars.push(("SSL_CERT_DIR".into(), "/dev/null".into()));
|
||||
env_vars.push(("NODE_EXTRA_CA_CERTS".into(), mitm.ca_cert_path.clone()));
|
||||
// Only set HTTPS_PROXY when iptables UID isolation is NOT available.
|
||||
// Only set HTTPS_PROXY when iptables UID isolation is NOT available
|
||||
// OR when running in headless mode (no sudo at all).
|
||||
// With iptables, all outbound traffic is transparently redirected at the
|
||||
// kernel level — setting HTTPS_PROXY on top causes double-proxying
|
||||
// (profile picture fetches, etc. break with "lookup http" errors).
|
||||
if !has_ls_user() {
|
||||
// kernel level — setting HTTPS_PROXY on top causes double-proxying.
|
||||
if headless || !has_ls_user() {
|
||||
env_vars.push(("HTTPS_PROXY".into(), format!("http://{}", mitm.proxy_addr)));
|
||||
env_vars.push(("HTTP_PROXY".into(), format!("http://{}", mitm.proxy_addr)));
|
||||
}
|
||||
}
|
||||
|
||||
// Check if 'antigravity-ls' user exists for UID-scoped iptables isolation
|
||||
let use_sudo = has_ls_user();
|
||||
// In headless mode, never use sudo — run as current user
|
||||
// In normal mode, use sudo if 'antigravity-ls' user exists
|
||||
let use_sudo = !headless && has_ls_user();
|
||||
|
||||
let mut cmd = if use_sudo {
|
||||
info!("Using UID isolation: spawning LS as 'antigravity-ls' user");
|
||||
// Build: sudo -n -u antigravity-ls -- /usr/bin/env VAR=val ... LS_BINARY args...
|
||||
let mut c = Command::new("sudo");
|
||||
c.args(["-n", "-u", LS_USER, "--", "/usr/bin/env"]);
|
||||
// Pass env vars as key=value args to /usr/bin/env
|
||||
for (k, v) in &env_vars {
|
||||
c.arg(format!("{k}={v}"));
|
||||
}
|
||||
@@ -177,7 +262,7 @@ impl StandaloneLS {
|
||||
c.args(&args);
|
||||
c
|
||||
} else {
|
||||
debug!("No 'antigravity-ls' user found, spawning LS as current user");
|
||||
debug!("Spawning LS as current user");
|
||||
let mut c = Command::new(LS_BINARY_PATH);
|
||||
c.args(&args);
|
||||
for (k, v) in &env_vars {
|
||||
@@ -186,9 +271,12 @@ impl StandaloneLS {
|
||||
c
|
||||
};
|
||||
|
||||
// Capture stderr for debugging — logs to /tmp so we can diagnose LS failures
|
||||
let stderr_file = std::fs::File::create("/tmp/antigravity-ls-debug.log")
|
||||
.map_err(|e| format!("Failed to create LS debug log: {e}"))?;
|
||||
cmd.stdin(Stdio::piped())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::null());
|
||||
.stderr(Stdio::from(stderr_file));
|
||||
|
||||
let mut child = cmd
|
||||
.spawn()
|
||||
@@ -199,7 +287,7 @@ impl StandaloneLS {
|
||||
stdin
|
||||
.write_all(&metadata)
|
||||
.map_err(|e| format!("Failed to write init metadata to stdin: {e}"))?;
|
||||
// stdin drops here → EOF
|
||||
// stdin drops here → EOF (LS handles this fine in non-standalone mode)
|
||||
}
|
||||
|
||||
info!(pid = child.id(), port, "Standalone LS spawned");
|
||||
@@ -232,12 +320,25 @@ impl StandaloneLS {
|
||||
/// Wait for the standalone LS to be ready (accepting TCP connections).
|
||||
///
|
||||
/// Retries up to `max_attempts` times with a 1-second delay between each.
|
||||
pub async fn wait_ready(&self, max_attempts: u32) -> Result<(), String> {
|
||||
pub async fn wait_ready(&mut self, max_attempts: u32) -> Result<(), String> {
|
||||
info!(port = self.port, "Waiting for standalone LS to be ready...");
|
||||
|
||||
for attempt in 1..=max_attempts {
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
|
||||
// Check if the process is still alive
|
||||
match self.child.try_wait() {
|
||||
Ok(Some(status)) => {
|
||||
return Err(format!(
|
||||
"Standalone LS exited prematurely with status: {status}"
|
||||
));
|
||||
}
|
||||
Ok(None) => {} // still running
|
||||
Err(e) => {
|
||||
return Err(format!("Failed to check LS process status: {e}"));
|
||||
}
|
||||
}
|
||||
|
||||
// Simple TCP connect check — if the LS is listening, it's ready
|
||||
match tokio::net::TcpStream::connect(format!("127.0.0.1:{}", self.port)).await {
|
||||
Ok(_) => {
|
||||
@@ -541,6 +642,390 @@ fn cleanup_orphaned_ls() {
|
||||
info!("Orphaned LS processes cleaned up");
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a single connection from the LS to the stub extension server.
|
||||
///
|
||||
/// The LS uses Connect RPC (HTTP/1.1, ServerStream) to call ExtensionServerService methods.
|
||||
/// ALL methods are ServerStream — responses use Connect streaming envelope framing:
|
||||
/// [0x00 | len(4) | protobuf_data]... (0+ data messages)
|
||||
/// [0x02 | len(4) | json_trailer] (exactly 1 end-of-stream)
|
||||
///
|
||||
/// IMPORTANT: `SubscribeToUnifiedStateSyncTopic` is a long-lived stream.
|
||||
/// If we immediately close it, the LS reconnects in a tight loop and never
|
||||
/// proceeds to fetch OAuth tokens. We keep subscription connections OPEN.
|
||||
fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str) {
|
||||
use std::io::{BufRead, BufReader, Read, Write};
|
||||
|
||||
let mut reader = BufReader::new(match conn.try_clone() {
|
||||
Ok(c) => c,
|
||||
Err(_) => return,
|
||||
});
|
||||
let mut writer = conn;
|
||||
|
||||
// Read the HTTP request line
|
||||
let mut request_line = String::new();
|
||||
match reader.read_line(&mut request_line) {
|
||||
Ok(0) | Err(_) => return,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Extract method path for logging
|
||||
let path = request_line
|
||||
.split_whitespace()
|
||||
.nth(1)
|
||||
.unwrap_or("/unknown")
|
||||
.to_string();
|
||||
|
||||
// Read headers
|
||||
let mut content_len: usize = 0;
|
||||
loop {
|
||||
let mut line = String::new();
|
||||
if reader.read_line(&mut line).unwrap_or(0) == 0 {
|
||||
return;
|
||||
}
|
||||
if line.trim().is_empty() {
|
||||
break;
|
||||
}
|
||||
if line.to_lowercase().starts_with("content-length:") {
|
||||
content_len = line
|
||||
.split(':')
|
||||
.nth(1)
|
||||
.and_then(|v| v.trim().parse().ok())
|
||||
.unwrap_or(0);
|
||||
}
|
||||
}
|
||||
|
||||
// Read body
|
||||
let mut body = Vec::new();
|
||||
if content_len > 0 {
|
||||
body.resize(content_len, 0u8);
|
||||
if Read::read_exact(&mut reader, &mut body).is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Long-lived streams ──────────────────────────────────────────────
|
||||
// SubscribeToUnifiedStateSyncTopic must stay open — the LS subscribes
|
||||
// once and expects updates (OAuth, settings) delivered over this stream.
|
||||
// If we close immediately, the LS reconnects in a tight loop (~30/sec).
|
||||
if path.contains("SubscribeToUnifiedStateSyncTopic") {
|
||||
// Parse the request body to extract the topic name.
|
||||
// Connect envelope: [flag(1)] [len(4)] [proto(N)]
|
||||
let proto_body = if body.len() > 5 {
|
||||
&body[5..]
|
||||
} else {
|
||||
&body[..]
|
||||
};
|
||||
|
||||
// SubscribeToUnifiedStateSyncTopicRequest { string topic = 1; }
|
||||
let mut topic_name = String::new();
|
||||
let mut i = 0;
|
||||
while i < proto_body.len() {
|
||||
let tag_byte = proto_body[i];
|
||||
let field_num = tag_byte >> 3;
|
||||
let wire_type = tag_byte & 0x07;
|
||||
i += 1;
|
||||
if wire_type == 2 && i < proto_body.len() {
|
||||
let len = proto_body[i] as usize;
|
||||
i += 1;
|
||||
if i + len <= proto_body.len() {
|
||||
if field_num == 1 {
|
||||
topic_name = String::from_utf8_lossy(&proto_body[i..i+len]).to_string();
|
||||
}
|
||||
i += len;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
eprintln!("[stub-ext] STREAM → {path} topic={topic_name:?}");
|
||||
|
||||
// Protocol:
|
||||
// UnifiedStateSyncUpdate {
|
||||
// oneof UpdateType {
|
||||
// string initial_state = 1; // ← STRING, not a submessage!
|
||||
// AppliedUpdate applied_update = 2;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// Flow:
|
||||
// 1. Send initial_state = "" (empty string = initial snapshot marker)
|
||||
// 2. For uss-oauth topic: send applied_update with OAuth token
|
||||
// 3. Hold stream open for future updates
|
||||
|
||||
// Helper: wrap protobuf bytes in a Connect data envelope
|
||||
let make_envelope = |proto: &[u8]| -> Vec<u8> {
|
||||
let mut env = Vec::with_capacity(5 + proto.len());
|
||||
env.push(0x00u8); // data flag
|
||||
env.extend_from_slice(&(proto.len() as u32).to_be_bytes());
|
||||
env.extend_from_slice(proto);
|
||||
env
|
||||
};
|
||||
|
||||
// Helper: write a chunk
|
||||
let send_chunk = |w: &mut std::net::TcpStream, data: &[u8]| -> bool {
|
||||
let hdr = format!("{:x}\r\n", data.len());
|
||||
w.write_all(hdr.as_bytes()).is_ok()
|
||||
&& w.write_all(data).is_ok()
|
||||
&& w.write_all(b"\r\n").is_ok()
|
||||
&& w.flush().is_ok()
|
||||
};
|
||||
|
||||
// --- Message 1: initial_state = Topic { data: { "authToken": Row { value: token, e_tag: 1 } } } ---
|
||||
// Topic { map<string, Row> data = 1; }
|
||||
// Row { string value = 1; int64 e_tag = 2; }
|
||||
// Map entry: { string key = 1, Row value = 2 }
|
||||
let mut initial_state_bytes = Vec::new();
|
||||
|
||||
if topic_name == "uss-oauth" && !oauth_token.is_empty() {
|
||||
// Row.value is base64-encoded protobuf OAuthTokenInfo:
|
||||
// OAuthTokenInfo {
|
||||
// string access_token = 1;
|
||||
// string token_type = 2;
|
||||
// string refresh_token = 3;
|
||||
// google.protobuf.Timestamp expiry = 4;
|
||||
// }
|
||||
let mut oauth_proto = Vec::new();
|
||||
// field 1 (access_token), LEN
|
||||
oauth_proto.push(0x0A);
|
||||
encode_varint(&mut oauth_proto, oauth_token.len() as u64);
|
||||
oauth_proto.extend_from_slice(oauth_token.as_bytes());
|
||||
// field 2 (token_type), LEN
|
||||
let token_type = b"Bearer";
|
||||
oauth_proto.push(0x12);
|
||||
encode_varint(&mut oauth_proto, token_type.len() as u64);
|
||||
oauth_proto.extend_from_slice(token_type);
|
||||
// field 4 (expiry) = Timestamp { seconds = 4_102_444_800 } (year 2099-12-31)
|
||||
// Timestamp proto: field 1 (seconds) varint
|
||||
let mut ts_proto = Vec::new();
|
||||
ts_proto.push(0x08); // field 1 (seconds), varint
|
||||
encode_varint(&mut ts_proto, 4_102_444_800u64);
|
||||
oauth_proto.push(0x22); // field 4 (expiry), LEN
|
||||
encode_varint(&mut oauth_proto, ts_proto.len() as u64);
|
||||
oauth_proto.extend_from_slice(&ts_proto);
|
||||
|
||||
use base64::Engine;
|
||||
let b64_value = base64::engine::general_purpose::STANDARD.encode(&oauth_proto);
|
||||
|
||||
// Build Row { value = b64_value, e_tag = 1 }
|
||||
let mut row = Vec::new();
|
||||
row.push(0x0A); // field 1 (value), LEN
|
||||
encode_varint(&mut row, b64_value.len() as u64);
|
||||
row.extend_from_slice(b64_value.as_bytes());
|
||||
row.push(0x10); // field 2 (e_tag), varint
|
||||
row.push(0x01);
|
||||
|
||||
// Build map entry: { key = "oauthTokenInfoSentinelKey", value = row }
|
||||
let key_str = b"oauthTokenInfoSentinelKey";
|
||||
let mut map_entry = Vec::new();
|
||||
map_entry.push(0x0A); // field 1 (key), LEN
|
||||
encode_varint(&mut map_entry, key_str.len() as u64);
|
||||
map_entry.extend_from_slice(key_str);
|
||||
map_entry.push(0x12); // field 2 (value = Row), LEN
|
||||
encode_varint(&mut map_entry, row.len() as u64);
|
||||
map_entry.extend_from_slice(&row);
|
||||
|
||||
// Build Topic { data = [map_entry] }
|
||||
initial_state_bytes.push(0x0A); // field 1 (data map), LEN
|
||||
encode_varint(&mut initial_state_bytes, map_entry.len() as u64);
|
||||
initial_state_bytes.extend_from_slice(&map_entry);
|
||||
}
|
||||
|
||||
// Build UnifiedStateSyncUpdate { initial_state = initial_state_bytes }
|
||||
let mut initial_proto = Vec::new();
|
||||
initial_proto.push(0x0A); // field 1 (initial_state), LEN
|
||||
encode_varint(&mut initial_proto, initial_state_bytes.len() as u64);
|
||||
initial_proto.extend_from_slice(&initial_state_bytes);
|
||||
|
||||
let initial_env = make_envelope(&initial_proto);
|
||||
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: application/connect+proto\r\n\
|
||||
Transfer-Encoding: chunked\r\n\
|
||||
\r\n"
|
||||
);
|
||||
if writer.write_all(header.as_bytes()).is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
if !send_chunk(&mut writer, &initial_env) {
|
||||
return;
|
||||
}
|
||||
eprintln!("[stub-ext] STREAM → sent initial_state ({} bytes)", initial_state_bytes.len());
|
||||
|
||||
// (applied_update removed — data is in initial_state)
|
||||
|
||||
// Hold the connection open — LS will reconnect if it drops.
|
||||
// Don't send empty envelopes (LS rejects nil update type).
|
||||
loop {
|
||||
std::thread::sleep(std::time::Duration::from_secs(60));
|
||||
// Probe: try writing zero bytes to detect closed connection
|
||||
if writer.write_all(b"").is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// ─── Short-lived methods (everything else) ───────────────────────────
|
||||
let is_noisy = path.contains("GetChromeDevtoolsMcpUrl")
|
||||
|| path.contains("FetchMCPAuthToken")
|
||||
|| path.contains("PushUnifiedStateSyncUpdate");
|
||||
if !is_noisy {
|
||||
eprintln!("[stub-ext] 200 OK → {path}");
|
||||
}
|
||||
|
||||
// Build Connect streaming response body with proper envelope framing.
|
||||
let mut envelope = Vec::new();
|
||||
|
||||
if path.contains("GetSecretValue") {
|
||||
// Parse request body to extract the key (protobuf: field 1 = key, string)
|
||||
let key = extract_proto_string(&body, 1).unwrap_or_default();
|
||||
eprintln!("[stub-ext] ← GetSecretValue key={key:?}");
|
||||
|
||||
if !oauth_token.is_empty() {
|
||||
// Build protobuf: GetSecretValueResponse { string value = 1 }
|
||||
let proto = encode_proto_string(1, oauth_token.as_bytes());
|
||||
eprintln!("[stub-ext] → serving token ({} bytes) for key={key:?}", oauth_token.len());
|
||||
|
||||
// Data envelope: flag=0x00, length, data
|
||||
envelope.push(0x00u8);
|
||||
envelope.extend_from_slice(&(proto.len() as u32).to_be_bytes());
|
||||
envelope.extend_from_slice(&proto);
|
||||
} else {
|
||||
eprintln!("[stub-ext] ⚠ no OAuth token available for key={key:?}");
|
||||
}
|
||||
} else if path.contains("StoreSecretValue") {
|
||||
// Parse and log what the LS is storing (for debugging)
|
||||
let key = extract_proto_string(&body, 1).unwrap_or_default();
|
||||
let value = extract_proto_string(&body, 2).unwrap_or_default();
|
||||
let val_preview = if value.len() > 32 {
|
||||
format!("{}...", &value[..32])
|
||||
} else {
|
||||
value
|
||||
};
|
||||
eprintln!("[stub-ext] ← StoreSecretValue key={key:?} value={val_preview:?}");
|
||||
}
|
||||
|
||||
if path.contains("PushUnifiedStateSyncUpdate") {
|
||||
// Unary proto — respond with empty PushUnifiedStateSyncUpdateResponse (0 bytes body)
|
||||
let header = "HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: application/proto\r\n\
|
||||
Content-Length: 0\r\n\
|
||||
Connection: close\r\n\
|
||||
\r\n";
|
||||
let _ = writer.write_all(header.as_bytes());
|
||||
let _ = writer.flush();
|
||||
return;
|
||||
}
|
||||
|
||||
// End-of-stream envelope: flag=0x02, length=2, data="{}"
|
||||
envelope.push(0x02u8);
|
||||
envelope.extend_from_slice(&2u32.to_be_bytes());
|
||||
envelope.extend_from_slice(b"{}");
|
||||
|
||||
// Respond with 200 OK + Connection: close (one request per connection)
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: application/connect+proto\r\n\
|
||||
Content-Length: {}\r\n\
|
||||
Connection: close\r\n\
|
||||
\r\n",
|
||||
envelope.len()
|
||||
);
|
||||
let _ = writer.write_all(header.as_bytes());
|
||||
let _ = writer.write_all(&envelope);
|
||||
let _ = writer.flush();
|
||||
}
|
||||
|
||||
/// Extract a string field from a protobuf message by field number.
|
||||
/// Only handles simple string (wire type 2) fields at the top level.
|
||||
fn extract_proto_string(buf: &[u8], target_field: u32) -> Option<String> {
|
||||
let mut i = 0;
|
||||
while i < buf.len() {
|
||||
// Read field tag (varint)
|
||||
let (tag, consumed) = decode_varint(&buf[i..])?;
|
||||
i += consumed;
|
||||
let field_num = (tag >> 3) as u32;
|
||||
let wire_type = (tag & 0x07) as u8;
|
||||
|
||||
match wire_type {
|
||||
0 => {
|
||||
// Varint — skip
|
||||
let (_, c) = decode_varint(&buf[i..])?;
|
||||
i += c;
|
||||
}
|
||||
1 => {
|
||||
// 64-bit — skip 8 bytes
|
||||
i += 8;
|
||||
}
|
||||
2 => {
|
||||
// Length-delimited
|
||||
let (len, c) = decode_varint(&buf[i..])?;
|
||||
i += c;
|
||||
let len = len as usize;
|
||||
if i + len > buf.len() {
|
||||
return None;
|
||||
}
|
||||
if field_num == target_field {
|
||||
return String::from_utf8(buf[i..i + len].to_vec()).ok();
|
||||
}
|
||||
i += len;
|
||||
}
|
||||
5 => {
|
||||
// 32-bit — skip 4 bytes
|
||||
i += 4;
|
||||
}
|
||||
_ => return None, // Unknown wire type
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Decode a protobuf varint, returning (value, bytes_consumed).
|
||||
fn decode_varint(buf: &[u8]) -> Option<(u64, usize)> {
|
||||
let mut result: u64 = 0;
|
||||
let mut shift = 0u32;
|
||||
for (i, &byte) in buf.iter().enumerate() {
|
||||
result |= ((byte & 0x7f) as u64) << shift;
|
||||
if byte & 0x80 == 0 {
|
||||
return Some((result, i + 1));
|
||||
}
|
||||
shift += 7;
|
||||
if shift >= 64 {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Encode a string value as a protobuf field (field_num, wire type 2).
|
||||
fn encode_proto_string(field_num: u32, data: &[u8]) -> Vec<u8> {
|
||||
let tag = (field_num << 3) | 2; // wire type 2 = length-delimited
|
||||
let mut buf = Vec::with_capacity(1 + 5 + data.len());
|
||||
encode_varint(&mut buf, tag as u64);
|
||||
encode_varint(&mut buf, data.len() as u64);
|
||||
buf.extend_from_slice(data);
|
||||
buf
|
||||
}
|
||||
|
||||
/// Encode a u64 as a protobuf varint.
|
||||
fn encode_varint(buf: &mut Vec<u8>, mut val: u64) {
|
||||
loop {
|
||||
let byte = (val & 0x7f) as u8;
|
||||
val >>= 7;
|
||||
if val == 0 {
|
||||
buf.push(byte);
|
||||
break;
|
||||
}
|
||||
buf.push(byte | 0x80);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -35,9 +35,17 @@ pub async fn warmup_sequence(backend: &Backend) {
|
||||
];
|
||||
|
||||
for (method, body) in calls {
|
||||
match backend.call_json(method, body).await {
|
||||
Ok((status, _)) => debug!("Warmup {method}: {status}"),
|
||||
Err(e) => warn!("Warmup {method} failed: {e}"),
|
||||
// Timeout per call — in headless mode, the LS can't reach Google's API
|
||||
// so these would hang forever without a timeout. Warmup is best-effort.
|
||||
match tokio::time::timeout(
|
||||
Duration::from_secs(5),
|
||||
backend.call_json(method, body),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok((status, _))) => debug!("Warmup {method}: {status}"),
|
||||
Ok(Err(e)) => warn!("Warmup {method} failed: {e}"),
|
||||
Err(_) => warn!("Warmup {method} timed out"),
|
||||
}
|
||||
// Small delay between calls — real webview doesn't blast them instantly
|
||||
let delay = rand::thread_rng().gen_range(50..200);
|
||||
|
||||
Reference in New Issue
Block a user