feat: sudoless MITM via LD_PRELOAD DNS redirect

Hook getaddrinfo() via LD_PRELOAD to redirect Google API domain
resolution to 127.0.0.1, combined with a port-modified endpoint URL.
This makes the LS connect directly to the local MITM proxy for ALL
API calls - even the CodeAssistClient which has Proxy:nil hardcoded.

Architecture:
  LS → DNS: googleapis.com → 127.0.0.1 (hooked via getaddrinfo)
     → Connect: 127.0.0.1:MITM_PORT (from -cloud_code_endpoint)
     → MITM proxy intercepts transparent TLS via SNI
     → Forward to real Google API

Key findings from investigation:
- Go uses raw syscalls for connect() (NOT hookable via LD_PRELOAD)
- Go uses libc getaddrinfo() for DNS (hookable via CGO path)
- dns_redirect.so is compiled from embedded C source on first run
- No iptables, no sudo, no CAP_NET_BIND_SERVICE needed
This commit is contained in:
Nikketryhard
2026-02-15 23:24:43 -06:00
parent 6a07786c4e
commit 5f40385c8d
2 changed files with 461 additions and 80 deletions

74
src/mitm/dns_redirect.c Normal file
View File

@@ -0,0 +1,74 @@
/*
* DNS redirect preload library for headless MITM interception.
*
* Hooks getaddrinfo() via LD_PRELOAD to redirect Google API domain
* resolution to 127.0.0.1, so the LS connects to our local MITM proxy
* instead of the real Google servers.
*
* This works because the LS binary (Go + CGO) uses libc's getaddrinfo()
* for DNS resolution, even though it uses raw syscalls for connect().
*
* Build: gcc -shared -fPIC -o dns_redirect.so dns_redirect.c -ldl
*
* Environment variables:
* DNS_REDIRECT_LOG — optional path to log redirected lookups
*
* The list of redirected domains is compiled in. Only exact matches
* are redirected; all other lookups pass through to the real resolver.
*/
#define _GNU_SOURCE
#include <dlfcn.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
/* Google API domains whose DNS resolution should be redirected to 127.0.0.1 */
static const char *REDIRECT_DOMAINS[] = {
"daily-cloudcode-pa.googleapis.com",
"cloudcode-pa.googleapis.com",
"autopush-cloudcode-pa.googleapis.com",
NULL
};
typedef int (*getaddrinfo_t)(const char *, const char *,
const struct addrinfo *, struct addrinfo **);
static int should_redirect(const char *host) {
if (!host) return 0;
for (int i = 0; REDIRECT_DOMAINS[i]; i++) {
if (strcmp(host, REDIRECT_DOMAINS[i]) == 0) return 1;
}
return 0;
}
int getaddrinfo(const char *node, const char *service,
const struct addrinfo *hints, struct addrinfo **res) {
getaddrinfo_t real_getaddrinfo =
(getaddrinfo_t)dlsym(RTLD_NEXT, "getaddrinfo");
if (should_redirect(node)) {
/* Optional logging */
const char *log_path = getenv("DNS_REDIRECT_LOG");
if (log_path) {
int fd = open(log_path, O_WRONLY | O_CREAT | O_APPEND, 0666);
if (fd >= 0) {
char buf[256];
int len = snprintf(buf, sizeof(buf),
"[dns_redirect] %s -> 127.0.0.1\n", node);
write(fd, buf, len);
close(fd);
}
}
/* Resolve to localhost — the MITM proxy is listening there */
return real_getaddrinfo("127.0.0.1", service, hints, res);
}
return real_getaddrinfo(node, service, hints, res);
}

View File

