diff --git a/src/api/responses.rs b/src/api/responses.rs index 3d364e2..40674de 100644 --- a/src/api/responses.rs +++ b/src/api/responses.rs @@ -443,7 +443,7 @@ async fn handle_responses_stream( }), )); - // ── Phase 1: Poll for thinking content (arrives before response text) ── + // ── Stream cascade updates: event-driven instead of timer-based polling ── let start = std::time::Instant::now(); let mut last_text = String::new(); let mut thinking_emitted = false; @@ -451,6 +451,20 @@ async fn handle_responses_stream( let mut message_started = false; let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); + // Try to open a reactive streaming connection for real-time notifications. + // Falls back to timer-based polling if the streaming RPC is unavailable. + let mut reactive_rx = match state.backend.stream_cascade_updates(&cascade_id).await { + Ok(rx) => { + debug!("Using reactive streaming for cascade updates"); + Some(rx) + } + Err(e) => { + debug!("Reactive streaming unavailable, falling back to polling: {e}"); + None + } + }; + + while start.elapsed().as_secs() < timeout { if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await { if status == 200 { @@ -648,8 +662,33 @@ async fn handle_responses_stream( } } - let poll_ms: u64 = rand::thread_rng().gen_range(150..250); - tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; + // Wait for next update: either reactive notification or fallback timer + match reactive_rx { + Some(ref mut rx) => { + // Wait for reactive notification with a safety timeout + let timeout = tokio::time::timeout( + tokio::time::Duration::from_millis(500), + rx.recv(), + ).await; + match timeout { + Ok(Some(_diff)) => { + // Drain any additional queued notifications (coalesce) + while rx.try_recv().is_ok() {} + } + Ok(None) => { + // Stream closed — fall back to polling + debug!("Reactive stream closed, falling back to polling"); + reactive_rx = None; + } + Err(_) => {} // timeout — fetch anyway as safety net + } + } + None => { + // Fallback: timer-based polling + let poll_ms: u64 = rand::thread_rng().gen_range(150..250); + tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; + } + } } // Timeout — emit incomplete response diff --git a/src/backend.rs b/src/backend.rs index 0112725..c9fcf9a 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -366,6 +366,111 @@ impl Backend { let body = serde_json::json!({"cascadeId": cascade_id}); self.call_json("GetCascadeTrajectory", &body).await } + + /// Open a server-streaming reactive updates RPC. + /// `rpc_method` is the ConnectRPC method name, e.g. "StreamCascadeReactiveUpdates". + async fn stream_reactive_rpc( + &self, + rpc_method: &str, + cascade_id: &str, + ) -> Result, String> { + let (base, csrf) = { + let guard = self.inner.read().await; + ( + format!("https://127.0.0.1:{}", guard.https_port), + guard.csrf.clone(), + ) + }; + + let url = format!("{base}/{LS_SERVICE}/{rpc_method}"); + let body = serde_json::json!({ + "protocolVersion": 1, + "id": cascade_id, + }); + + let mut headers = Self::common_headers(&csrf); + headers.insert("Content-Type", HeaderValue::from_static("application/connect+json")); + headers.insert("Connect-Protocol-Version", HeaderValue::from_static("1")); + + // Connect protocol envelope: [flags:1][length:4][payload] + let json_bytes = serde_json::to_vec(&body).unwrap(); + let mut envelope = Vec::with_capacity(5 + json_bytes.len()); + envelope.push(0x00); + envelope.extend_from_slice(&(json_bytes.len() as u32).to_be_bytes()); + envelope.extend_from_slice(&json_bytes); + + let mut resp = self + .client + .post(&url) + .headers(headers) + .body(envelope) + .send() + .await + .map_err(|e| format!("{rpc_method} HTTP error: {e}"))?; + + let status = resp.status().as_u16(); + if status != 200 { + let err_body = resp.bytes().await.unwrap_or_default(); + let err_text = String::from_utf8_lossy(&err_body); + return Err(format!("{rpc_method} failed: {status} — {err_text}")); + } + + let resp_ct = resp.headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("unknown") + .to_string(); + debug!("{rpc_method}: connected for cascade {cascade_id}, content-type: {resp_ct}"); + + let (tx, rx) = tokio::sync::mpsc::channel::(64); + let method = rpc_method.to_string(); + let cid = cascade_id.to_string(); + + tokio::spawn(async move { + let mut buf = Vec::new(); + + while let Ok(Some(chunk)) = resp.chunk().await { + if chunk.is_empty() { + continue; + } + buf.extend_from_slice(&chunk); + + while buf.len() >= 5 { + let flags = buf[0]; + let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize; + + if buf.len() < 5 + len { + break; + } + + let payload = &buf[5..5 + len]; + + if flags == 0x02 { + let text = String::from_utf8_lossy(payload); + debug!("{method}: end frame: {text}"); + } else if let Ok(json) = serde_json::from_slice::(payload) { + if tx.send(json).await.is_err() { + buf.drain(..5 + len); + break; + } + } + + buf.drain(..5 + len); + } + } + debug!("{method}: stream ended for {cid}"); + }); + + Ok(rx) + } + + /// StreamCascadeReactiveUpdates — real-time cascade state diffs. + pub async fn stream_cascade_updates( + &self, + cascade_id: &str, + ) -> Result, String> { + self.stream_reactive_rpc("StreamCascadeReactiveUpdates", cascade_id).await + } } // ─── Discovery helpers ─────────────────────────────────────────────────────── diff --git a/src/main.rs b/src/main.rs index 8aa5838..9b333db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,6 +53,19 @@ struct Cli { #[tokio::main] async fn main() { + // Ignore SIGPIPE — prevents instant death when piped through tee/grep + #[cfg(unix)] + { + use tokio::signal::unix::{signal, SignalKind}; + let mut sigpipe = signal(SignalKind::pipe()).expect("failed to install SIGPIPE handler"); + tokio::spawn(async move { + loop { + sigpipe.recv().await; + // Silently ignore SIGPIPE + } + }); + } + // Install rustls CryptoProvider early — prevents panic under concurrent load let _ = rustls::crypto::ring::default_provider().install_default(); @@ -267,6 +280,7 @@ async fn main() { } // Remove stale MITM port file let _ = std::fs::remove_file(dirs_data_dir().join("mitm-port")); + eprintln!(" \x1b[1;32m✓ Server shutdown complete\x1b[0m\n"); info!("Server shutdown complete"); } @@ -290,8 +304,14 @@ async fn shutdown_signal() { let terminate = std::future::pending::<()>(); tokio::select! { - _ = ctrl_c => info!("Received SIGINT, shutting down..."), - _ = terminate => info!("Received SIGTERM, shutting down..."), + _ = ctrl_c => { + eprintln!("\n \x1b[1;33m⚡ Shutting down gracefully...\x1b[0m"); + info!("Received SIGINT, shutting down..."); + }, + _ = terminate => { + eprintln!("\n \x1b[1;33m⚡ Received SIGTERM, shutting down...\x1b[0m"); + info!("Received SIGTERM, shutting down..."); + }, } }