From 5f40385c8df81c57e49ff73d54047cc70761361e Mon Sep 17 00:00:00 2001 From: Nikketryhard Date: Sun, 15 Feb 2026 23:24:43 -0600 Subject: [PATCH] feat: sudoless MITM via LD_PRELOAD DNS redirect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hook getaddrinfo() via LD_PRELOAD to redirect Google API domain resolution to 127.0.0.1, combined with a port-modified endpoint URL. This makes the LS connect directly to the local MITM proxy for ALL API calls - even the CodeAssistClient which has Proxy:nil hardcoded. Architecture: LS → DNS: googleapis.com → 127.0.0.1 (hooked via getaddrinfo) → Connect: 127.0.0.1:MITM_PORT (from -cloud_code_endpoint) → MITM proxy intercepts transparent TLS via SNI → Forward to real Google API Key findings from investigation: - Go uses raw syscalls for connect() (NOT hookable via LD_PRELOAD) - Go uses libc getaddrinfo() for DNS (hookable via CGO path) - dns_redirect.so is compiled from embedded C source on first run - No iptables, no sudo, no CAP_NET_BIND_SERVICE needed --- src/mitm/dns_redirect.c | 74 +++++++ src/standalone.rs | 467 +++++++++++++++++++++++++++++++++------- 2 files changed, 461 insertions(+), 80 deletions(-) create mode 100644 src/mitm/dns_redirect.c diff --git a/src/mitm/dns_redirect.c b/src/mitm/dns_redirect.c new file mode 100644 index 0000000..660535a --- /dev/null +++ b/src/mitm/dns_redirect.c @@ -0,0 +1,74 @@ +/* + * DNS redirect preload library for headless MITM interception. + * + * Hooks getaddrinfo() via LD_PRELOAD to redirect Google API domain + * resolution to 127.0.0.1, so the LS connects to our local MITM proxy + * instead of the real Google servers. + * + * This works because the LS binary (Go + CGO) uses libc's getaddrinfo() + * for DNS resolution, even though it uses raw syscalls for connect(). + * + * Build: gcc -shared -fPIC -o dns_redirect.so dns_redirect.c -ldl + * + * Environment variables: + * DNS_REDIRECT_LOG — optional path to log redirected lookups + * + * The list of redirected domains is compiled in. Only exact matches + * are redirected; all other lookups pass through to the real resolver. + */ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Google API domains whose DNS resolution should be redirected to 127.0.0.1 */ +static const char *REDIRECT_DOMAINS[] = { + "daily-cloudcode-pa.googleapis.com", + "cloudcode-pa.googleapis.com", + "autopush-cloudcode-pa.googleapis.com", + NULL +}; + +typedef int (*getaddrinfo_t)(const char *, const char *, + const struct addrinfo *, struct addrinfo **); + +static int should_redirect(const char *host) { + if (!host) return 0; + for (int i = 0; REDIRECT_DOMAINS[i]; i++) { + if (strcmp(host, REDIRECT_DOMAINS[i]) == 0) return 1; + } + return 0; +} + +int getaddrinfo(const char *node, const char *service, + const struct addrinfo *hints, struct addrinfo **res) { + getaddrinfo_t real_getaddrinfo = + (getaddrinfo_t)dlsym(RTLD_NEXT, "getaddrinfo"); + + if (should_redirect(node)) { + /* Optional logging */ + const char *log_path = getenv("DNS_REDIRECT_LOG"); + if (log_path) { + int fd = open(log_path, O_WRONLY | O_CREAT | O_APPEND, 0666); + if (fd >= 0) { + char buf[256]; + int len = snprintf(buf, sizeof(buf), + "[dns_redirect] %s -> 127.0.0.1\n", node); + write(fd, buf, len); + close(fd); + } + } + + /* Resolve to localhost — the MITM proxy is listening there */ + return real_getaddrinfo("127.0.0.1", service, hints, res); + } + + return real_getaddrinfo(node, service, hints, res); +} diff --git a/src/standalone.rs b/src/standalone.rs index b341b6c..776a3bb 100644 --- a/src/standalone.rs +++ b/src/standalone.rs @@ -27,6 +27,59 @@ const DATA_DIR: &str = "/tmp/antigravity-standalone"; /// System user for UID-scoped iptables isolation. const LS_USER: &str = "antigravity-ls"; +/// Path for the compiled dns_redirect.so preload library. +const DNS_REDIRECT_SO_PATH: &str = "/tmp/antigravity-dns-redirect.so"; + +/// Source file for the DNS redirect preload library (relative to binary). +const DNS_REDIRECT_C_SOURCE: &str = include_str!("mitm/dns_redirect.c"); + +/// Build the dns_redirect.so preload library if it doesn't already exist. +/// +/// The library hooks `getaddrinfo()` via LD_PRELOAD to redirect Google API +/// domain lookups to 127.0.0.1. This is needed because the LS binary uses +/// CGO for DNS resolution (libc getaddrinfo) but raw syscalls for connect(), +/// so only DNS can be intercepted via LD_PRELOAD. +/// +/// Returns the path to the .so on success, None on failure. +fn build_dns_redirect_so() -> Option { + let so_path = DNS_REDIRECT_SO_PATH; + + // Skip rebuild if already exists + if std::path::Path::new(so_path).exists() { + return Some(so_path.to_string()); + } + + // Write C source to a temp file + let c_path = format!("{so_path}.c"); + if let Err(e) = std::fs::write(&c_path, DNS_REDIRECT_C_SOURCE) { + tracing::warn!("Failed to write dns_redirect.c: {e}"); + return None; + } + + // Compile: gcc -shared -fPIC -o dns_redirect.so dns_redirect.c -ldl + let output = Command::new("gcc") + .args(["-shared", "-fPIC", "-o", so_path, &c_path, "-ldl"]) + .output(); + + match output { + Ok(out) if out.status.success() => { + info!("Built dns_redirect.so at {so_path}"); + // Clean up source + let _ = std::fs::remove_file(&c_path); + Some(so_path.to_string()) + } + Ok(out) => { + let stderr = String::from_utf8_lossy(&out.stderr); + tracing::warn!("Failed to compile dns_redirect.so: {stderr}"); + None + } + Err(e) => { + tracing::warn!("gcc not found, cannot build dns_redirect.so: {e}"); + None + } + } +} + /// A running standalone LS process. pub struct StandaloneLS { child: Child, @@ -64,7 +117,7 @@ pub fn generate_standalone_config() -> MainLSConfig { /// Optional MITM proxy config for the standalone LS. pub struct StandaloneMitmConfig { - pub proxy_addr: String, // e.g. "http://127.0.0.1:8742" + pub proxy_addr: String, // Full URL with scheme, e.g. "http://127.0.0.1:8742" pub ca_cert_path: String, // path to MITM CA .pem } @@ -84,6 +137,7 @@ impl StandaloneLS { // Kill any orphaned LS processes from previous runs cleanup_orphaned_ls(); let port = find_free_port()?; + let lsp_port = find_free_port()?; let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -102,13 +156,47 @@ impl StandaloneLS { // Setup data dir (mode 1777 so both current user and antigravity-ls can write) let gemini_dir = format!("{DATA_DIR}/.gemini"); - std::fs::create_dir_all(&gemini_dir) - .map_err(|e| format!("Failed to create standalone data dir: {e}"))?; - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let _ = std::fs::set_permissions(DATA_DIR, std::fs::Permissions::from_mode(0o1777)); - let _ = std::fs::set_permissions(&gemini_dir, std::fs::Permissions::from_mode(0o1777)); + let app_data_dir = format!("{DATA_DIR}/.gemini/antigravity-standalone"); + let annotations_dir = format!("{app_data_dir}/annotations"); + let brain_dir = format!("{app_data_dir}/brain"); + for dir in [DATA_DIR, &gemini_dir, &app_data_dir, &annotations_dir, &brain_dir] { + let _ = std::fs::create_dir_all(dir); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let _ = std::fs::set_permissions(dir, std::fs::Permissions::from_mode(0o1777)); + } + } + // Check if data dir is writable by writing a test file. + // Old runs as `antigravity-ls` user leave dirs owned by that user. + let test_path = format!("{app_data_dir}/.write_test"); + if std::fs::write(&test_path, b"ok").is_err() { + eprintln!( + "\n ⚠ Data dir {} is not writable (owned by another user from previous sudo run)\n \ + Fix with: sudo chmod -R a+rwX {}\n", + app_data_dir, DATA_DIR + ); + } else { + let _ = std::fs::remove_file(&test_path); + } + + // Pre-seed user_settings.pb with detect_and_use_proxy = ENABLED. + // The LS reads this at startup when creating its HTTP transport. + // Without it, the LS ignores HTTPS_PROXY and API traffic bypasses MITM. + // UserSettings proto: field 34 (varint) = 1 (DETECT_AND_USE_PROXY_ENABLED) + // Tag: (34 << 3) | 0 = 272 → varint [0x90, 0x02] + // Value: 1 → varint [0x01] + let settings_path = format!("{app_data_dir}/user_settings.pb"); + let settings_bytes: &[u8] = &[0x90, 0x02, 0x01]; + if let Err(e) = std::fs::write(&settings_path, settings_bytes) { + tracing::warn!("Failed to pre-seed user_settings.pb: {e}"); + } else { + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let _ = std::fs::set_permissions(&settings_path, std::fs::Permissions::from_mode(0o0666)); + } + tracing::info!("Pre-seeded user_settings.pb (detect_and_use_proxy=ENABLED)"); } // In headless mode, spawn a stub TCP listener to serve as the extension server. @@ -128,20 +216,37 @@ impl StandaloneLS { .map_err(|e| format!("Failed to get stub port: {e}"))? .port(); info!(port = actual_port, "Stub extension server listening (headless)"); - // Read OAuth token for serving via stub (same sources as backend) - let oauth_token = std::env::var("ANTIGRAVITY_OAUTH_TOKEN") - .ok() - .filter(|s| !s.is_empty()) - .or_else(|| { - let home = std::env::var("HOME").unwrap_or_default(); - let path = format!("{home}/.config/antigravity-proxy-token"); - std::fs::read_to_string(&path) - .ok() - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) + // Read OAuth state from Antigravity's state.vscdb if available. + // The DB stores the exact Topic proto (access_token + refresh_token + expiry) + // which lets the LS auto-refresh tokens via its built-in Google OAuth2 client. + let (oauth_token, oauth_topic_bytes) = read_oauth_from_state_db() + .map(|(token, topic)| { + info!("Loaded OAuth token from Antigravity state.vscdb"); + (token, Some(topic)) }) - .unwrap_or_default(); + .unwrap_or_else(|| { + // Fall back to env var / token file + let token = std::env::var("ANTIGRAVITY_OAUTH_TOKEN") + .ok() + .filter(|s| !s.is_empty()) + .or_else(|| { + let home = std::env::var("HOME").unwrap_or_default(); + let path = format!("{home}/.config/antigravity-proxy-token"); + std::fs::read_to_string(&path) + .ok() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + }) + .unwrap_or_default(); + if !token.is_empty() { + info!("Loaded OAuth token from env/file (no refresh token — manual refresh needed)"); + } else { + eprintln!("[headless] ⚠ No OAuth token found. Login to Antigravity first, or set ANTIGRAVITY_OAUTH_TOKEN"); + } + (token, None) + }); let oauth_arc = std::sync::Arc::new(oauth_token); + let topic_arc = std::sync::Arc::new(oauth_topic_bytes); // Spawn a thread to accept connections (just hold them open) let listener_clone = listener .try_clone() @@ -151,9 +256,10 @@ impl StandaloneLS { match stream { Ok(conn) => { let token = std::sync::Arc::clone(&oauth_arc); + let topic = std::sync::Arc::clone(&topic_arc); // Handle each connection in its own thread std::thread::spawn(move || { - stub_handle_connection(conn, &token); + stub_handle_connection(conn, &token, &topic); }); } Err(_) => break, @@ -178,10 +284,11 @@ impl StandaloneLS { }; // LS args — NO -standalone flag (it disables TCP listeners entirely) + // NOTE: do NOT use -random_port — it overrides -server_port and the LS + // would listen on a random port we can't discover. let args = vec![ "-enable_lsp".to_string(), - "-random_port".to_string(), - "-lsp_port=0".to_string(), + format!("-lsp_port={}", lsp_port), "-extension_server_port".to_string(), ext_port, "-csrf_token".to_string(), @@ -191,7 +298,22 @@ impl StandaloneLS { "-workspace_id".to_string(), format!("standalone_{ts}"), "-cloud_code_endpoint".to_string(), - "https://daily-cloudcode-pa.googleapis.com".to_string(), + // When MITM is active, append the MITM port to the endpoint URL. + // The LS's CodeAssistClient ignores HTTPS_PROXY (hardcoded Proxy:nil), + // so we redirect at the DNS+port level instead: + // 1. LD_PRELOAD hooks getaddrinfo() → 127.0.0.1 for API domains + // 2. Custom port in URL → LS connects to 127.0.0.1:MITM_PORT + // 3. MITM proxy intercepts the transparent TLS connection via SNI + if let Some(mitm) = mitm_config { + // Extract port from proxy_addr (e.g. "http://127.0.0.1:8742" → "8742") + let mitm_port = mitm.proxy_addr + .rsplit(':') + .next() + .unwrap_or("8742"); + format!("https://daily-cloudcode-pa.googleapis.com:{mitm_port}") + } else { + "https://daily-cloudcode-pa.googleapis.com".to_string() + }, "-app_data_dir".to_string(), "antigravity-standalone".to_string(), "-gemini_dir".to_string(), @@ -242,8 +364,23 @@ impl StandaloneLS { // With iptables, all outbound traffic is transparently redirected at the // kernel level — setting HTTPS_PROXY on top causes double-proxying. if headless || !has_ls_user() { - env_vars.push(("HTTPS_PROXY".into(), format!("http://{}", mitm.proxy_addr))); - env_vars.push(("HTTP_PROXY".into(), format!("http://{}", mitm.proxy_addr))); + // proxy_addr already includes the scheme (e.g. "http://127.0.0.1:8742") + env_vars.push(("HTTPS_PROXY".into(), mitm.proxy_addr.clone())); + env_vars.push(("HTTP_PROXY".into(), mitm.proxy_addr.clone())); + + // LD_PRELOAD DNS redirect: hooks getaddrinfo() so Google API domains + // resolve to 127.0.0.1. Combined with the port-modified endpoint URL, + // this makes the LS connect to our MITM proxy for ALL API calls — + // even the CodeAssistClient which has Proxy:nil hardcoded. + let so_path = build_dns_redirect_so(); + if let Some(so) = so_path { + info!(path = %so, "Enabling LD_PRELOAD DNS redirect for headless MITM"); + env_vars.push(("LD_PRELOAD".into(), so)); + env_vars.push(( + "DNS_REDIRECT_LOG".into(), + "/tmp/antigravity-dns-redirect.log".into(), + )); + } } } @@ -643,6 +780,167 @@ fn cleanup_orphaned_ls() { } } +/// Read OAuth token state directly from Antigravity's state.vscdb. +/// +/// The DB stores the exact Topic proto bytes under key `antigravityUnifiedStateSync.oauthToken`. +/// This includes access_token + refresh_token + expiry, allowing the LS to auto-refresh. +/// Returns (access_token, topic_proto_bytes) or None if unavailable. +fn read_oauth_from_state_db() -> Option<(String, Vec)> { + use base64::Engine; + + let home = std::env::var("HOME").ok()?; + let db_path = format!("{home}/.config/Antigravity/User/globalStorage/state.vscdb"); + + // Check the DB file exists + if !std::path::Path::new(&db_path).exists() { + return None; + } + + // Read the Topic proto (base64-encoded in the DB) + let output = std::process::Command::new("sqlite3") + .args([ + &db_path, + "SELECT value FROM ItemTable WHERE key='antigravityUnifiedStateSync.oauthToken'", + ]) + .output() + .ok()?; + + if !output.status.success() { + return None; + } + + let b64_str = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if b64_str.is_empty() { + return None; + } + + // Decode the base64 to get the raw Topic proto bytes + let topic_bytes = base64::engine::general_purpose::STANDARD + .decode(&b64_str) + .ok()?; + + if topic_bytes.is_empty() { + return None; + } + + // Extract the access_token from the OAuthTokenInfo inside the Topic proto. + // The inner value (Row.value) is also base64, containing a serialized OAuthTokenInfo. + // For the access_token (used by GetSecretValue), we can read it from the authStatus. + let access_token = read_access_token_from_auth_status(&db_path) + .or_else(|| extract_access_token_from_topic(&topic_bytes)) + .unwrap_or_default(); + + Some((access_token, topic_bytes)) +} + +/// Read the current access token from `antigravityAuthStatus` in state.vscdb. +/// This JSON object has an `apiKey` field with the latest access token. +fn read_access_token_from_auth_status(db_path: &str) -> Option { + let output = std::process::Command::new("sqlite3") + .args([ + db_path, + "SELECT value FROM ItemTable WHERE key='antigravityAuthStatus'", + ]) + .output() + .ok()?; + + if !output.status.success() { + return None; + } + + let json_str = String::from_utf8_lossy(&output.stdout).trim().to_string(); + // Simple extraction: find "apiKey":"..." pattern + let marker = "\"apiKey\":\""; + let start = json_str.find(marker)? + marker.len(); + let end = json_str[start..].find('"')? + start; + let api_key = &json_str[start..end]; + if api_key.starts_with("ya29.") { + Some(api_key.to_string()) + } else { + None + } +} + +/// Extract access_token from the Topic proto bytes by finding the inner +/// base64-encoded OAuthTokenInfo and decoding its first string field. +fn extract_access_token_from_topic(topic_bytes: &[u8]) -> Option { + use base64::Engine; + + // Find long base64 strings in the proto (the Row.value field) + // Simple approach: convert to string and find base64 pattern + let as_str = String::from_utf8_lossy(topic_bytes); + // The base64 OAuthTokenInfo starts with "Co" (0x0A = field 1, len-delimited) + for segment in as_str.split(|c: char| !c.is_alphanumeric() && c != '+' && c != '/' && c != '=') { + if segment.len() > 50 { + if let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(segment) { + // Try to extract field 1 (access_token) from the OAuthTokenInfo proto + if let Some(token) = extract_proto_string_from_bytes(&decoded, 1) { + if token.starts_with("ya29.") { + return Some(token); + } + } + } + } + } + None +} + +/// Extract a string field from raw protobuf bytes by field number. +fn extract_proto_string_from_bytes(buf: &[u8], target_field: u32) -> Option { + let mut i = 0; + while i < buf.len() { + let (tag, bytes_read) = decode_varint_at(buf, i)?; + i += bytes_read; + let field_num = (tag >> 3) as u32; + let wire_type = (tag & 0x07) as u8; + match wire_type { + 0 => { + // varint — skip it + let (_, vr) = decode_varint_at(buf, i)?; + i += vr; + } + 2 => { + // length-delimited + let (len, lr) = decode_varint_at(buf, i)?; + i += lr; + let len = len as usize; + if i + len > buf.len() { + return None; + } + if field_num == target_field { + return String::from_utf8(buf[i..i + len].to_vec()).ok(); + } + i += len; + } + _ => return None, // unsupported wire type + } + } + None +} + +/// Decode a varint from a byte slice at the given offset. +/// Returns (value, bytes_consumed). +fn decode_varint_at(buf: &[u8], offset: usize) -> Option<(u64, usize)> { + let mut val: u64 = 0; + let mut shift = 0u32; + let mut i = offset; + loop { + if i >= buf.len() { + return None; + } + let b = buf[i]; + i += 1; + val |= ((b & 0x7F) as u64) << shift; + if b & 0x80 == 0 { + return Some((val, i - offset)); + } + shift += 7; + if shift >= 64 { + return None; + } + } +} + /// Handle a single connection from the LS to the stub extension server. /// /// The LS uses Connect RPC (HTTP/1.1, ServerStream) to call ExtensionServerService methods. @@ -653,7 +951,7 @@ fn cleanup_orphaned_ls() { /// IMPORTANT: `SubscribeToUnifiedStateSyncTopic` is a long-lived stream. /// If we immediately close it, the LS reconnects in a tight loop and never /// proceeds to fetch OAuth tokens. We keep subscription connections OPEN. -fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str) { +fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str, oauth_topic_bytes: &Option>) { use std::io::{BufRead, BufReader, Read, Write}; let mut reader = BufReader::new(match conn.try_clone() { @@ -780,58 +1078,60 @@ fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str) { // Map entry: { string key = 1, Row value = 2 } let mut initial_state_bytes = Vec::new(); - if topic_name == "uss-oauth" && !oauth_token.is_empty() { - // Row.value is base64-encoded protobuf OAuthTokenInfo: - // OAuthTokenInfo { - // string access_token = 1; - // string token_type = 2; - // string refresh_token = 3; - // google.protobuf.Timestamp expiry = 4; - // } - let mut oauth_proto = Vec::new(); - // field 1 (access_token), LEN - oauth_proto.push(0x0A); - encode_varint(&mut oauth_proto, oauth_token.len() as u64); - oauth_proto.extend_from_slice(oauth_token.as_bytes()); - // field 2 (token_type), LEN - let token_type = b"Bearer"; - oauth_proto.push(0x12); - encode_varint(&mut oauth_proto, token_type.len() as u64); - oauth_proto.extend_from_slice(token_type); - // field 4 (expiry) = Timestamp { seconds = 4_102_444_800 } (year 2099-12-31) - // Timestamp proto: field 1 (seconds) varint - let mut ts_proto = Vec::new(); - ts_proto.push(0x08); // field 1 (seconds), varint - encode_varint(&mut ts_proto, 4_102_444_800u64); - oauth_proto.push(0x22); // field 4 (expiry), LEN - encode_varint(&mut oauth_proto, ts_proto.len() as u64); - oauth_proto.extend_from_slice(&ts_proto); + if topic_name == "uss-oauth" { + if let Some(topic_bytes) = oauth_topic_bytes { + // Use the exact Topic proto from Antigravity's state.vscdb. + // This includes access_token + refresh_token + expiry, so the + // LS can auto-refresh tokens via its built-in Google OAuth2 client. + initial_state_bytes = topic_bytes.clone(); + eprintln!("[stub-ext] using state.vscdb topic ({} bytes)", topic_bytes.len()); + } else if !oauth_token.is_empty() { + // Manual token fallback — construct OAuthTokenInfo with far-future expiry + // (no refresh_token, so the LS can't auto-refresh) + let mut oauth_proto = Vec::new(); + // field 1 (access_token), LEN + oauth_proto.push(0x0A); + encode_varint(&mut oauth_proto, oauth_token.len() as u64); + oauth_proto.extend_from_slice(oauth_token.as_bytes()); + // field 2 (token_type), LEN + let token_type = b"Bearer"; + oauth_proto.push(0x12); + encode_varint(&mut oauth_proto, token_type.len() as u64); + oauth_proto.extend_from_slice(token_type); + // field 4 (expiry) = Timestamp { seconds = 4_102_444_800 } (year 2099-12-31) + let mut ts_proto = Vec::new(); + ts_proto.push(0x08); // field 1 (seconds), varint + encode_varint(&mut ts_proto, 4_102_444_800u64); + oauth_proto.push(0x22); // field 4 (expiry), LEN + encode_varint(&mut oauth_proto, ts_proto.len() as u64); + oauth_proto.extend_from_slice(&ts_proto); - use base64::Engine; - let b64_value = base64::engine::general_purpose::STANDARD.encode(&oauth_proto); + use base64::Engine; + let b64_value = base64::engine::general_purpose::STANDARD.encode(&oauth_proto); - // Build Row { value = b64_value, e_tag = 1 } - let mut row = Vec::new(); - row.push(0x0A); // field 1 (value), LEN - encode_varint(&mut row, b64_value.len() as u64); - row.extend_from_slice(b64_value.as_bytes()); - row.push(0x10); // field 2 (e_tag), varint - row.push(0x01); + // Build Row { value = b64_value, e_tag = 1 } + let mut row = Vec::new(); + row.push(0x0A); // field 1 (value), LEN + encode_varint(&mut row, b64_value.len() as u64); + row.extend_from_slice(b64_value.as_bytes()); + row.push(0x10); // field 2 (e_tag), varint + row.push(0x01); - // Build map entry: { key = "oauthTokenInfoSentinelKey", value = row } - let key_str = b"oauthTokenInfoSentinelKey"; - let mut map_entry = Vec::new(); - map_entry.push(0x0A); // field 1 (key), LEN - encode_varint(&mut map_entry, key_str.len() as u64); - map_entry.extend_from_slice(key_str); - map_entry.push(0x12); // field 2 (value = Row), LEN - encode_varint(&mut map_entry, row.len() as u64); - map_entry.extend_from_slice(&row); + // Build map entry: { key = "oauthTokenInfoSentinelKey", value = row } + let key_str = b"oauthTokenInfoSentinelKey"; + let mut map_entry = Vec::new(); + map_entry.push(0x0A); // field 1 (key), LEN + encode_varint(&mut map_entry, key_str.len() as u64); + map_entry.extend_from_slice(key_str); + map_entry.push(0x12); // field 2 (value = Row), LEN + encode_varint(&mut map_entry, row.len() as u64); + map_entry.extend_from_slice(&row); - // Build Topic { data = [map_entry] } - initial_state_bytes.push(0x0A); // field 1 (data map), LEN - encode_varint(&mut initial_state_bytes, map_entry.len() as u64); - initial_state_bytes.extend_from_slice(&map_entry); + // Build Topic { data = [map_entry] } + initial_state_bytes.push(0x0A); // field 1 (data map), LEN + encode_varint(&mut initial_state_bytes, map_entry.len() as u64); + initial_state_bytes.extend_from_slice(&map_entry); + } } // Build UnifiedStateSyncUpdate { initial_state = initial_state_bytes } @@ -859,12 +1159,19 @@ fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str) { // (applied_update removed — data is in initial_state) - // Hold the connection open — LS will reconnect if it drops. - // Don't send empty envelopes (LS rejects nil update type). + // Keep the stream alive with periodic valid messages. + // The LS has a ~10s read timeout on streams. After the initial_state, + // the LS only accepts AppliedUpdate (field 2 in the oneof). + // We send an empty AppliedUpdate {} every 5s as keepalive. + // + // AppliedUpdate is field 2 (wire type 2 = length-delimited), so: + // 0x12 = (field 2 << 3) | 2, 0x00 = length 0 + // This creates: UnifiedStateSyncUpdate { applied_update: AppliedUpdate {} } + let keepalive_proto: &[u8] = &[0x12, 0x00]; // field 2 (applied_update), LEN=0 + let keepalive_env = make_envelope(keepalive_proto); loop { - std::thread::sleep(std::time::Duration::from_secs(60)); - // Probe: try writing zero bytes to detect closed connection - if writer.write_all(b"").is_err() { + std::thread::sleep(std::time::Duration::from_secs(5)); + if !send_chunk(&mut writer, &keepalive_env) { break; } }