//! Backend: discovery of the local Antigravity language server and HTTP client. //! //! Uses wreq (BoringSSL) to impersonate Chrome's TLS + HTTP/2 fingerprint, //! making our requests indistinguishable from the real Electron webview. use crate::constants::*; use flate2::read::{DeflateDecoder, GzDecoder}; use std::fs; use std::io::Read; use std::process::Command; use std::sync::LazyLock; use tokio::sync::RwLock; use tracing::{debug, info, warn}; use wreq::header::{HeaderMap, HeaderName, HeaderValue}; /// Connection details for the local language server. pub struct Backend { inner: RwLock, client: wreq::Client, } struct BackendInner { pid: String, csrf: String, https_port: String, oauth_token: String, } /// Static headers that never change — built once, in Chrome's exact emission order. /// /// Order matters: wreq preserves insertion order in HTTP/2 HEADERS frames. /// This matches the order captured from Chrome DevTools on the real webview. static STATIC_HEADERS: LazyLock = LazyLock::new(|| { let mut h = HeaderMap::with_capacity(14); // Chrome order: Origin → UA → Accept → Accept-Encoding → Accept-Language // → sec-ch-ua → sec-ch-ua-mobile → sec-ch-ua-platform // → Sec-Fetch-Dest → Sec-Fetch-Mode → Sec-Fetch-Site // → Referer → Priority → Connect-Protocol-Version h.insert("Origin", hv("vscode-file://vscode-app")); h.insert("User-Agent", hv(&USER_AGENT)); h.insert("Accept", hv("*/*")); h.insert("Accept-Encoding", hv("gzip, deflate, br, zstd")); h.insert("Accept-Language", hv("en-US")); h.insert( HeaderName::from_static("sec-ch-ua"), hv(&format!( "\"Not_A Brand\";v=\"99\", \"Chromium\";v=\"{}\"", *CHROME_MAJOR, )), ); h.insert( HeaderName::from_static("sec-ch-ua-mobile"), hv("?0"), ); h.insert( HeaderName::from_static("sec-ch-ua-platform"), hv("\"Linux\""), ); h.insert("Sec-Fetch-Dest", hv("empty")); h.insert("Sec-Fetch-Mode", hv("cors")); h.insert("Sec-Fetch-Site", hv("cross-site")); h.insert("Priority", hv("u=1, i")); h.insert("Connect-Protocol-Version", hv("1")); h }); impl Backend { /// Discover the running language server and build a BoringSSL-backed connection. pub fn new() -> Result { let inner = discover()?; // wreq with Chrome impersonation: BoringSSL + Chrome JA3/JA4 + H2 fingerprint let client = wreq::Client::builder() .emulation(wreq_util::Emulation::Chrome142) .cert_verification(false) // LS uses self-signed cert .verify_hostname(false) .build() .map_err(|e| format!("wreq client build failed: {e}"))?; Ok(Self { inner: RwLock::new(inner), client, }) } /// Create a Backend with known connection details (for standalone LS). /// /// Skips auto-discovery — the caller provides the port, CSRF, and OAuth token. pub fn new_with_config( port: u16, csrf: String, oauth_token: String, ) -> Result { let inner = BackendInner { pid: "standalone".to_string(), csrf, https_port: port.to_string(), oauth_token, }; let client = wreq::Client::builder() .emulation(wreq_util::Emulation::Chrome142) .cert_verification(false) .verify_hostname(false) .build() .map_err(|e| format!("wreq client build failed: {e}"))?; Ok(Self { inner: RwLock::new(inner), client, }) } /// Re-discover language server connection details. /// Runs blocking I/O on a spawn_blocking thread to avoid starving tokio. pub async fn refresh(&self) -> Result<(), String> { let new_inner = tokio::task::spawn_blocking(discover) .await .map_err(|e| format!("spawn_blocking failed: {e}"))??; let mut guard = self.inner.write().await; *guard = new_inner; Ok(()) } /// Get current connection info (for startup banner). pub async fn info(&self) -> (String, String, String, String) { let guard = self.inner.read().await; let token_preview = if guard.oauth_token.is_empty() { "NOT SET".to_string() } else { safe_truncate(&guard.oauth_token, 20) }; let csrf_preview = safe_truncate(&guard.csrf, 8); ( guard.pid.clone(), guard.https_port.clone(), csrf_preview, token_preview, ) } /// Get current OAuth token. /// /// Priority: token file > env var > cached value. /// Uses async I/O for file reads. Single write-lock acquisition /// eliminates the TOCTOU race of read-check-then-write. pub async fn oauth_token(&self) -> String { // Check file first (async I/O — won't block tokio) let token_path = token_file_path(); if let Ok(contents) = tokio::fs::read_to_string(&token_path).await { let token = contents.trim().to_string(); if !token.is_empty() && token.starts_with("ya29.") { // Single lock: compare-and-set atomically let mut guard = self.inner.write().await; if guard.oauth_token != token { info!("Token updated from file"); guard.oauth_token = token.clone(); } return token; } } // Then env var if let Ok(env_token) = std::env::var("ANTIGRAVITY_OAUTH_TOKEN") { if !env_token.is_empty() { let mut guard = self.inner.write().await; if guard.oauth_token != env_token { info!("Token updated from env var"); guard.oauth_token = env_token.clone(); } return env_token; } } self.inner.read().await.oauth_token.clone() } /// Fire-and-forget: update conversation annotations alongside SendUserCascadeMessage. /// /// The real webview calls this after every message to track lastUserViewTime. /// Without it, the LS sees messages without annotation updates — a fingerprint. pub async fn update_annotations(&self, cascade_id: &str) -> Result<(), String> { let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true); let body = serde_json::json!({ "cascadeId": cascade_id, "annotations": { "lastUserViewTime": now }, "mergeAnnotations": true }); match self.call_json("UpdateConversationAnnotations", &body).await { Ok((status, _)) => { debug!("UpdateConversationAnnotations: {status}"); Ok(()) } Err(e) => { warn!("UpdateConversationAnnotations failed: {e}"); Err(e) } } } /// Set OAuth token at runtime. pub async fn set_oauth_token(&self, token: String) { let mut guard = self.inner.write().await; guard.oauth_token = token; } // ─── RPC calls ────────────────────────────────────────────────────── /// Common headers: clone cached static + insert per-request CSRF. fn common_headers(csrf: &str) -> HeaderMap { let mut h = STATIC_HEADERS.clone(); if let Ok(val) = HeaderValue::from_str(csrf) { h.insert( HeaderName::from_static("x-codeium-csrf-token"), val, ); } else { warn!("CSRF token contains invalid header characters, omitting"); } h } /// Call a JSON RPC method on the language server. pub async fn call_json( &self, method: &str, body: &serde_json::Value, ) -> Result<(u16, serde_json::Value), String> { let (base, csrf) = { let guard = self.inner.read().await; ( format!("https://127.0.0.1:{}", guard.https_port), guard.csrf.clone(), ) }; let url = format!("{base}/{LS_SERVICE}/{method}"); let mut headers = Self::common_headers(&csrf); headers.insert("Content-Type", HeaderValue::from_static("application/json")); let body_bytes = serde_json::to_vec(body) .map_err(|e| format!("JSON serialize error: {e}"))?; let resp = self .client .post(&url) .headers(headers) .body(body_bytes) .send() .await .map_err(|e| format!("HTTP error: {e}"))?; let status = resp.status().as_u16(); let encoding = resp .headers() .get("content-encoding") .and_then(|v| v.to_str().ok()) .unwrap_or("") .to_string(); let raw = resp.bytes().await .map_err(|e| format!("Read body error: {e}"))?; let resp_bytes = decompress(method, &raw, &encoding); // High-frequency polling methods → trace; everything else → debug if method.starts_with("GetCascadeTrajectory") { tracing::trace!( "{method} response ({status}, {} bytes, enc={encoding})", resp_bytes.len(), ); } else { tracing::debug!( "{method} response ({status}, {} bytes, enc={encoding})", resp_bytes.len(), ); } tracing::trace!( "{method} body: {}", String::from_utf8_lossy(&resp_bytes[..resp_bytes.len().min(200)]) ); let data: serde_json::Value = match serde_json::from_slice(&resp_bytes) { Ok(v) => v, Err(e) => { tracing::warn!("{method} response is not valid JSON: {e}"); serde_json::Value::Object(serde_json::Map::new()) } }; Ok((status, data)) } /// Call a binary protobuf RPC method. pub async fn call_proto( &self, method: &str, body: Vec, ) -> Result<(u16, Vec), String> { let (base, csrf) = { let guard = self.inner.read().await; ( format!("https://127.0.0.1:{}", guard.https_port), guard.csrf.clone(), ) }; let url = format!("{base}/{LS_SERVICE}/{method}"); let mut headers = Self::common_headers(&csrf); headers.insert("Content-Type", HeaderValue::from_static("application/proto")); let resp = self .client .post(&url) .headers(headers) .body(body) .send() .await .map_err(|e| format!("HTTP error: {e}"))?; let status = resp.status().as_u16(); let encoding = resp .headers() .get("content-encoding") .and_then(|v| v.to_str().ok()) .unwrap_or("") .to_string(); let raw = resp .bytes() .await .map_err(|e| format!("Read body error: {e}"))?; let decompressed = decompress(method, &raw, &encoding); Ok((status, decompressed)) } /// StartCascade → returns cascade_id. pub async fn create_cascade(&self) -> Result { let body = serde_json::json!({"prompt": "new chat"}); let (status, data) = self.call_json("StartCascade", &body).await?; if status != 200 { return Err(format!("StartCascade failed: {status} — {data}")); } tracing::debug!("StartCascade response: {data}"); data["cascadeId"] .as_str() .map(|s| s.to_string()) .ok_or_else(|| format!("Missing cascadeId in response: {data}")) } /// SendUserCascadeMessage with binary protobuf body. #[allow(dead_code)] pub async fn send_message( &self, cascade_id: &str, text: &str, model_enum: u32, ) -> Result<(u16, Vec), String> { self.send_message_with_image(cascade_id, text, model_enum, None).await } /// SendUserCascadeMessage with optional image attachment. pub async fn send_message_with_image( &self, cascade_id: &str, text: &str, model_enum: u32, image: Option<&crate::proto::ImageData>, ) -> Result<(u16, Vec), String> { let token = self.oauth_token().await; if token.is_empty() { return Err("No OAuth token available".to_string()); } let proto = crate::proto::build_request_with_image(cascade_id, text, &token, model_enum, image); self.call_proto("SendUserCascadeMessage", proto).await } /// GetCascadeTrajectorySteps → JSON with steps array. pub async fn get_steps( &self, cascade_id: &str, ) -> Result<(u16, serde_json::Value), String> { let body = serde_json::json!({"cascadeId": cascade_id}); self.call_json("GetCascadeTrajectorySteps", &body).await } /// GetCascadeTrajectory → JSON with trajectory status. pub async fn get_trajectory( &self, cascade_id: &str, ) -> Result<(u16, serde_json::Value), String> { let body = serde_json::json!({"cascadeId": cascade_id}); self.call_json("GetCascadeTrajectory", &body).await } /// Open a server-streaming reactive updates RPC. /// `rpc_method` is the ConnectRPC method name, e.g. "StreamCascadeReactiveUpdates". async fn stream_reactive_rpc( &self, rpc_method: &str, cascade_id: &str, ) -> Result, String> { let (base, csrf) = { let guard = self.inner.read().await; ( format!("https://127.0.0.1:{}", guard.https_port), guard.csrf.clone(), ) }; let url = format!("{base}/{LS_SERVICE}/{rpc_method}"); let body = serde_json::json!({ "protocolVersion": 1, "id": cascade_id, }); let mut headers = Self::common_headers(&csrf); headers.insert("Content-Type", HeaderValue::from_static("application/connect+json")); headers.insert("Connect-Protocol-Version", HeaderValue::from_static("1")); // Connect protocol envelope: [flags:1][length:4][payload] let json_bytes = serde_json::to_vec(&body).unwrap(); let mut envelope = Vec::with_capacity(5 + json_bytes.len()); envelope.push(0x00); envelope.extend_from_slice(&(json_bytes.len() as u32).to_be_bytes()); envelope.extend_from_slice(&json_bytes); let mut resp = self .client .post(&url) .headers(headers) .body(envelope) .send() .await .map_err(|e| format!("{rpc_method} HTTP error: {e}"))?; let status = resp.status().as_u16(); if status != 200 { let err_body = resp.bytes().await.unwrap_or_default(); let err_text = String::from_utf8_lossy(&err_body); return Err(format!("{rpc_method} failed: {status} — {err_text}")); } let resp_ct = resp.headers() .get("content-type") .and_then(|v| v.to_str().ok()) .unwrap_or("unknown") .to_string(); debug!("{rpc_method}: connected for cascade {cascade_id}, content-type: {resp_ct}"); let (tx, rx) = tokio::sync::mpsc::channel::(64); let method = rpc_method.to_string(); let cid = cascade_id.to_string(); tokio::spawn(async move { let mut buf = Vec::new(); while let Ok(Some(chunk)) = resp.chunk().await { if chunk.is_empty() { continue; } buf.extend_from_slice(&chunk); while buf.len() >= 5 { let flags = buf[0]; let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize; if buf.len() < 5 + len { break; } let payload = &buf[5..5 + len]; if flags == 0x02 { let text = String::from_utf8_lossy(payload); debug!("{method}: end frame: {text}"); } else if let Ok(json) = serde_json::from_slice::(payload) { if tx.send(json).await.is_err() { buf.drain(..5 + len); break; } } buf.drain(..5 + len); } } debug!("{method}: stream ended for {cid}"); }); Ok(rx) } /// StreamCascadeReactiveUpdates — real-time cascade state diffs. pub async fn stream_cascade_updates( &self, cascade_id: &str, ) -> Result, String> { self.stream_reactive_rpc("StreamCascadeReactiveUpdates", cascade_id).await } } // ─── Discovery helpers ─────────────────────────────────────────────────────── fn discover() -> Result { // Try to find the real LS binary first (when MITM wrapper is installed, // the wrapper is a shell script named language_server_linux_x64, while // the real binary is language_server_linux_x64.real) let pid_output = Command::new("sh") .args(["-c", "pgrep -f 'language_server_linux_x64\\.real' | head -1"]) .output() .map_err(|e| format!("pgrep failed: {e}"))?; let mut pid = String::from_utf8_lossy(&pid_output.stdout) .trim() .to_string(); // Fallback: find any language_server_linux process if pid.is_empty() { let pid_output = Command::new("sh") .args(["-c", "pgrep -f language_server_linux | head -1"]) .output() .map_err(|e| format!("pgrep failed: {e}"))?; pid = String::from_utf8_lossy(&pid_output.stdout) .trim() .to_string(); } if pid.is_empty() { return Err("Language server not running".to_string()); } let cmdline = fs::read(format!("/proc/{pid}/cmdline")) .map_err(|e| format!("Can't read cmdline for PID {pid}: {e}"))?; let args: Vec<&[u8]> = cmdline.split(|&b| b == 0).collect(); let mut csrf = String::new(); for (i, arg) in args.iter().enumerate() { if let Ok(s) = std::str::from_utf8(arg) { if s == "--csrf_token" { if let Some(next) = args.get(i + 1) { if let Ok(token) = std::str::from_utf8(next) { csrf = token.to_string(); } } } } } let csrf_preview = safe_truncate(&csrf, 8); debug!("Discovered LS PID={pid}, CSRF={csrf_preview}"); let log_base = log_base(); let mut https_port = String::new(); if let Ok(mut entries) = fs::read_dir(&log_base) { let mut dirs: Vec = Vec::new(); while let Some(Ok(entry)) = entries.next() { let name = entry.file_name().to_string_lossy().to_string(); if name.starts_with("202") { dirs.push(name); } } dirs.sort_unstable_by(|a, b| b.cmp(a)); static PORT_RE: LazyLock = LazyLock::new(|| regex::Regex::new(r"port at (\d+) for HTTPS").unwrap()); for d in &dirs { let log_path = format!( "{log_base}/{d}/window1/exthost/google.antigravity/Antigravity.log" ); if let Ok(contents) = fs::read_to_string(&log_path) { for line in contents.lines() { if line.contains(&pid) && line.contains("listening") && line.contains("HTTPS") { if let Some(caps) = PORT_RE.captures(line) { https_port = caps[1].to_string(); } } } if !https_port.is_empty() { break; } } } } if https_port.is_empty() { // Fallback: find the LS HTTPS port via `ss` (when log file hasn't caught up) if let Ok(output) = std::process::Command::new("ss") .args(["-tlnp"]) .output() { let ss_out = String::from_utf8_lossy(&output.stdout); // Find listening ports for this PID — typically the first is HTTPS for line in ss_out.lines() { if line.contains(&format!("pid={pid},")) { // Extract port from "127.0.0.1:PORT" if let Some(addr) = line.split_whitespace().nth(3) { if let Some(port_str) = addr.rsplit(':').next() { if let Ok(p) = port_str.parse::() { info!(port = p, "Discovered LS HTTPS port via ss"); https_port = p.to_string(); break; } } } } } } } if https_port.is_empty() { warn!("Could not find HTTPS port in logs, defaulting to 3100"); https_port = "3100".to_string(); } let oauth_token = std::env::var("ANTIGRAVITY_OAUTH_TOKEN") .ok() .filter(|s| !s.is_empty()) .or_else(|| { let home = std::env::var("HOME").unwrap_or_default(); let path = format!("{home}/.config/antigravity-proxy-token"); fs::read_to_string(&path) .ok() .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) }) .unwrap_or_default(); Ok(BackendInner { pid, csrf, https_port, oauth_token, }) } /// Shorthand for HeaderValue (panics on invalid — only for known-safe static values). fn hv(s: &str) -> HeaderValue { HeaderValue::from_str(s).expect("invalid header value in static constant") } /// Decompress response bytes based on Content-Encoding header. fn decompress(method: &str, data: &[u8], encoding: &str) -> Vec { let mut out = Vec::new(); let res = match encoding { "gzip" => GzDecoder::new(data).read_to_end(&mut out), "deflate" => DeflateDecoder::new(data).read_to_end(&mut out), "br" => brotli::Decompressor::new(data, 4096).read_to_end(&mut out), _ => return data.to_vec(), }; match res { Ok(_) => out, Err(e) => { if !encoding.is_empty() { let preview = String::from_utf8_lossy(&data[..data.len().min(100)]); warn!("{method}: {encoding} decompress failed ({} bytes): {e}. Raw: {}", data.len(), preview); } data.to_vec() } } }