Files
zerogravity/src/standalone.rs
Nikketryhard 3fdd0368a0 fix: block ALL LS follow-up requests across connections
Move the in-flight blocking check to the top of the LLM request flow,
BEFORE request modification. This catches follow-ups on ALL connections
(the LS opens multiple parallel TLS connections). Only the very first
modified request reaches Google — all others get fake STOP responses.

Previously, each new connection independently allowed one request
through before blocking, letting 4-5 requests leak per turn.
2026-02-16 00:57:33 -06:00

1376 lines
52 KiB
Rust

//! Standalone Language Server — spawn and lifecycle management.
//!
//! Launches an isolated LS instance as a child process that the proxy fully owns.
//! In **headless** mode, the LS runs completely independently — no running
//! Antigravity app required. The generated config carries extension server
//! `port=0`, which makes `spawn` serve a local stub extension server instead;
//! CSRF is self-generated, and MITM uses `HTTPS_PROXY` instead of iptables.
use crate::constants;
use crate::proto;
use std::io::Write;
use std::net::TcpListener;
use std::process::{Child, Command, Stdio};
use tokio::time::{sleep, Duration};
use tracing::{debug, info};
use uuid::Uuid;
/// Default path to the LS binary.
const LS_BINARY_PATH: &str =
"/usr/share/antigravity/resources/app/extensions/antigravity/bin/language_server_linux_x64";
/// App root exported to the LS via the `ANTIGRAVITY_EDITOR_APP_ROOT` env var.
const APP_ROOT: &str = "/usr/share/antigravity/resources/app";
/// Data directory for the standalone LS (holds the `.gemini` tree).
const DATA_DIR: &str = "/tmp/antigravity-standalone";
/// System user for UID-scoped iptables isolation; the LS is spawned as this
/// user (via sudo) when the account exists and headless mode is off.
const LS_USER: &str = "antigravity-ls";
/// Path for the compiled dns_redirect.so preload library.
const DNS_REDIRECT_SO_PATH: &str = "/tmp/antigravity-dns-redirect.so";
/// C source of the DNS redirect preload library, embedded into the binary at
/// compile time (`include_str!` resolves the path relative to this source file).
const DNS_REDIRECT_C_SOURCE: &str = include_str!("mitm/dns_redirect.c");
/// Build the dns_redirect.so preload library if it doesn't already exist.
///
/// The library hooks `getaddrinfo()` via LD_PRELOAD to redirect Google API
/// domain lookups to 127.0.0.1. This is needed because the LS binary uses
/// CGO for DNS resolution (libc getaddrinfo) but raw syscalls for connect(),
/// so only DNS can be intercepted via LD_PRELOAD.
///
/// The embedded C source is written next to the target (`<so_path>.c`),
/// compiled with `gcc`, and removed again regardless of the outcome so that a
/// failed build does not leave a stray source file behind.
///
/// Returns the path to the .so on success, None on failure (the reason is
/// logged as a warning).
fn build_dns_redirect_so() -> Option<String> {
    let so_path = DNS_REDIRECT_SO_PATH;

    // Skip rebuild if a previously compiled library is already in place.
    if std::path::Path::new(so_path).exists() {
        return Some(so_path.to_string());
    }

    // Write the embedded C source to a temp file next to the target.
    let c_path = format!("{so_path}.c");
    if let Err(e) = std::fs::write(&c_path, DNS_REDIRECT_C_SOURCE) {
        tracing::warn!("Failed to write dns_redirect.c: {e}");
        return None;
    }

    // Compile: gcc -shared -fPIC -o dns_redirect.so dns_redirect.c -ldl
    let output = Command::new("gcc")
        .args(["-shared", "-fPIC", "-o", so_path, &c_path, "-ldl"])
        .output();

    // The source is only needed for the compile step; remove it on every
    // path (success, compile error, gcc missing). Best-effort by design.
    let _ = std::fs::remove_file(&c_path);

    match output {
        Ok(out) if out.status.success() => {
            info!("Built dns_redirect.so at {so_path}");
            Some(so_path.to_string())
        }
        Ok(out) => {
            let stderr = String::from_utf8_lossy(&out.stderr);
            tracing::warn!("Failed to compile dns_redirect.so: {stderr}");
            None
        }
        Err(e) => {
            tracing::warn!("gcc not found, cannot build dns_redirect.so: {e}");
            None
        }
    }
}
/// A running standalone LS process.
///
/// Created by `StandaloneLS::spawn`; the process is terminated by `kill`,
/// which is also invoked from `Drop`.
pub struct StandaloneLS {
/// Handle of the directly spawned process (the LS itself, or `sudo` when
/// UID isolation is used).
child: Child,
/// The actual LS process PID (may differ from child PID when spawned via sudo).
ls_pid: Option<u32>,
/// Whether the LS was spawned via sudo (needs sudo kill).
use_sudo: bool,
/// Whether kill() has already been called.
killed: bool,
/// TCP port handed to the LS as `-server_port`; `wait_ready` probes it.
pub port: u16,
/// CSRF token handed to the LS as `-csrf_token`.
pub csrf: String,
}
/// Config needed to bootstrap the standalone LS.
///
/// In normal mode, discovered from the running main LS.
/// In headless mode, generated entirely by the proxy.
pub struct MainLSConfig {
/// Extension server port as a decimal string; `"0"` in the generated
/// headless config.
pub extension_server_port: String,
/// CSRF token forwarded to the standalone LS.
pub csrf: String,
}
/// Produce the bootstrap config used in headless mode.
///
/// Nothing is read from a running Antigravity instance: the CSRF token is a
/// freshly generated random UUID and the extension server port is the
/// placeholder `"0"`.
pub fn generate_standalone_config() -> MainLSConfig {
    let token = Uuid::new_v4().to_string();
    // Only the token length is logged, never the token itself.
    info!(
        csrf_len = token.len(),
        "Generated standalone config (headless)"
    );
    MainLSConfig {
        // Port "0" means "no real extension server".
        extension_server_port: String::from("0"),
        csrf: token,
    }
}
/// Optional MITM proxy config for the standalone LS.
pub struct StandaloneMitmConfig {
/// Full proxy URL with scheme, e.g. "http://127.0.0.1:8742".
pub proxy_addr: String,
/// Path to the MITM CA certificate (.pem).
pub ca_cert_path: String,
}
impl StandaloneLS {
/// Spawn a standalone LS process.
///
/// Takes the extension server port and CSRF token from `main_config`,
/// picks free ports for the LS server and LSP listeners, builds the init
/// metadata, prepares the data directory, and launches the binary.
///
/// * `main_config` — extension server port and CSRF token handed to the LS.
/// * `mitm_config` — when provided, a combined CA bundle is written and
///   `SSL_CERT_FILE` is set; `HTTPS_PROXY`/`HTTP_PROXY` and the `LD_PRELOAD`
///   DNS redirect are added only in headless mode or when the dedicated LS
///   user does not exist.
/// * `headless` — run as the current user (never via sudo) and, when the
///   configured extension server port is 0, serve a stub extension server.
///
/// Returns an error string if a port or the stub listener cannot be bound,
/// the MITM CA cannot be read or the combined bundle written, the debug log
/// cannot be created, the binary fails to spawn, or the init metadata cannot
/// be written to the child's stdin.
pub fn spawn(
main_config: &MainLSConfig,
mitm_config: Option<&StandaloneMitmConfig>,
headless: bool,
) -> Result<Self, String> {
// Kill any orphaned LS processes from previous runs
cleanup_orphaned_ls();
let port = find_free_port()?;
let lsp_port = find_free_port()?;
// Seconds since the Unix epoch; used to make the API key, session id and
// workspace id unique per spawn.
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
// Build init metadata protobuf
let api_key = format!("standalone-api-key-{ts}");
let session_id = format!("standalone-session-{ts}");
let metadata = proto::build_init_metadata(
&api_key,
constants::antigravity_version(),
constants::client_version(),
&session_id,
1, // DETECT_AND_USE_PROXY_ENABLED
);
// Setup data dir (mode 1777 so both current user and antigravity-ls can write)
let gemini_dir = format!("{DATA_DIR}/.gemini");
let app_data_dir = format!("{DATA_DIR}/.gemini/antigravity-standalone");
let annotations_dir = format!("{app_data_dir}/annotations");
let brain_dir = format!("{app_data_dir}/brain");
for dir in [
DATA_DIR,
&gemini_dir,
&app_data_dir,
&annotations_dir,
&brain_dir,
] {
// Best-effort: failures surface through the write test below.
let _ = std::fs::create_dir_all(dir);
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(dir, std::fs::Permissions::from_mode(0o1777));
}
}
// Check if data dir is writable by writing a test file.
// Old runs as `antigravity-ls` user leave dirs owned by that user.
let test_path = format!("{app_data_dir}/.write_test");
if std::fs::write(&test_path, b"ok").is_err() {
eprintln!(
"\n ⚠ Data dir {} is not writable (owned by another user from previous sudo run)\n \
Fix with: sudo chmod -R a+rwX {}\n",
app_data_dir, DATA_DIR
);
} else {
let _ = std::fs::remove_file(&test_path);
}
// Pre-seed user_settings.pb with detect_and_use_proxy = ENABLED.
// The LS reads this at startup when creating its HTTP transport.
// Without it, the LS ignores HTTPS_PROXY and API traffic bypasses MITM.
// UserSettings proto: field 34 (varint) = 1 (DETECT_AND_USE_PROXY_ENABLED)
// Tag: (34 << 3) | 0 = 272 → varint [0x90, 0x02]
// Value: 1 → varint [0x01]
let settings_path = format!("{app_data_dir}/user_settings.pb");
let settings_bytes: &[u8] = &[0x90, 0x02, 0x01];
if let Err(e) = std::fs::write(&settings_path, settings_bytes) {
tracing::warn!("Failed to pre-seed user_settings.pb: {e}");
} else {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(
&settings_path,
std::fs::Permissions::from_mode(0o0666),
);
}
tracing::info!("Pre-seeded user_settings.pb (detect_and_use_proxy=ENABLED)");
}
// In headless mode, spawn a stub TCP listener to serve as the extension server.
// The LS connects to this port and calls LanguageServerStarted — without it,
// the LS never fully initializes and won't accept connections on its server_port.
let _stub_listener = if headless {
// An unparsable port is treated like 0, i.e. "start the stub".
let stub_port: u16 = main_config.extension_server_port.parse().unwrap_or(0);
if stub_port == 0 {
// Create a real listener so the LS can connect
let listener = TcpListener::bind("127.0.0.1:0")
.map_err(|e| format!("Failed to bind stub extension server: {e}"))?;
let actual_port = listener
.local_addr()
.map_err(|e| format!("Failed to get stub port: {e}"))?
.port();
info!(
port = actual_port,
"Stub extension server listening (headless)"
);
// Read OAuth state from Antigravity's state.vscdb if available.
// The DB stores the exact Topic proto (access_token + refresh_token + expiry)
// which lets the LS auto-refresh tokens via its built-in Google OAuth2 client.
let (oauth_token, oauth_topic_bytes) = read_oauth_from_state_db()
.map(|(token, topic)| {
info!("Loaded OAuth token from Antigravity state.vscdb");
(token, Some(topic))
})
.unwrap_or_else(|| {
// Fall back to env var / token file
let token = std::env::var("ANTIGRAVITY_OAUTH_TOKEN")
.ok()
.filter(|s| !s.is_empty())
.or_else(|| {
let home = std::env::var("HOME").unwrap_or_default();
let path = format!("{home}/.config/antigravity-proxy-token");
std::fs::read_to_string(&path)
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
})
.unwrap_or_default();
if !token.is_empty() {
info!("Loaded OAuth token from env/file (no refresh token — manual refresh needed)");
} else {
eprintln!("[headless] ⚠ No OAuth token found. Login to Antigravity first, or set ANTIGRAVITY_OAUTH_TOKEN");
}
(token, None)
});
// Shared read-only between all per-connection handler threads.
let oauth_arc = std::sync::Arc::new(oauth_token);
let topic_arc = std::sync::Arc::new(oauth_topic_bytes);
// Spawn a thread to accept connections; each one is served by
// `stub_handle_connection` on its own thread.
let listener_clone = listener
.try_clone()
.map_err(|e| format!("Failed to clone stub listener: {e}"))?;
std::thread::spawn(move || {
for stream in listener_clone.incoming() {
match stream {
Ok(conn) => {
let token = std::sync::Arc::clone(&oauth_arc);
let topic = std::sync::Arc::clone(&topic_arc);
// Handle each connection in its own thread
std::thread::spawn(move || {
stub_handle_connection(conn, &token, &topic);
});
}
// Stop accepting on the first accept error.
Err(_) => break,
}
}
});
// Update the extension_server_port to the stub's port
// (we need to use this in args below)
Some((listener, actual_port))
} else {
None
}
} else {
None
};
// Determine the actual extension_server_port to use
let ext_port = if let Some((_, stub_port)) = &_stub_listener {
stub_port.to_string()
} else {
main_config.extension_server_port.clone()
};
// LS args — NO -standalone flag (it disables TCP listeners entirely)
// NOTE: do NOT use -random_port — it overrides -server_port and the LS
// would listen on a random port we can't discover.
let args = vec![
"-enable_lsp".to_string(),
format!("-lsp_port={}", lsp_port),
"-extension_server_port".to_string(),
ext_port,
"-csrf_token".to_string(),
main_config.csrf.clone(),
"-server_port".to_string(),
port.to_string(),
"-workspace_id".to_string(),
format!("standalone_{ts}"),
"-cloud_code_endpoint".to_string(),
// When MITM is active, append the MITM port to the endpoint URL.
// The LS's CodeAssistClient ignores HTTPS_PROXY (hardcoded Proxy:nil),
// so we redirect at the DNS+port level instead:
// 1. LD_PRELOAD hooks getaddrinfo() → 127.0.0.1 for API domains
// 2. Custom port in URL → LS connects to 127.0.0.1:MITM_PORT
// 3. MITM proxy intercepts the transparent TLS connection via SNI
if let Some(mitm) = mitm_config {
// Extract port from proxy_addr (e.g. "http://127.0.0.1:8742" → "8742")
// NOTE(review): this takes everything after the last ':' verbatim, so a
// proxy_addr with a trailing path/slash or without an explicit port
// would yield a malformed endpoint — confirm callers always pass
// "scheme://host:port".
let mitm_port = mitm.proxy_addr.rsplit(':').next().unwrap_or("8742");
format!("https://daily-cloudcode-pa.googleapis.com:{mitm_port}")
} else {
"https://daily-cloudcode-pa.googleapis.com".to_string()
},
"-app_data_dir".to_string(),
"antigravity-standalone".to_string(),
"-gemini_dir".to_string(),
gemini_dir,
];
info!(port, "Spawning standalone LS");
debug!(?args, "LS args");
// Build env vars for the LS process
let mut env_vars: Vec<(String, String)> =
vec![("ANTIGRAVITY_EDITOR_APP_ROOT".into(), APP_ROOT.into())];
// If MITM is enabled, add SSL + proxy env vars
if let Some(mitm) = mitm_config {
// Go's SSL_CERT_FILE replaces the entire system cert pool, so we
// need a combined bundle: system CAs + our MITM CA
// Write to /tmp — accessible by antigravity-ls user
// (user's ~/.config/ is not traversable by other UIDs)
let combined_ca_path = "/tmp/antigravity-mitm-combined-ca.pem".to_string();
// A missing system bundle is tolerated (bundle then holds only the MITM CA).
let system_ca =
std::fs::read_to_string("/etc/ssl/certs/ca-certificates.crt").unwrap_or_default();
let mitm_ca = std::fs::read_to_string(&mitm.ca_cert_path)
.map_err(|e| format!("Failed to read MITM CA cert: {e}"))?;
std::fs::write(&combined_ca_path, format!("{system_ca}\n{mitm_ca}"))
.map_err(|e| format!("Failed to write combined CA bundle: {e}"))?;
// Make readable by antigravity-ls user
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(
&combined_ca_path,
std::fs::Permissions::from_mode(0o644),
);
}
info!(
proxy = %mitm.proxy_addr,
ca = %combined_ca_path,
"Setting MITM env vars on standalone LS (combined CA bundle)"
);
env_vars.push(("SSL_CERT_FILE".into(), combined_ca_path));
env_vars.push(("SSL_CERT_DIR".into(), "/dev/null".into()));
env_vars.push(("NODE_EXTRA_CA_CERTS".into(), mitm.ca_cert_path.clone()));
// Only set HTTPS_PROXY when iptables UID isolation is NOT available
// OR when running in headless mode (no sudo at all).
// With iptables, all outbound traffic is transparently redirected at the
// kernel level — setting HTTPS_PROXY on top causes double-proxying.
if headless || !has_ls_user() {
// proxy_addr already includes the scheme (e.g. "http://127.0.0.1:8742")
env_vars.push(("HTTPS_PROXY".into(), mitm.proxy_addr.clone()));
env_vars.push(("HTTP_PROXY".into(), mitm.proxy_addr.clone()));
// LD_PRELOAD DNS redirect: hooks getaddrinfo() so Google API domains
// resolve to 127.0.0.1. Combined with the port-modified endpoint URL,
// this makes the LS connect to our MITM proxy for ALL API calls —
// even the CodeAssistClient which has Proxy:nil hardcoded.
let so_path = build_dns_redirect_so();
if let Some(so) = so_path {
info!(path = %so, "Enabling LD_PRELOAD DNS redirect for headless MITM");
env_vars.push(("LD_PRELOAD".into(), so));
env_vars.push((
"DNS_REDIRECT_LOG".into(),
"/tmp/antigravity-dns-redirect.log".into(),
));
}
}
}
// In headless mode, never use sudo — run as current user
// In normal mode, use sudo if 'antigravity-ls' user exists
let use_sudo = !headless && has_ls_user();
let mut cmd = if use_sudo {
info!("Using UID isolation: spawning LS as 'antigravity-ls' user");
// Env vars are passed as `KEY=VALUE` arguments to /usr/bin/env rather than
// set on the sudo process itself.
let mut c = Command::new("sudo");
c.args(["-n", "-u", LS_USER, "--", "/usr/bin/env"]);
for (k, v) in &env_vars {
c.arg(format!("{k}={v}"));
}
c.arg(LS_BINARY_PATH);
c.args(&args);
c
} else {
debug!("Spawning LS as current user");
let mut c = Command::new(LS_BINARY_PATH);
c.args(&args);
for (k, v) in &env_vars {
c.env(k, v);
}
c
};
// Capture stderr for debugging — logs to /tmp so we can diagnose LS failures
let stderr_file = std::fs::File::create("/tmp/antigravity-ls-debug.log")
.map_err(|e| format!("Failed to create LS debug log: {e}"))?;
cmd.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::from(stderr_file));
let mut child = cmd
.spawn()
.map_err(|e| format!("Failed to spawn LS binary: {e}"))?;
// Feed init metadata via stdin, then close it
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(&metadata)
.map_err(|e| format!("Failed to write init metadata to stdin: {e}"))?;
// stdin drops here → EOF (LS handles this fine in non-standalone mode)
}
info!(pid = child.id(), port, "Standalone LS spawned");
// When spawned via sudo, the child is the sudo process which exits after
// launching the LS as the target user. We need the actual LS PID for cleanup.
let ls_pid = if use_sudo {
// Give sudo a moment to spawn the real process
std::thread::sleep(std::time::Duration::from_millis(500));
// Find the LS process owned by antigravity-ls user
// (a lookup failure leaves `ls_pid` as None; `kill` then falls back to
// the blanket orphan cleanup).
find_ls_pid_for_user(LS_USER).ok()
} else {
Some(child.id())
};
if let Some(pid) = ls_pid {
info!(
ls_pid = pid,
sudo = use_sudo,
"Discovered actual LS process"
);
}
Ok(StandaloneLS {
child,
ls_pid,
use_sudo,
killed: false,
port,
csrf: main_config.csrf.clone(),
})
}
/// Block (asynchronously) until the standalone LS accepts TCP connections.
///
/// Makes up to `max_attempts` probes, sleeping one second before each. Every
/// probe first verifies the child process has not exited, then attempts a
/// plain TCP connect to `127.0.0.1:<port>`.
///
/// Returns an error if the child exits early, its status cannot be queried,
/// or the port never becomes reachable within the allotted attempts.
pub async fn wait_ready(&mut self, max_attempts: u32) -> Result<(), String> {
    info!(port = self.port, "Waiting for standalone LS to be ready...");
    let target = format!("127.0.0.1:{}", self.port);

    for attempt in 1..=max_attempts {
        sleep(Duration::from_secs(1)).await;

        // Bail out early if the process is gone (or cannot be inspected).
        let exited = self
            .child
            .try_wait()
            .map_err(|e| format!("Failed to check LS process status: {e}"))?;
        if let Some(status) = exited {
            return Err(format!(
                "Standalone LS exited prematurely with status: {status}"
            ));
        }

        // A successful connect is all the readiness signal we need.
        if let Err(e) = tokio::net::TcpStream::connect(target.as_str()).await {
            debug!(attempt, error = %e, "LS not ready yet");
            continue;
        }
        info!(attempt, "Standalone LS is ready (accepting connections)");
        return Ok(());
    }

    Err(format!(
        "Standalone LS failed to become ready after {max_attempts} attempts on port {}",
        self.port
    ))
}
/// Report whether the spawned child process has not exited yet.
///
/// A failure to query the process status counts as "not alive".
#[allow(dead_code)]
pub fn is_alive(&mut self) -> bool {
    let status = self.child.try_wait();
    matches!(status, Ok(None))
}
/// Terminate the standalone LS process. Safe to call more than once.
///
/// Without sudo the child is killed and reaped directly. With sudo the
/// recorded LS PID is signalled as the `antigravity-ls` user (TERM, then KILL
/// after a short grace period); if no PID is known or the TERM signal cannot
/// be delivered, the blanket orphan cleanup is used instead.
pub fn kill(&mut self) {
    if self.killed {
        return;
    }
    self.killed = true;
    info!("Killing standalone LS");

    // Direct child: plain kill + reap.
    if !self.use_sudo {
        let _ = self.child.kill();
        let _ = self.child.wait();
        return;
    }

    // The child is sudo which already exited. Kill the actual LS.
    let pid = match self.ls_pid {
        Some(pid) => pid,
        None => {
            // No PID recorded, try blanket cleanup
            cleanup_orphaned_ls();
            return;
        }
    };
    info!(pid, "Killing LS process via sudo -u {}", LS_USER);

    // Run kill AS the antigravity-ls user (same UID can signal).
    let pid_arg = pid.to_string();
    let signal_as_ls_user = |signal: &str| {
        std::process::Command::new("sudo")
            .args(["-n", "-u", LS_USER, "kill", signal, pid_arg.as_str()])
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status()
    };

    let term_delivered = match signal_as_ls_user("-TERM") {
        Ok(status) => status.success(),
        Err(_) => false,
    };
    if !term_delivered {
        // Fallback: try with root sudo, then cleanup
        info!("sudo -u kill failed, trying fallback cleanup");
        cleanup_orphaned_ls();
        return;
    }

    std::thread::sleep(std::time::Duration::from_millis(500));
    // Force kill if still alive
    let _ = signal_as_ls_user("-KILL");
}
}
/// Ensures the LS process is terminated when the handle goes out of scope.
impl Drop for StandaloneLS {
fn drop(&mut self) {
// `kill` returns immediately if it already ran (guarded by `killed`).
self.kill();
}
}
/// Read the extension server port and CSRF token off the running main LS.
///
/// The main LS process is located via `find_main_ls_pid` and its
/// `/proc/<pid>/cmdline` is scanned for `-csrf_token <value>` and
/// `-extension_server_port <value>` (single or double dash). The HTTPS port
/// is intentionally not discovered — only the extension server connection
/// info is needed.
///
/// Returns an error if no main LS is found, its cmdline cannot be read, or
/// either value is missing.
pub fn discover_main_ls_config() -> Result<MainLSConfig, String> {
    let pid = find_main_ls_pid()?;
    let raw = std::fs::read(format!("/proc/{pid}/cmdline"))
        .map_err(|e| format!("Can't read cmdline for PID {pid}: {e}"))?;
    // cmdline is a NUL-separated argv.
    let argv: Vec<&[u8]> = raw.split(|&b| b == 0).collect();

    let mut csrf = String::new();
    let mut ext_port = String::new();

    // Walk (flag, value) neighbours; pairs that are not valid UTF-8 are ignored.
    for pair in argv.windows(2) {
        let flag = match std::str::from_utf8(pair[0]) {
            Ok(flag) => flag,
            Err(_) => continue,
        };
        let is_csrf = flag == "--csrf_token" || flag == "-csrf_token";
        let is_ext_port = flag == "--extension_server_port" || flag == "-extension_server_port";
        if !is_csrf && !is_ext_port {
            continue;
        }
        if let Ok(value) = std::str::from_utf8(pair[1]) {
            if is_csrf {
                csrf = value.to_string();
            } else {
                ext_port = value.to_string();
            }
        }
    }

    if csrf.is_empty() {
        return Err("Could not find CSRF token from main LS".to_string());
    }
    if ext_port.is_empty() {
        return Err("Could not find extension_server_port from main LS".to_string());
    }

    info!(
        pid,
        ext_port,
        csrf_len = csrf.len(),
        "Discovered main LS config"
    );
    Ok(MainLSConfig {
        extension_server_port: ext_port,
        csrf,
    })
}
/// Locate the PID of the main (real) LS process.
///
/// Scans the numeric entries of `/proc` and resolves each `/proc/<pid>/exe`
/// link, so only processes whose *executable* is the LS binary match — not
/// shell scripts that merely mention `language_server_linux` in their args.
///
/// Returns the PID as a string, or an error when `/proc` is missing or
/// unreadable, or no matching process exists.
fn find_main_ls_pid() -> Result<String, String> {
    let proc_root = std::path::Path::new("/proc");
    if !proc_root.exists() {
        return Err("No /proc filesystem".to_string());
    }

    std::fs::read_dir(proc_root)
        .map_err(|e| format!("Cannot read /proc: {e}"))?
        .flatten()
        .find_map(|entry| {
            // Only numeric dirs (PIDs)
            let pid = entry.file_name().to_string_lossy().into_owned();
            if !pid.chars().all(|c| c.is_ascii_digit()) {
                return None;
            }

            let exe = std::fs::read_link(entry.path().join("exe")).ok()?;
            let exe = exe.to_string_lossy();
            // The kernel appends " (deleted)" when the binary was replaced on disk.
            let exe = exe.trim_end_matches(" (deleted)");

            // Must be the actual LS binary, not a bash script
            let is_ls_binary = exe.contains("language_server_linux")
                || exe.contains("antigravity-language-server");
            if is_ls_binary {
                Some(pid)
            } else {
                None
            }
        })
        .ok_or_else(|| "No main LS process found — Antigravity must be running".to_string())
}
/// Ask the OS for a currently unused TCP port on the loopback interface.
///
/// Binds a throwaway listener to port 0, reads back the assigned port, and
/// releases the listener when it goes out of scope.
fn find_free_port() -> Result<u16, String> {
    let probe = match TcpListener::bind("127.0.0.1:0") {
        Ok(listener) => listener,
        Err(e) => return Err(format!("Failed to bind for port: {e}")),
    };
    match probe.local_addr() {
        Ok(addr) => Ok(addr.port()),
        Err(e) => Err(format!("Failed to get port: {e}")),
    }
}
/// Report whether the dedicated LS system user exists.
///
/// When the user exists, the proxy spawns the LS as that UID so iptables
/// can scope the :443 redirect to only the standalone LS process.
/// Implemented by running `id -u <user>` silently; any failure to run the
/// command counts as "user absent".
fn has_ls_user() -> bool {
    let mut probe = Command::new("id");
    probe
        .args(["-u", LS_USER])
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    match probe.status() {
        Ok(status) => status.success(),
        Err(_) => false,
    }
}
/// Look up the PID of a `language_server_linux` process owned by `user`.
///
/// Used to discover the actual LS process after sudo spawns it as a different
/// user. Runs `pgrep -u <user> -f language_server_linux` and parses the first
/// output line.
///
/// Returns an error if `pgrep` cannot be executed or its first output line is
/// missing or not a PID.
fn find_ls_pid_for_user(user: &str) -> Result<u32, String> {
    let pgrep = Command::new("pgrep")
        .args(["-u", user, "-f", "language_server_linux"])
        .output()
        .map_err(|e| format!("pgrep failed: {e}"))?;

    let listing = String::from_utf8_lossy(&pgrep.stdout);
    // Only the first reported PID is considered.
    let first_pid = listing.lines().next().map(|line| line.trim().parse::<u32>());
    match first_pid {
        Some(Ok(pid)) => Ok(pid),
        _ => Err(format!("No LS process found for user {user}")),
    }
}
/// Terminate standalone LS processes left behind by earlier runs.
///
/// Covers the case where the proxy crashed or was killed before it could
/// clean up a sudo-spawned LS. Does nothing when the dedicated LS user does
/// not exist or no matching process is found.
///
/// The sudoers rule allows running commands AS antigravity-ls
/// (`ALL=(antigravity-ls) NOPASSWD: ALL`), and a process may signal other
/// processes of the same UID — so `kill` is run as antigravity-ls rather
/// than as root. Sequence: TERM every PID (falling back to a root `sudo kill`
/// when that fails), wait, KILL if anything is still listed, wait, and
/// finally print manual instructions if processes survive.
fn cleanup_orphaned_ls() {
    /// `pgrep` for LS processes owned by the dedicated user.
    fn list_ls_processes() -> std::io::Result<std::process::Output> {
        Command::new("pgrep")
            .args(["-u", LS_USER, "-f", "language_server_linux"])
            .output()
    }

    /// True when `pgrep` still prints at least one process.
    fn any_ls_running() -> bool {
        match list_ls_processes() {
            Ok(out) => !out.stdout.is_empty(),
            Err(_) => false,
        }
    }

    /// Run `sudo -n [-u <run_as>] kill <signal> <pid>` with output discarded.
    fn sudo_kill(
        run_as: Option<&str>,
        signal: &str,
        pid: u32,
    ) -> std::io::Result<std::process::ExitStatus> {
        let mut cmd = Command::new("sudo");
        cmd.arg("-n");
        if let Some(user) = run_as {
            cmd.args(["-u", user]);
        }
        cmd.args(["kill", signal])
            .arg(pid.to_string())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status()
    }

    if !has_ls_user() {
        return;
    }

    // Find all LS processes owned by antigravity-ls user
    let pids: Vec<u32> = match list_ls_processes() {
        Err(_) => return,
        Ok(out) => String::from_utf8_lossy(&out.stdout)
            .lines()
            .filter_map(|l| l.trim().parse().ok())
            .collect(),
    };
    if pids.is_empty() {
        return;
    }
    info!(
        count = pids.len(),
        ?pids,
        "Cleaning up orphaned standalone LS processes"
    );

    // Ask each process to exit, signalling AS the antigravity-ls user.
    for pid in &pids {
        let delivered = sudo_kill(Some(LS_USER), "-TERM", *pid)
            .map(|s| s.success())
            .unwrap_or(false);
        if delivered {
            info!(pid, "Killed orphaned LS process");
        } else {
            // Fallback: try as root (needs separate sudoers entry)
            let _ = sudo_kill(None, "-TERM", *pid);
        }
    }

    // Wait for graceful exit
    std::thread::sleep(std::time::Duration::from_millis(500));

    if !any_ls_running() {
        info!("Orphaned LS processes cleaned up");
        return;
    }

    // Force-kill any survivors
    info!("Orphaned LS still alive, force killing");
    for pid in &pids {
        let _ = sudo_kill(Some(LS_USER), "-KILL", *pid);
    }
    std::thread::sleep(std::time::Duration::from_millis(300));

    // Final check
    if any_ls_running() {
        eprintln!("\n \x1b[1;31m⚠ Cannot kill orphaned LS process\x1b[0m");
        eprintln!(" Run: \x1b[1msudo pkill -u {LS_USER} -f language_server_linux\x1b[0m\n");
    }
}
/// Load the OAuth state straight from Antigravity's `state.vscdb`.
///
/// The database keeps the Topic proto (base64-encoded) under the key
/// `antigravityUnifiedStateSync.oauthToken`; it carries access_token,
/// refresh_token and expiry, which lets the LS refresh tokens on its own.
///
/// Returns `(access_token, topic_proto_bytes)`, or `None` when `HOME` is
/// unset, the database is missing, the `sqlite3` query fails, or the stored
/// value is empty or not valid base64. The access token may be an empty
/// string if neither lookup below finds one.
fn read_oauth_from_state_db() -> Option<(String, Vec<u8>)> {
    use base64::Engine;

    let home = std::env::var("HOME").ok()?;
    let db_path = format!("{home}/.config/Antigravity/User/globalStorage/state.vscdb");
    if !std::path::Path::new(&db_path).exists() {
        return None;
    }

    // Fetch the base64-encoded Topic proto via the sqlite3 CLI.
    let query = std::process::Command::new("sqlite3")
        .args([
            db_path.as_str(),
            "SELECT value FROM ItemTable WHERE key='antigravityUnifiedStateSync.oauthToken'",
        ])
        .output()
        .ok()?;
    if !query.status.success() {
        return None;
    }

    let encoded = String::from_utf8_lossy(&query.stdout).trim().to_string();
    if encoded.is_empty() {
        return None;
    }

    // Raw Topic proto bytes; an empty payload is treated as "nothing stored".
    let topic_bytes = base64::engine::general_purpose::STANDARD
        .decode(&encoded)
        .ok()
        .filter(|bytes| !bytes.is_empty())?;

    // Prefer the token recorded in the auth status entry; fall back to
    // digging it out of the OAuthTokenInfo embedded in the Topic proto.
    let access_token = match read_access_token_from_auth_status(&db_path) {
        Some(token) => token,
        None => extract_access_token_from_topic(&topic_bytes).unwrap_or_default(),
    };

    Some((access_token, topic_bytes))
}
/// Fetch the current access token from the `antigravityAuthStatus` entry of
/// `state.vscdb`.
///
/// The entry is a JSON object whose `apiKey` field holds the latest access
/// token. The value is pulled out with a plain substring scan for
/// `"apiKey":"…"` and is only accepted when it starts with `ya29.`.
///
/// Returns `None` if `sqlite3` cannot be run or fails, the field is absent,
/// or the value does not have the expected prefix.
fn read_access_token_from_auth_status(db_path: &str) -> Option<String> {
    let query = std::process::Command::new("sqlite3")
        .args([
            db_path,
            "SELECT value FROM ItemTable WHERE key='antigravityAuthStatus'",
        ])
        .output()
        .ok()?;
    if !query.status.success() {
        return None;
    }

    let json_str = String::from_utf8_lossy(&query.stdout).trim().to_string();

    // Everything between the marker and the next double quote is the key.
    let (_, after_marker) = json_str.split_once("\"apiKey\":\"")?;
    let (api_key, _) = after_marker.split_once('"')?;

    if !api_key.starts_with("ya29.") {
        return None;
    }
    Some(api_key.to_string())
}
/// Extract access_token from the Topic proto bytes by finding the inner
/// base64-encoded OAuthTokenInfo and decoding its first string field.
fn extract_access_token_from_topic(topic_bytes: &[u8]) -> Option<String> {
use base64::Engine;
// Find long base64 strings in the proto (the Row.value field)
// Simple approach: convert to string and find base64 pattern
let as_str = String::from_utf8_lossy(topic_bytes);
// The base64 OAuthTokenInfo starts with "Co" (0x0A = field 1, len-delimited)
for segment in as_str.split(|c: char| !c.is_alphanumeric() && c != '+' && c != '/' && c != '=')
{
if segment.len() > 50 {
if let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(segment) {
// Try to extract field 1 (access_token) from the OAuthTokenInfo proto
if let Some(token) = extract_proto_string_from_bytes(&decoded, 1) {
if token.starts_with("ya29.") {
return Some(token);
}
}
}
}
}
None
}
/// Extract a string field from raw protobuf bytes by field number.
///
/// Walks the top-level fields of `buf` and returns the first length-delimited
/// field whose number equals `target_field`, decoded as UTF-8.
///
/// Varint (wire type 0), 64-bit (wire type 1) and 32-bit (wire type 5) fields
/// are skipped. Returns `None` when the field is absent, its payload is not
/// valid UTF-8, the buffer is truncated or malformed, or a group wire type
/// (3/4) or unknown wire type is encountered. Declared lengths are validated
/// against the remaining buffer, so corrupt input can never cause a panic.
fn extract_proto_string_from_bytes(buf: &[u8], target_field: u32) -> Option<String> {
    let mut i = 0;
    while i < buf.len() {
        let (tag, bytes_read) = decode_varint_at(buf, i)?;
        i += bytes_read;
        // Compare field numbers as u64 so an oversized number cannot wrap
        // around and accidentally match `target_field`.
        let field_num = tag >> 3;
        let wire_type = tag & 0x07;

        // Fixed-width payload size for the wire types that have one.
        let fixed_width = match wire_type {
            1 => Some(8usize),
            5 => Some(4usize),
            _ => None,
        };

        if let Some(width) = fixed_width {
            // 64-bit / 32-bit scalar — skip it.
            if buf.len() - i < width {
                return None;
            }
            i += width;
            continue;
        }

        match wire_type {
            0 => {
                // varint — skip it
                let (_, vr) = decode_varint_at(buf, i)?;
                i += vr;
            }
            2 => {
                // length-delimited
                let (len, lr) = decode_varint_at(buf, i)?;
                i += lr;
                // Validate in u64 space before narrowing: `i + len` could
                // overflow for a hostile or corrupt length.
                let remaining = (buf.len() - i) as u64;
                if len > remaining {
                    return None;
                }
                let end = i + len as usize;
                if field_num == u64::from(target_field) {
                    return String::from_utf8(buf[i..end].to_vec()).ok();
                }
                i = end;
            }
            _ => return None, // groups / unknown wire types are unsupported
        }
    }
    None
}
/// Decode a base-128 varint starting at `offset` in `buf`.
///
/// Returns `(value, bytes_consumed)`, or `None` if `offset` is at or past the
/// end of the buffer, the buffer ends before a terminating byte (high bit
/// clear) is seen, or the varint is still unterminated after ten bytes.
fn decode_varint_at(buf: &[u8], offset: usize) -> Option<(u64, usize)> {
    let tail = buf.get(offset..)?;
    let mut value: u64 = 0;

    // Ten groups of 7 bits cover shifts 0..=63; anything longer is rejected.
    for (idx, &byte) in tail.iter().enumerate().take(10) {
        let payload = (byte & 0x7F) as u64;
        value |= payload << (7 * idx as u32);
        if byte & 0x80 == 0 {
            return Some((value, idx + 1));
        }
    }

    None
}
/// Handle a single connection from the LS to the stub extension server.
///
/// The LS uses Connect RPC (HTTP/1.1, ServerStream) to call ExtensionServerService methods.
/// ALL methods are ServerStream — responses use Connect streaming envelope framing:
/// [0x00 | len(4) | protobuf_data]... (0+ data messages)
/// [0x02 | len(4) | json_trailer] (exactly 1 end-of-stream)
///
/// IMPORTANT: `SubscribeToUnifiedStateSyncTopic` is a long-lived stream.
/// If we immediately close it, the LS reconnects in a tight loop and never
/// proceeds to fetch OAuth tokens. We keep subscription connections OPEN.
fn stub_handle_connection(
conn: std::net::TcpStream,
oauth_token: &str,
oauth_topic_bytes: &Option<Vec<u8>>,
) {
use std::io::{BufRead, BufReader, Read, Write};
let mut reader = BufReader::new(match conn.try_clone() {
Ok(c) => c,
Err(_) => return,
});
let mut writer = conn;
// Read the HTTP request line
let mut request_line = String::new();
match reader.read_line(&mut request_line) {
Ok(0) | Err(_) => return,
_ => {}
}
// Extract method path for logging
let path = request_line
.split_whitespace()
.nth(1)
.unwrap_or("/unknown")
.to_string();
// Read headers
let mut content_len: usize = 0;
loop {
let mut line = String::new();
if reader.read_line(&mut line).unwrap_or(0) == 0 {
return;
}
if line.trim().is_empty() {
break;
}
if line.to_lowercase().starts_with("content-length:") {
content_len = line
.split(':')
.nth(1)
.and_then(|v| v.trim().parse().ok())
.unwrap_or(0);
}
}
// Read body
let mut body = Vec::new();
if content_len > 0 {
body.resize(content_len, 0u8);
if Read::read_exact(&mut reader, &mut body).is_err() {
return;
}
}
// ─── Long-lived streams ──────────────────────────────────────────────
// SubscribeToUnifiedStateSyncTopic must stay open — the LS subscribes
// once and expects updates (OAuth, settings) delivered over this stream.
// If we close immediately, the LS reconnects in a tight loop (~30/sec).
if path.contains("SubscribeToUnifiedStateSyncTopic") {
// Parse the request body to extract the topic name.
// Connect envelope: [flag(1)] [len(4)] [proto(N)]
let proto_body = if body.len() > 5 {
&body[5..]
} else {
&body[..]
};
// SubscribeToUnifiedStateSyncTopicRequest { string topic = 1; }
let mut topic_name = String::new();
let mut i = 0;
while i < proto_body.len() {
let tag_byte = proto_body[i];
let field_num = tag_byte >> 3;
let wire_type = tag_byte & 0x07;
i += 1;
if wire_type == 2 && i < proto_body.len() {
let len = proto_body[i] as usize;
i += 1;
if i + len <= proto_body.len() {
if field_num == 1 {
topic_name = String::from_utf8_lossy(&proto_body[i..i + len]).to_string();
}
i += len;
} else {
break;
}
} else {
break;
}
}
eprintln!("[stub-ext] STREAM → {path} topic={topic_name:?}");
// Protocol:
// UnifiedStateSyncUpdate {
// oneof UpdateType {
// string initial_state = 1; // ← STRING, not a submessage!
// AppliedUpdate applied_update = 2;
// }
// }
//
// Flow:
// 1. Send initial_state = "" (empty string = initial snapshot marker)
// 2. For uss-oauth topic: send applied_update with OAuth token
// 3. Hold stream open for future updates
// Helper: wrap protobuf bytes in a Connect data envelope
let make_envelope = |proto: &[u8]| -> Vec<u8> {
let mut env = Vec::with_capacity(5 + proto.len());
env.push(0x00u8); // data flag
env.extend_from_slice(&(proto.len() as u32).to_be_bytes());
env.extend_from_slice(proto);
env
};
// Helper: write a chunk
let send_chunk = |w: &mut std::net::TcpStream, data: &[u8]| -> bool {
let hdr = format!("{:x}\r\n", data.len());
w.write_all(hdr.as_bytes()).is_ok()
&& w.write_all(data).is_ok()
&& w.write_all(b"\r\n").is_ok()
&& w.flush().is_ok()
};
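// --- Message 1: initial_state = serialized Topic. For "uss-oauth" this is either the
// stored state.vscdb Topic bytes, or a fallback Topic with a single entry
// { "oauthTokenInfoSentinelKey": Row { value: base64(OAuthTokenInfo), e_tag: 1 } };
// for every other topic it is empty. ---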
// Topic { map<string, Row> data = 1; }
// Row { string value = 1; int64 e_tag = 2; }
// Map entry: { string key = 1, Row value = 2 }
let mut initial_state_bytes = Vec::new();
if topic_name == "uss-oauth" {
if let Some(topic_bytes) = oauth_topic_bytes {
// Use the exact Topic proto from Antigravity's state.vscdb.
// This includes access_token + refresh_token + expiry, so the
// LS can auto-refresh tokens via its built-in Google OAuth2 client.
initial_state_bytes = topic_bytes.clone();
eprintln!(
"[stub-ext] using state.vscdb topic ({} bytes)",
topic_bytes.len()
);
} else if !oauth_token.is_empty() {
// Manual token fallback — construct OAuthTokenInfo with far-future expiry
// (no refresh_token, so the LS can't auto-refresh)
let mut oauth_proto = Vec::new();
// field 1 (access_token), LEN
oauth_proto.push(0x0A);
encode_varint(&mut oauth_proto, oauth_token.len() as u64);
oauth_proto.extend_from_slice(oauth_token.as_bytes());
// field 2 (token_type), LEN
let token_type = b"Bearer";
oauth_proto.push(0x12);
encode_varint(&mut oauth_proto, token_type.len() as u64);
oauth_proto.extend_from_slice(token_type);
// field 4 (expiry) = Timestamp { seconds = 4_102_444_800 } (year 2099-12-31)
let mut ts_proto = Vec::new();
ts_proto.push(0x08); // field 1 (seconds), varint
encode_varint(&mut ts_proto, 4_102_444_800u64);
oauth_proto.push(0x22); // field 4 (expiry), LEN
encode_varint(&mut oauth_proto, ts_proto.len() as u64);
oauth_proto.extend_from_slice(&ts_proto);
use base64::Engine;
let b64_value = base64::engine::general_purpose::STANDARD.encode(&oauth_proto);
// Build Row { value = b64_value, e_tag = 1 }
let mut row = Vec::new();
row.push(0x0A); // field 1 (value), LEN
encode_varint(&mut row, b64_value.len() as u64);
row.extend_from_slice(b64_value.as_bytes());
row.push(0x10); // field 2 (e_tag), varint
row.push(0x01);
// Build map entry: { key = "oauthTokenInfoSentinelKey", value = row }
let key_str = b"oauthTokenInfoSentinelKey";
let mut map_entry = Vec::new();
map_entry.push(0x0A); // field 1 (key), LEN
encode_varint(&mut map_entry, key_str.len() as u64);
map_entry.extend_from_slice(key_str);
map_entry.push(0x12); // field 2 (value = Row), LEN
encode_varint(&mut map_entry, row.len() as u64);
map_entry.extend_from_slice(&row);
// Build Topic { data = [map_entry] }
initial_state_bytes.push(0x0A); // field 1 (data map), LEN
encode_varint(&mut initial_state_bytes, map_entry.len() as u64);
initial_state_bytes.extend_from_slice(&map_entry);
}
}
// Build UnifiedStateSyncUpdate { initial_state = initial_state_bytes }
let mut initial_proto = Vec::new();
initial_proto.push(0x0A); // field 1 (initial_state), LEN
encode_varint(&mut initial_proto, initial_state_bytes.len() as u64);
initial_proto.extend_from_slice(&initial_state_bytes);
let initial_env = make_envelope(&initial_proto);
let header = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: application/connect+proto\r\n\
Transfer-Encoding: chunked\r\n\
\r\n"
);
if writer.write_all(header.as_bytes()).is_err() {
return;
}
if !send_chunk(&mut writer, &initial_env) {
return;
}
eprintln!(
"[stub-ext] STREAM → sent initial_state ({} bytes)",
initial_state_bytes.len()
);
// (applied_update removed — data is in initial_state)
// Keep the stream alive with periodic valid messages.
// The LS has a ~10s read timeout on streams. After the initial_state,
// the LS only accepts AppliedUpdate (field 2 in the oneof).
// We send an empty AppliedUpdate {} every 5s as keepalive.
//
// AppliedUpdate is field 2 (wire type 2 = length-delimited), so:
// 0x12 = (field 2 << 3) | 2, 0x00 = length 0
// This creates: UnifiedStateSyncUpdate { applied_update: AppliedUpdate {} }
let keepalive_proto: &[u8] = &[0x12, 0x00]; // field 2 (applied_update), LEN=0
let keepalive_env = make_envelope(keepalive_proto);
loop {
std::thread::sleep(std::time::Duration::from_secs(5));
if !send_chunk(&mut writer, &keepalive_env) {
break;
}
}
return;
}
// ─── Short-lived methods (everything else) ───────────────────────────
let is_noisy = path.contains("GetChromeDevtoolsMcpUrl")
|| path.contains("FetchMCPAuthToken")
|| path.contains("PushUnifiedStateSyncUpdate");
if !is_noisy {
eprintln!("[stub-ext] 200 OK → {path}");
}
// Build Connect streaming response body with proper envelope framing.
let mut envelope = Vec::new();
if path.contains("GetSecretValue") {
// Parse request body to extract the key (protobuf: field 1 = key, string)
let key = extract_proto_string(&body, 1).unwrap_or_default();
eprintln!("[stub-ext] ← GetSecretValue key={key:?}");
if !oauth_token.is_empty() {
// Build protobuf: GetSecretValueResponse { string value = 1 }
let proto = encode_proto_string(1, oauth_token.as_bytes());
eprintln!(
"[stub-ext] → serving token ({} bytes) for key={key:?}",
oauth_token.len()
);
// Data envelope: flag=0x00, length, data
envelope.push(0x00u8);
envelope.extend_from_slice(&(proto.len() as u32).to_be_bytes());
envelope.extend_from_slice(&proto);
} else {
eprintln!("[stub-ext] ⚠ no OAuth token available for key={key:?}");
}
} else if path.contains("StoreSecretValue") {
// Parse and log what the LS is storing (for debugging)
let key = extract_proto_string(&body, 1).unwrap_or_default();
let value = extract_proto_string(&body, 2).unwrap_or_default();
let val_preview = if value.len() > 32 {
format!("{}...", &value[..32])
} else {
value
};
eprintln!("[stub-ext] ← StoreSecretValue key={key:?} value={val_preview:?}");
}
if path.contains("PushUnifiedStateSyncUpdate") {
// Unary proto — respond with empty PushUnifiedStateSyncUpdateResponse (0 bytes body)
let header = "HTTP/1.1 200 OK\r\n\
Content-Type: application/proto\r\n\
Content-Length: 0\r\n\
Connection: close\r\n\
\r\n";
let _ = writer.write_all(header.as_bytes());
let _ = writer.flush();
return;
}
// End-of-stream envelope: flag=0x02, length=2, data="{}"
envelope.push(0x02u8);
envelope.extend_from_slice(&2u32.to_be_bytes());
envelope.extend_from_slice(b"{}");
// Respond with 200 OK + Connection: close (one request per connection)
let header = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: application/connect+proto\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n",
envelope.len()
);
let _ = writer.write_all(header.as_bytes());
let _ = writer.write_all(&envelope);
let _ = writer.flush();
}
/// Extract a string field from a protobuf message by field number.
///
/// Walks the top-level fields of `buf` and returns the first length-delimited
/// (wire type 2) field whose number equals `target_field`, decoded as UTF-8.
/// Varint (0), 64-bit (1), 32-bit (5) and non-matching length-delimited fields
/// are skipped.
///
/// Returns `None` when the field is absent, when the message is truncated or
/// malformed, when an unsupported wire type is encountered, or when the
/// matching payload is not valid UTF-8. The input is treated as untrusted:
/// no declared length can cause a panic.
fn extract_proto_string(buf: &[u8], target_field: u32) -> Option<String> {
    let mut i = 0;
    while i < buf.len() {
        // Field key: (field_number << 3) | wire_type, encoded as a varint.
        let (tag, consumed) = decode_varint(&buf[i..])?;
        i += consumed;
        // Compare in u64 so an oversized field number cannot alias the target
        // after truncation to u32.
        let is_target = (tag >> 3) == u64::from(target_field);
        match tag & 0x07 {
            0 => {
                // Varint — skip
                let (_, c) = decode_varint(&buf[i..])?;
                i += c;
            }
            // 64-bit — skip 8 bytes (a truncated field simply ends the loop)
            1 => i += 8,
            2 => {
                // Length-delimited
                let (len, c) = decode_varint(&buf[i..])?;
                i += c;
                // The declared length is attacker-controlled. Compare it with
                // the remaining byte count in u64 space so that `i + len`
                // can neither overflow nor be truncated by a narrowing cast.
                let remaining = (buf.len() - i) as u64;
                if len > remaining {
                    return None;
                }
                let end = i + len as usize;
                if is_target {
                    return std::str::from_utf8(&buf[i..end]).ok().map(str::to_owned);
                }
                i = end;
            }
            // 32-bit — skip 4 bytes
            5 => i += 4,
            // Groups (3/4) and undefined wire types are not supported.
            _ => return None,
        }
    }
    None
}
/// Decode a base-128 varint from the front of `buf`.
///
/// On success returns the decoded value together with the number of bytes
/// that were read. Returns `None` if `buf` ends before a terminating byte
/// (high bit clear) is seen, or if ten bytes in a row all carry the
/// continuation bit.
fn decode_varint(buf: &[u8]) -> Option<(u64, usize)> {
    // A u64 needs at most ten 7-bit groups.
    const MAX_BYTES: usize = 10;

    let mut value = 0u64;
    for (idx, &b) in buf.iter().take(MAX_BYTES).enumerate() {
        // Each byte contributes its low seven bits, least-significant group first.
        value |= u64::from(b & 0x7f) << (7 * idx as u32);
        if b < 0x80 {
            return Some((value, idx + 1));
        }
    }
    None
}
/// Serialize `data` as a length-delimited protobuf field.
///
/// The output is the field key (`field_num` combined with wire type 2) as a
/// varint, followed by the payload length as a varint, followed by the
/// payload bytes themselves.
fn encode_proto_string(field_num: u32, data: &[u8]) -> Vec<u8> {
    const WIRE_TYPE_LEN: u32 = 2;

    let key = u64::from((field_num << 3) | WIRE_TYPE_LEN);
    let payload_len = data.len() as u64;

    // Reserve room for a one-byte key, up to five length bytes, and the payload.
    let mut out = Vec::with_capacity(1 + 5 + data.len());
    encode_varint(&mut out, key);
    encode_varint(&mut out, payload_len);
    out.extend_from_slice(data);
    out
}
/// Encode a u64 as a protobuf varint.
fn encode_varint(buf: &mut Vec<u8>, mut val: u64) {
loop {
let byte = (val & 0x7f) as u8;
val >>= 7;
if val == 0 {
buf.push(byte);
break;
}
buf.push(byte | 0x80);
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `find_free_port` must yield a non-zero port that can be bound on
    /// loopback immediately afterwards.
    #[test]
    fn test_find_free_port() {
        let port = find_free_port().unwrap();
        assert!(port > 0);

        // Binding the reported port on 127.0.0.1 should succeed.
        let addr = format!("127.0.0.1:{port}");
        let bound = TcpListener::bind(addr).is_ok();
        assert!(bound, "Port {port} should be free");
    }
}