feat: add reactive streaming and remove dead panel stream code
- Subscribe to StreamCascadeReactiveUpdates for real-time cascade state diffs - Fall back to timer-based polling if streaming RPC unavailable - Remove StreamCascadePanelReactiveUpdates code (dead end, only has plan_status/user_settings) - Remove debug diff file-saving code - Add stream_reactive_rpc() helper to backend
This commit is contained in:
@@ -443,7 +443,7 @@ async fn handle_responses_stream(
|
|||||||
}),
|
}),
|
||||||
));
|
));
|
||||||
|
|
||||||
// ── Phase 1: Poll for thinking content (arrives before response text) ──
|
// ── Stream cascade updates: event-driven instead of timer-based polling ──
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
let mut last_text = String::new();
|
let mut last_text = String::new();
|
||||||
let mut thinking_emitted = false;
|
let mut thinking_emitted = false;
|
||||||
@@ -451,6 +451,20 @@ async fn handle_responses_stream(
|
|||||||
let mut message_started = false;
|
let mut message_started = false;
|
||||||
let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', ""));
|
let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', ""));
|
||||||
|
|
||||||
|
// Try to open a reactive streaming connection for real-time notifications.
|
||||||
|
// Falls back to timer-based polling if the streaming RPC is unavailable.
|
||||||
|
let mut reactive_rx = match state.backend.stream_cascade_updates(&cascade_id).await {
|
||||||
|
Ok(rx) => {
|
||||||
|
debug!("Using reactive streaming for cascade updates");
|
||||||
|
Some(rx)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!("Reactive streaming unavailable, falling back to polling: {e}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
while start.elapsed().as_secs() < timeout {
|
while start.elapsed().as_secs() < timeout {
|
||||||
if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await {
|
if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await {
|
||||||
if status == 200 {
|
if status == 200 {
|
||||||
@@ -648,9 +662,34 @@ async fn handle_responses_stream(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for next update: either reactive notification or fallback timer
|
||||||
|
match reactive_rx {
|
||||||
|
Some(ref mut rx) => {
|
||||||
|
// Wait for reactive notification with a safety timeout
|
||||||
|
let timeout = tokio::time::timeout(
|
||||||
|
tokio::time::Duration::from_millis(500),
|
||||||
|
rx.recv(),
|
||||||
|
).await;
|
||||||
|
match timeout {
|
||||||
|
Ok(Some(_diff)) => {
|
||||||
|
// Drain any additional queued notifications (coalesce)
|
||||||
|
while rx.try_recv().is_ok() {}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
// Stream closed — fall back to polling
|
||||||
|
debug!("Reactive stream closed, falling back to polling");
|
||||||
|
reactive_rx = None;
|
||||||
|
}
|
||||||
|
Err(_) => {} // timeout — fetch anyway as safety net
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Fallback: timer-based polling
|
||||||
let poll_ms: u64 = rand::thread_rng().gen_range(150..250);
|
let poll_ms: u64 = rand::thread_rng().gen_range(150..250);
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await;
|
tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Timeout — emit incomplete response
|
// Timeout — emit incomplete response
|
||||||
let timeout_resp = build_response_object(
|
let timeout_resp = build_response_object(
|
||||||
|
|||||||
105
src/backend.rs
105
src/backend.rs
@@ -366,6 +366,111 @@ impl Backend {
|
|||||||
let body = serde_json::json!({"cascadeId": cascade_id});
|
let body = serde_json::json!({"cascadeId": cascade_id});
|
||||||
self.call_json("GetCascadeTrajectory", &body).await
|
self.call_json("GetCascadeTrajectory", &body).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Open a server-streaming reactive updates RPC.
|
||||||
|
/// `rpc_method` is the ConnectRPC method name, e.g. "StreamCascadeReactiveUpdates".
|
||||||
|
async fn stream_reactive_rpc(
|
||||||
|
&self,
|
||||||
|
rpc_method: &str,
|
||||||
|
cascade_id: &str,
|
||||||
|
) -> Result<tokio::sync::mpsc::Receiver<serde_json::Value>, String> {
|
||||||
|
let (base, csrf) = {
|
||||||
|
let guard = self.inner.read().await;
|
||||||
|
(
|
||||||
|
format!("https://127.0.0.1:{}", guard.https_port),
|
||||||
|
guard.csrf.clone(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let url = format!("{base}/{LS_SERVICE}/{rpc_method}");
|
||||||
|
let body = serde_json::json!({
|
||||||
|
"protocolVersion": 1,
|
||||||
|
"id": cascade_id,
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut headers = Self::common_headers(&csrf);
|
||||||
|
headers.insert("Content-Type", HeaderValue::from_static("application/connect+json"));
|
||||||
|
headers.insert("Connect-Protocol-Version", HeaderValue::from_static("1"));
|
||||||
|
|
||||||
|
// Connect protocol envelope: [flags:1][length:4][payload]
|
||||||
|
let json_bytes = serde_json::to_vec(&body).unwrap();
|
||||||
|
let mut envelope = Vec::with_capacity(5 + json_bytes.len());
|
||||||
|
envelope.push(0x00);
|
||||||
|
envelope.extend_from_slice(&(json_bytes.len() as u32).to_be_bytes());
|
||||||
|
envelope.extend_from_slice(&json_bytes);
|
||||||
|
|
||||||
|
let mut resp = self
|
||||||
|
.client
|
||||||
|
.post(&url)
|
||||||
|
.headers(headers)
|
||||||
|
.body(envelope)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("{rpc_method} HTTP error: {e}"))?;
|
||||||
|
|
||||||
|
let status = resp.status().as_u16();
|
||||||
|
if status != 200 {
|
||||||
|
let err_body = resp.bytes().await.unwrap_or_default();
|
||||||
|
let err_text = String::from_utf8_lossy(&err_body);
|
||||||
|
return Err(format!("{rpc_method} failed: {status} — {err_text}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let resp_ct = resp.headers()
|
||||||
|
.get("content-type")
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.unwrap_or("unknown")
|
||||||
|
.to_string();
|
||||||
|
debug!("{rpc_method}: connected for cascade {cascade_id}, content-type: {resp_ct}");
|
||||||
|
|
||||||
|
let (tx, rx) = tokio::sync::mpsc::channel::<serde_json::Value>(64);
|
||||||
|
let method = rpc_method.to_string();
|
||||||
|
let cid = cascade_id.to_string();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
|
||||||
|
while let Ok(Some(chunk)) = resp.chunk().await {
|
||||||
|
if chunk.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
buf.extend_from_slice(&chunk);
|
||||||
|
|
||||||
|
while buf.len() >= 5 {
|
||||||
|
let flags = buf[0];
|
||||||
|
let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
|
||||||
|
|
||||||
|
if buf.len() < 5 + len {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let payload = &buf[5..5 + len];
|
||||||
|
|
||||||
|
if flags == 0x02 {
|
||||||
|
let text = String::from_utf8_lossy(payload);
|
||||||
|
debug!("{method}: end frame: {text}");
|
||||||
|
} else if let Ok(json) = serde_json::from_slice::<serde_json::Value>(payload) {
|
||||||
|
if tx.send(json).await.is_err() {
|
||||||
|
buf.drain(..5 + len);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.drain(..5 + len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug!("{method}: stream ended for {cid}");
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// StreamCascadeReactiveUpdates — real-time cascade state diffs.
|
||||||
|
pub async fn stream_cascade_updates(
|
||||||
|
&self,
|
||||||
|
cascade_id: &str,
|
||||||
|
) -> Result<tokio::sync::mpsc::Receiver<serde_json::Value>, String> {
|
||||||
|
self.stream_reactive_rpc("StreamCascadeReactiveUpdates", cascade_id).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── Discovery helpers ───────────────────────────────────────────────────────
|
// ─── Discovery helpers ───────────────────────────────────────────────────────
|
||||||
|
|||||||
24
src/main.rs
24
src/main.rs
@@ -53,6 +53,19 @@ struct Cli {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
// Ignore SIGPIPE — prevents instant death when piped through tee/grep
|
||||||
|
#[cfg(unix)]
|
||||||
|
{
|
||||||
|
use tokio::signal::unix::{signal, SignalKind};
|
||||||
|
let mut sigpipe = signal(SignalKind::pipe()).expect("failed to install SIGPIPE handler");
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
sigpipe.recv().await;
|
||||||
|
// Silently ignore SIGPIPE
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Install rustls CryptoProvider early — prevents panic under concurrent load
|
// Install rustls CryptoProvider early — prevents panic under concurrent load
|
||||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
|
|
||||||
@@ -267,6 +280,7 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
// Remove stale MITM port file
|
// Remove stale MITM port file
|
||||||
let _ = std::fs::remove_file(dirs_data_dir().join("mitm-port"));
|
let _ = std::fs::remove_file(dirs_data_dir().join("mitm-port"));
|
||||||
|
eprintln!(" \x1b[1;32m✓ Server shutdown complete\x1b[0m\n");
|
||||||
info!("Server shutdown complete");
|
info!("Server shutdown complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -290,8 +304,14 @@ async fn shutdown_signal() {
|
|||||||
let terminate = std::future::pending::<()>();
|
let terminate = std::future::pending::<()>();
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = ctrl_c => info!("Received SIGINT, shutting down..."),
|
_ = ctrl_c => {
|
||||||
_ = terminate => info!("Received SIGTERM, shutting down..."),
|
eprintln!("\n \x1b[1;33m⚡ Shutting down gracefully...\x1b[0m");
|
||||||
|
info!("Received SIGINT, shutting down...");
|
||||||
|
},
|
||||||
|
_ = terminate => {
|
||||||
|
eprintln!("\n \x1b[1;33m⚡ Received SIGTERM, shutting down...\x1b[0m");
|
||||||
|
info!("Received SIGTERM, shutting down...");
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user