@@ -27,6 +27,59 @@ const DATA_DIR: &str = "/tmp/antigravity-standalone";
/// System user for UID-scoped iptables isolation. /// System user for UID-scoped iptables isolation.
const LS_USER: &str = "antigravity-ls"; const LS_USER: &str = "antigravity-ls";
/// Path for the compiled dns_redirect.so preload library.
const DNS_REDIRECT_SO_PATH: &str = "/tmp/antigravity-dns-redirect.so";
/// Source file for the DNS redirect preload library (relative to binary).
const DNS_REDIRECT_C_SOURCE: &str = include_str!("mitm/dns_redirect.c");
/// Build the dns_redirect.so preload library if it doesn't already exist.
///
/// The library hooks `getaddrinfo()` via LD_PRELOAD to redirect Google API
/// domain lookups to 127.0.0.1. This is needed because the LS binary uses
/// CGO for DNS resolution (libc getaddrinfo) but raw syscalls for connect(),
/// so only DNS can be intercepted via LD_PRELOAD.
///
/// Returns the path to the .so on success, None on failure.
fn build_dns_redirect_so() -> Option<String> {
let so_path = DNS_REDIRECT_SO_PATH;
// Skip rebuild if already exists
if std::path::Path::new(so_path).exists() {
return Some(so_path.to_string());
}
// Write C source to a temp file
let c_path = format!("{so_path}.c");
if let Err(e) = std::fs::write(&c_path, DNS_REDIRECT_C_SOURCE) {
tracing::warn!("Failed to write dns_redirect.c: {e}");
return None;
}
// Compile: gcc -shared -fPIC -o dns_redirect.so dns_redirect.c -ldl
let output = Command::new("gcc")
.args(["-shared", "-fPIC", "-o", so_path, &c_path, "-ldl"])
.output();
match output {
Ok(out) if out.status.success() => {
info!("Built dns_redirect.so at {so_path}");
// Clean up source
let _ = std::fs::remove_file(&c_path);
Some(so_path.to_string())
}
Ok(out) => {
let stderr = String::from_utf8_lossy(&out.stderr);
tracing::warn!("Failed to compile dns_redirect.so: {stderr}");
None
}
Err(e) => {
tracing::warn!("gcc not found, cannot build dns_redirect.so: {e}");
None
}
}
}
/// A running standalone LS process. /// A running standalone LS process.
pub struct StandaloneLS { pub struct StandaloneLS {
child: Child, child: Child,
@@ -64,7 +117,7 @@ pub fn generate_standalone_config() -> MainLSConfig {
/// Optional MITM proxy config for the standalone LS. /// Optional MITM proxy config for the standalone LS.
pub struct StandaloneMitmConfig { pub struct StandaloneMitmConfig {
pub proxy_addr: String, // e.g. "http://127.0.0.1:8742" pub proxy_addr: String, // Full URL with scheme, e.g. "http://127.0.0.1:8742"
pub ca_cert_path: String, // path to MITM CA .pem pub ca_cert_path: String, // path to MITM CA .pem
} }
@@ -84,6 +137,7 @@ impl StandaloneLS {
// Kill any orphaned LS processes from previous runs // Kill any orphaned LS processes from previous runs
cleanup_orphaned_ls(); cleanup_orphaned_ls();
let port = find_free_port()?; let port = find_free_port()?;
let lsp_port = find_free_port()?;
let ts = std::time::SystemTime::now() let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@@ -102,13 +156,47 @@ impl StandaloneLS {
// Setup data dir (mode 1777 so both current user and antigravity-ls can write) // Setup data dir (mode 1777 so both current user and antigravity-ls can write)
let gemini_dir = format!("{DATA_DIR}/.gemini"); let gemini_dir = format!("{DATA_DIR}/.gemini");
std::fs::create_dir_all(&gemini_dir) let app_data_dir = format!("{DATA_DIR}/.gemini/antigravity-standalone");
.map_err(|e| format!("Failed to create standalone data dir: {e}"))?; let annotations_dir = format!("{app_data_dir}/annotations");
#[cfg(unix)] let brain_dir = format!("{app_data_dir}/brain");
{ for dir in [DATA_DIR, &gemini_dir, &app_data_dir, &annotations_dir, &brain_dir] {
use std::os::unix::fs::PermissionsExt; let _ = std::fs::create_dir_all(dir);
let _ = std::fs::set_permissions(DATA_DIR, std::fs::Permissions::from_mode(0o1777)); #[cfg(unix)]
let _ = std::fs::set_permissions(&gemini_dir, std::fs::Permissions::from_mode(0o1777)); {
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(dir, std::fs::Permissions::from_mode(0o1777));
}
}
// Check if data dir is writable by writing a test file.
// Old runs as `antigravity-ls` user leave dirs owned by that user.
let test_path = format!("{app_data_dir}/.write_test");
if std::fs::write(&test_path, b"ok").is_err() {
eprintln!(
"\n ⚠ Data dir {} is not writable (owned by another user from previous sudo run)\n \
Fix with: sudo chmod -R a+rwX {}\n",
app_data_dir, DATA_DIR
);
} else {
let _ = std::fs::remove_file(&test_path);
}
// Pre-seed user_settings.pb with detect_and_use_proxy = ENABLED.
// The LS reads this at startup when creating its HTTP transport.
// Without it, the LS ignores HTTPS_PROXY and API traffic bypasses MITM.
// UserSettings proto: field 34 (varint) = 1 (DETECT_AND_USE_PROXY_ENABLED)
// Tag: (34 << 3) | 0 = 272 → varint [0x90, 0x02]
// Value: 1 → varint [0x01]
let settings_path = format!("{app_data_dir}/user_settings.pb");
let settings_bytes: &[u8] = &[0x90, 0x02, 0x01];
if let Err(e) = std::fs::write(&settings_path, settings_bytes) {
tracing::warn!("Failed to pre-seed user_settings.pb: {e}");
} else {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&settings_path, std::fs::Permissions::from_mode(0o0666));
}
tracing::info!("Pre-seeded user_settings.pb (detect_and_use_proxy=ENABLED)");
} }
// In headless mode, spawn a stub TCP listener to serve as the extension server. // In headless mode, spawn a stub TCP listener to serve as the extension server.
@@ -128,20 +216,37 @@ impl StandaloneLS {
.map_err(|e| format!("Failed to get stub port: {e}"))? .map_err(|e| format!("Failed to get stub port: {e}"))?
.port(); .port();
info!(port = actual_port, "Stub extension server listening (headless)"); info!(port = actual_port, "Stub extension server listening (headless)");
// Read OAuth token for serving via stub (same sources as backend) // Read OAuth state from Antigravity's state.vscdb if available.
let oauth_token = std::env::var("ANTIGRAVITY_OAUTH_TOKEN") // The DB stores the exact Topic proto (access_token + refresh_token + expiry)
.ok() // which lets the LS auto-refresh tokens via its built-in Google OAuth2 client.
.filter(|s| !s.is_empty()) let (oauth_token, oauth_topic_bytes) = read_oauth_from_state_db()
.or_else(|| { .map(|(token, topic)| {
let home = std::env::var("HOME").unwrap_or_default(); info!("Loaded OAuth token from Antigravity state.vscdb");
let path = format!("{home}/.config/antigravity-proxy-token"); (token, Some(topic))
std::fs::read_to_string(&path)
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
}) })
.unwrap_or_default(); .unwrap_or_else(|| {
// Fall back to env var / token file
let token = std::env::var("ANTIGRAVITY_OAUTH_TOKEN")
.ok()
.filter(|s| !s.is_empty())
.or_else(|| {
let home = std::env::var("HOME").unwrap_or_default();
let path = format!("{home}/.config/antigravity-proxy-token");
std::fs::read_to_string(&path)
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
})
.unwrap_or_default();
if !token.is_empty() {
info!("Loaded OAuth token from env/file (no refresh token — manual refresh needed)");
} else {
eprintln!("[headless] ⚠ No OAuth token found. Login to Antigravity first, or set ANTIGRAVITY_OAUTH_TOKEN");
}
(token, None)
});
let oauth_arc = std::sync::Arc::new(oauth_token); let oauth_arc = std::sync::Arc::new(oauth_token);
let topic_arc = std::sync::Arc::new(oauth_topic_bytes);
// Spawn a thread to accept connections (just hold them open) // Spawn a thread to accept connections (just hold them open)
let listener_clone = listener let listener_clone = listener
.try_clone() .try_clone()
@@ -151,9 +256,10 @@ impl StandaloneLS {
match stream { match stream {
Ok(conn) => { Ok(conn) => {
let token = std::sync::Arc::clone(&oauth_arc); let token = std::sync::Arc::clone(&oauth_arc);
let topic = std::sync::Arc::clone(&topic_arc);
// Handle each connection in its own thread // Handle each connection in its own thread
std::thread::spawn(move || { std::thread::spawn(move || {
stub_handle_connection(conn, &token); stub_handle_connection(conn, &token, &topic);
}); });
} }
Err(_) => break, Err(_) => break,
@@ -178,10 +284,11 @@ impl StandaloneLS {
}; };
// LS args — NO -standalone flag (it disables TCP listeners entirely) // LS args — NO -standalone flag (it disables TCP listeners entirely)
// NOTE: do NOT use -random_port — it overrides -server_port and the LS
// would listen on a random port we can't discover.
let args = vec![ let args = vec![
"-enable_lsp".to_string(), "-enable_lsp".to_string(),
"-random_port".to_string(), format!("-lsp_port={}", lsp_port),
"-lsp_port=0".to_string(),
"-extension_server_port".to_string(), "-extension_server_port".to_string(),
ext_port, ext_port,
"-csrf_token".to_string(), "-csrf_token".to_string(),
@@ -191,7 +298,22 @@ impl StandaloneLS {
"-workspace_id".to_string(), "-workspace_id".to_string(),
format!("standalone_{ts}"), format!("standalone_{ts}"),
"-cloud_code_endpoint".to_string(), "-cloud_code_endpoint".to_string(),
"https://daily-cloudcode-pa.googleapis.com".to_string(), // When MITM is active, append the MITM port to the endpoint URL.
// The LS's CodeAssistClient ignores HTTPS_PROXY (hardcoded Proxy:nil),
// so we redirect at the DNS+port level instead:
// 1. LD_PRELOAD hooks getaddrinfo() → 127.0.0.1 for API domains
// 2. Custom port in URL → LS connects to 127.0.0.1:MITM_PORT
// 3. MITM proxy intercepts the transparent TLS connection via SNI
if let Some(mitm) = mitm_config {
// Extract port from proxy_addr (e.g. "http://127.0.0.1:8742" → "8742")
let mitm_port = mitm.proxy_addr
.rsplit(':')
.next()
.unwrap_or("8742");
format!("https://daily-cloudcode-pa.googleapis.com:{mitm_port}")
} else {
"https://daily-cloudcode-pa.googleapis.com".to_string()
},
"-app_data_dir".to_string(), "-app_data_dir".to_string(),
"antigravity-standalone".to_string(), "antigravity-standalone".to_string(),
"-gemini_dir".to_string(), "-gemini_dir".to_string(),
@@ -242,8 +364,23 @@ impl StandaloneLS {
// With iptables, all outbound traffic is transparently redirected at the // With iptables, all outbound traffic is transparently redirected at the
// kernel level — setting HTTPS_PROXY on top causes double-proxying. // kernel level — setting HTTPS_PROXY on top causes double-proxying.
if headless || !has_ls_user() { if headless || !has_ls_user() {
env_vars.push(("HTTPS_PROXY".into(), format!("http://{}", mitm.proxy_addr))); // proxy_addr already includes the scheme (e.g. "http://127.0.0.1:8742")
env_vars.push(("HTTP_PROXY".into(), format!("http://{}", mitm.proxy_addr))); env_vars.push(("HTTPS_PROXY".into(), mitm.proxy_addr.clone()));
env_vars.push(("HTTP_PROXY".into(), mitm.proxy_addr.clone()));
// LD_PRELOAD DNS redirect: hooks getaddrinfo() so Google API domains
// resolve to 127.0.0.1. Combined with the port-modified endpoint URL,
// this makes the LS connect to our MITM proxy for ALL API calls —
// even the CodeAssistClient which has Proxy:nil hardcoded.
let so_path = build_dns_redirect_so();
if let Some(so) = so_path {
info!(path = %so, "Enabling LD_PRELOAD DNS redirect for headless MITM");
env_vars.push(("LD_PRELOAD".into(), so));
env_vars.push((
"DNS_REDIRECT_LOG".into(),
"/tmp/antigravity-dns-redirect.log".into(),
));
}
} }
} }
@@ -643,6 +780,167 @@ fn cleanup_orphaned_ls() {
} }
} }
/// Read OAuth token state directly from Antigravity's state.vscdb.
///
/// The DB stores the exact Topic proto bytes under key `antigravityUnifiedStateSync.oauthToken`.
/// This includes access_token + refresh_token + expiry, allowing the LS to auto-refresh.
/// Returns (access_token, topic_proto_bytes) or None if unavailable.
fn read_oauth_from_state_db() -> Option<(String, Vec<u8>)> {
use base64::Engine;
let home = std::env::var("HOME").ok()?;
let db_path = format!("{home}/.config/Antigravity/User/globalStorage/state.vscdb");
// Check the DB file exists
if !std::path::Path::new(&db_path).exists() {
return None;
}
// Read the Topic proto (base64-encoded in the DB)
let output = std::process::Command::new("sqlite3")
.args([
&db_path,
"SELECT value FROM ItemTable WHERE key='antigravityUnifiedStateSync.oauthToken'",
])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let b64_str = String::from_utf8_lossy(&output.stdout).trim().to_string();
if b64_str.is_empty() {
return None;
}
// Decode the base64 to get the raw Topic proto bytes
let topic_bytes = base64::engine::general_purpose::STANDARD
.decode(&b64_str)
.ok()?;
if topic_bytes.is_empty() {
return None;
}
// Extract the access_token from the OAuthTokenInfo inside the Topic proto.
// The inner value (Row.value) is also base64, containing a serialized OAuthTokenInfo.
// For the access_token (used by GetSecretValue), we can read it from the authStatus.
let access_token = read_access_token_from_auth_status(&db_path)
.or_else(|| extract_access_token_from_topic(&topic_bytes))
.unwrap_or_default();
Some((access_token, topic_bytes))
}
/// Read the current access token from `antigravityAuthStatus` in state.vscdb.
/// This JSON object has an `apiKey` field with the latest access token.
fn read_access_token_from_auth_status(db_path: &str) -> Option<String> {
let output = std::process::Command::new("sqlite3")
.args([
db_path,
"SELECT value FROM ItemTable WHERE key='antigravityAuthStatus'",
])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let json_str = String::from_utf8_lossy(&output.stdout).trim().to_string();
// Simple extraction: find "apiKey":"..." pattern
let marker = "\"apiKey\":\"";
let start = json_str.find(marker)? + marker.len();
let end = json_str[start..].find('"')? + start;
let api_key = &json_str[start..end];
if api_key.starts_with("ya29.") {
Some(api_key.to_string())
} else {
None
}
}
/// Extract access_token from the Topic proto bytes by finding the inner
/// base64-encoded OAuthTokenInfo and decoding its first string field.
fn extract_access_token_from_topic(topic_bytes: &[u8]) -> Option<String> {
use base64::Engine;
// Find long base64 strings in the proto (the Row.value field)
// Simple approach: convert to string and find base64 pattern
let as_str = String::from_utf8_lossy(topic_bytes);
// The base64 OAuthTokenInfo starts with "Co" (0x0A = field 1, len-delimited)
for segment in as_str.split(|c: char| !c.is_alphanumeric() && c != '+' && c != '/' && c != '=') {
if segment.len() > 50 {
if let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(segment) {
// Try to extract field 1 (access_token) from the OAuthTokenInfo proto
if let Some(token) = extract_proto_string_from_bytes(&decoded, 1) {
if token.starts_with("ya29.") {
return Some(token);
}
}
}
}
}
None
}
/// Extract a string field from raw protobuf bytes by field number.
fn extract_proto_string_from_bytes(buf: &[u8], target_field: u32) -> Option<String> {
let mut i = 0;
while i < buf.len() {
let (tag, bytes_read) = decode_varint_at(buf, i)?;
i += bytes_read;
let field_num = (tag >> 3) as u32;
let wire_type = (tag & 0x07) as u8;
match wire_type {
0 => {
// varint — skip it
let (_, vr) = decode_varint_at(buf, i)?;
i += vr;
}
2 => {
// length-delimited
let (len, lr) = decode_varint_at(buf, i)?;
i += lr;
let len = len as usize;
if i + len > buf.len() {
return None;
}
if field_num == target_field {
return String::from_utf8(buf[i..i + len].to_vec()).ok();
}
i += len;
}
_ => return None, // unsupported wire type
}
}
None
}
/// Decode a varint from a byte slice at the given offset.
/// Returns (value, bytes_consumed).
fn decode_varint_at(buf: &[u8], offset: usize) -> Option<(u64, usize)> {
let mut val: u64 = 0;
let mut shift = 0u32;
let mut i = offset;
loop {
if i >= buf.len() {
return None;
}
let b = buf[i];
i += 1;
val |= ((b & 0x7F) as u64) << shift;
if b & 0x80 == 0 {
return Some((val, i - offset));
}
shift += 7;
if shift >= 64 {
return None;
}
}
}
/// Handle a single connection from the LS to the stub extension server. /// Handle a single connection from the LS to the stub extension server.
/// ///
/// The LS uses Connect RPC (HTTP/1.1, ServerStream) to call ExtensionServerService methods. /// The LS uses Connect RPC (HTTP/1.1, ServerStream) to call ExtensionServerService methods.
@@ -653,7 +951,7 @@ fn cleanup_orphaned_ls() {
/// IMPORTANT: `SubscribeToUnifiedStateSyncTopic` is a long-lived stream. /// IMPORTANT: `SubscribeToUnifiedStateSyncTopic` is a long-lived stream.
/// If we immediately close it, the LS reconnects in a tight loop and never /// If we immediately close it, the LS reconnects in a tight loop and never
/// proceeds to fetch OAuth tokens. We keep subscription connections OPEN. /// proceeds to fetch OAuth tokens. We keep subscription connections OPEN.
fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str) { fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str, oauth_topic_bytes: &Option<Vec<u8>>) {
use std::io::{BufRead, BufReader, Read, Write}; use std::io::{BufRead, BufReader, Read, Write};
let mut reader = BufReader::new(match conn.try_clone() { let mut reader = BufReader::new(match conn.try_clone() {
@@ -780,58 +1078,60 @@ fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str) {
// Map entry: { string key = 1, Row value = 2 } // Map entry: { string key = 1, Row value = 2 }
let mut initial_state_bytes = Vec::new(); let mut initial_state_bytes = Vec::new();
if topic_name == "uss-oauth" && !oauth_token.is_empty() { if topic_name == "uss-oauth" {
// Row.value is base64-encoded protobuf OAuthTokenInfo: if let Some(topic_bytes) = oauth_topic_bytes {
// OAuthTokenInfo { // Use the exact Topic proto from Antigravity's state.vscdb.
// string access_token = 1; // This includes access_token + refresh_token + expiry, so the
// string token_type = 2; // LS can auto-refresh tokens via its built-in Google OAuth2 client.
// string refresh_token = 3; initial_state_bytes = topic_bytes.clone();
// google.protobuf.Timestamp expiry = 4; eprintln!("[stub-ext] using state.vscdb topic ({} bytes)", topic_bytes.len());
// } } else if !oauth_token.is_empty() {
let mut oauth_proto = Vec::new(); // Manual token fallback — construct OAuthTokenInfo with far-future expiry
// field 1 (access_token), LEN // (no refresh_token, so the LS can't auto-refresh)
oauth_proto.push(0x0A); let mut oauth_proto = Vec::new();
encode_varint(&mut oauth_proto, oauth_token.len() as u64); // field 1 (access_token), LEN
oauth_proto.extend_from_slice(oauth_token.as_bytes()); oauth_proto.push(0x0A);
// field 2 (token_type), LEN encode_varint(&mut oauth_proto, oauth_token.len() as u64);
let token_type = b"Bearer"; oauth_proto.extend_from_slice(oauth_token.as_bytes());
oauth_proto.push(0x12); // field 2 (token_type), LEN
encode_varint(&mut oauth_proto, token_type.len() as u64); let token_type = b"Bearer";
oauth_proto.extend_from_slice(token_type); oauth_proto.push(0x12);
// field 4 (expiry) = Timestamp { seconds = 4_102_444_800 } (year 2099-12-31) encode_varint(&mut oauth_proto, token_type.len() as u64);
// Timestamp proto: field 1 (seconds) varint oauth_proto.extend_from_slice(token_type);
let mut ts_proto = Vec::new(); // field 4 (expiry) = Timestamp { seconds = 4_102_444_800 } (year 2099-12-31)
ts_proto.push(0x08); // field 1 (seconds), varint let mut ts_proto = Vec::new();
encode_varint(&mut ts_proto, 4_102_444_800u64); ts_proto.push(0x08); // field 1 (seconds), varint
oauth_proto.push(0x22); // field 4 (expiry), LEN encode_varint(&mut ts_proto, 4_102_444_800u64);
encode_varint(&mut oauth_proto, ts_proto.len() as u64); oauth_proto.push(0x22); // field 4 (expiry), LEN
oauth_proto.extend_from_slice(&ts_proto); encode_varint(&mut oauth_proto, ts_proto.len() as u64);
oauth_proto.extend_from_slice(&ts_proto);
use base64::Engine; use base64::Engine;
let b64_value = base64::engine::general_purpose::STANDARD.encode(&oauth_proto); let b64_value = base64::engine::general_purpose::STANDARD.encode(&oauth_proto);
// Build Row { value = b64_value, e_tag = 1 } // Build Row { value = b64_value, e_tag = 1 }
let mut row = Vec::new(); let mut row = Vec::new();
row.push(0x0A); // field 1 (value), LEN row.push(0x0A); // field 1 (value), LEN
encode_varint(&mut row, b64_value.len() as u64); encode_varint(&mut row, b64_value.len() as u64);
row.extend_from_slice(b64_value.as_bytes()); row.extend_from_slice(b64_value.as_bytes());
row.push(0x10); // field 2 (e_tag), varint row.push(0x10); // field 2 (e_tag), varint
row.push(0x01); row.push(0x01);
// Build map entry: { key = "oauthTokenInfoSentinelKey", value = row } // Build map entry: { key = "oauthTokenInfoSentinelKey", value = row }
let key_str = b"oauthTokenInfoSentinelKey"; let key_str = b"oauthTokenInfoSentinelKey";
let mut map_entry = Vec::new(); let mut map_entry = Vec::new();
map_entry.push(0x0A); // field 1 (key), LEN map_entry.push(0x0A); // field 1 (key), LEN
encode_varint(&mut map_entry, key_str.len() as u64); encode_varint(&mut map_entry, key_str.len() as u64);
map_entry.extend_from_slice(key_str); map_entry.extend_from_slice(key_str);
map_entry.push(0x12); // field 2 (value = Row), LEN map_entry.push(0x12); // field 2 (value = Row), LEN
encode_varint(&mut map_entry, row.len() as u64); encode_varint(&mut map_entry, row.len() as u64);
map_entry.extend_from_slice(&row); map_entry.extend_from_slice(&row);
// Build Topic { data = [map_entry] } // Build Topic { data = [map_entry] }
initial_state_bytes.push(0x0A); // field 1 (data map), LEN initial_state_bytes.push(0x0A); // field 1 (data map), LEN
encode_varint(&mut initial_state_bytes, map_entry.len() as u64); encode_varint(&mut initial_state_bytes, map_entry.len() as u64);
initial_state_bytes.extend_from_slice(&map_entry); initial_state_bytes.extend_from_slice(&map_entry);
}
} }
// Build UnifiedStateSyncUpdate { initial_state = initial_state_bytes } // Build UnifiedStateSyncUpdate { initial_state = initial_state_bytes }
@@ -859,12 +1159,19 @@ fn stub_handle_connection(conn: std::net::TcpStream, oauth_token: &str) {
// (applied_update removed — data is in initial_state) // (applied_update removed — data is in initial_state)
// Hold the connection open — LS will reconnect if it drops. // Keep the stream alive with periodic valid messages.
// Don't send empty envelopes (LS rejects nil update type). // The LS has a ~10s read timeout on streams. After the initial_state,
// the LS only accepts AppliedUpdate (field 2 in the oneof).
// We send an empty AppliedUpdate {} every 5s as keepalive.
//
// AppliedUpdate is field 2 (wire type 2 = length-delimited), so:
// 0x12 = (field 2 << 3) | 2, 0x00 = length 0
// This creates: UnifiedStateSyncUpdate { applied_update: AppliedUpdate {} }
let keepalive_proto: &[u8] = &[0x12, 0x00]; // field 2 (applied_update), LEN=0
let keepalive_env = make_envelope(keepalive_proto);
loop { loop {
std::thread::sleep(std::time::Duration::from_secs(60)); std::thread::sleep(std::time::Duration::from_secs(5));
// Probe: try writing zero bytes to detect closed connection if !send_chunk(&mut writer, &keepalive_env) {
if writer.write_all(b"").is_err() {
break; break;
} }
} }