fix: reduce poll intervals for smoother streaming

Streaming poll: 800-1200ms → 150-250ms (5x faster)
Sync poll: 1000-1800ms → 200-400ms (4x faster)

Verified via STEP_DUMP instrumentation that the LS updates
plannerResponse.response incrementally during GENERATING status,
so faster polling yields smoother progressive text delivery.

Also restructured streaming to emit reasoning events first
when thinking content is detected in LS steps before response text.
This commit is contained in:
Nikketryhard
2026-02-14 20:34:37 -06:00
parent b1a089d21d
commit 3d7a7f492b
2 changed files with 161 additions and 148 deletions

View File

@@ -283,7 +283,7 @@ pub(crate) async fn poll_for_response(
}
}
let poll_ms: u64 = rand::thread_rng().gen_range(1000..1800);
let poll_ms: u64 = rand::thread_rng().gen_range(200..400);
tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await;
}

View File

@@ -14,7 +14,7 @@ use std::sync::Arc;
use tracing::{debug, info};
use super::models::{lookup_model, DEFAULT_MODEL, MODELS};
use super::polling::{extract_response_text, is_response_done, poll_for_response, extract_model_usage, extract_thinking_signature, extract_thinking_content, extract_thinking_duration};
use super::polling::{extract_response_text, is_response_done, poll_for_response, extract_model_usage, extract_thinking_signature, extract_thinking_content};
use super::types::*;
use super::util::{err_response, now_unix, responses_sse_event};
use super::AppState;
@@ -406,7 +406,6 @@ async fn handle_responses_stream(
let seq = AtomicU32::new(0);
let next_seq = || seq.fetch_add(1, Ordering::Relaxed);
const CONTENT_IDX: u32 = 0;
const OUTPUT_IDX: u32 = 0;
// Build the in-progress response shell (no output yet)
let in_progress_resp = build_response_object(
@@ -444,44 +443,135 @@ async fn handle_responses_stream(
}),
));
// 3. response.output_item.added (message — reasoning added at completion)
yield Ok(responses_sse_event(
"response.output_item.added",
serde_json::json!({
"type": "response.output_item.added",
"sequence_number": next_seq(),
"output_index": OUTPUT_IDX,
"item": build_message_output_in_progress(&msg_id),
}),
));
// 4. response.content_part.added
yield Ok(responses_sse_event(
"response.content_part.added",
serde_json::json!({
"type": "response.content_part.added",
"sequence_number": next_seq(),
"output_index": OUTPUT_IDX,
"content_index": CONTENT_IDX,
"part": {
"type": "output_text",
"text": "",
"annotations": [],
}
}),
));
// 5. Poll and emit text deltas
// ── Phase 1: Poll for thinking content (arrives before response text) ──
let start = std::time::Instant::now();
let mut last_text = String::new();
let mut thinking_emitted = false;
let mut thinking_text: Option<String> = None;
let mut message_started = false;
let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', ""));
while start.elapsed().as_secs() < timeout {
if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await {
if status == 200 {
if let Some(steps) = data["steps"].as_array() {
// Check for thinking content (appears before response text)
if !thinking_emitted {
if let Some(tc) = extract_thinking_content(steps) {
thinking_text = Some(tc.clone());
thinking_emitted = true;
// Emit full reasoning event sequence at output_index 0
yield Ok(responses_sse_event(
"response.output_item.added",
serde_json::json!({
"type": "response.output_item.added",
"sequence_number": next_seq(),
"output_index": 0,
"item": {
"id": &reasoning_id,
"type": "reasoning",
"summary": [],
},
}),
));
yield Ok(responses_sse_event(
"response.reasoning_summary_part.added",
serde_json::json!({
"type": "response.reasoning_summary_part.added",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"part": { "type": "summary_text", "text": "" },
}),
));
yield Ok(responses_sse_event(
"response.reasoning_summary_text.delta",
serde_json::json!({
"type": "response.reasoning_summary_text.delta",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"delta": &tc,
}),
));
yield Ok(responses_sse_event(
"response.reasoning_summary_text.done",
serde_json::json!({
"type": "response.reasoning_summary_text.done",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"text": &tc,
}),
));
yield Ok(responses_sse_event(
"response.reasoning_summary_part.done",
serde_json::json!({
"type": "response.reasoning_summary_part.done",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"part": { "type": "summary_text", "text": &tc },
}),
));
yield Ok(responses_sse_event(
"response.output_item.done",
serde_json::json!({
"type": "response.output_item.done",
"sequence_number": next_seq(),
"output_index": 0,
"item": {
"id": &reasoning_id,
"type": "reasoning",
"summary": [{
"type": "summary_text",
"text": &tc,
}],
},
}),
));
}
}
// ── Phase 2: Stream text deltas ──
let text = extract_response_text(steps);
let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 };
if !text.is_empty() && text != last_text {
// Emit message output_item.added on first text
if !message_started {
message_started = true;
yield Ok(responses_sse_event(
"response.output_item.added",
serde_json::json!({
"type": "response.output_item.added",
"sequence_number": next_seq(),
"output_index": msg_output_index,
"item": build_message_output_in_progress(&msg_id),
}),
));
yield Ok(responses_sse_event(
"response.content_part.added",
serde_json::json!({
"type": "response.content_part.added",
"sequence_number": next_seq(),
"output_index": msg_output_index,
"content_index": CONTENT_IDX,
"part": {
"type": "output_text",
"text": "",
"annotations": [],
}
}),
));
}
let new_content = if text.len() > last_text.len()
&& text.starts_with(&*last_text)
{
@@ -497,7 +587,7 @@ async fn handle_responses_stream(
"type": "response.output_text.delta",
"sequence_number": next_seq(),
"item_id": &msg_id,
"output_index": OUTPUT_IDX,
"output_index": msg_output_index,
"content_index": CONTENT_IDX,
"delta": new_content,
}),
@@ -506,25 +596,28 @@ async fn handle_responses_stream(
}
}
// Check if response is done AND we have text
// ── Check completion ──
if is_response_done(steps) && !last_text.is_empty() {
debug!("Response done, text length={}", last_text.len());
let mu = extract_model_usage(steps);
let msg_idx: u32 = if thinking_emitted { 1 } else { 0 };
let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, &params.user_text, &last_text).await;
let ts = extract_thinking_signature(steps);
let tc = mitm_thinking.or_else(|| extract_thinking_content(steps));
let td = extract_thinking_duration(steps);
// Use already-captured thinking, or MITM thinking, or LS thinking
let tc = thinking_text.clone()
.or(mitm_thinking)
.or_else(|| extract_thinking_content(steps));
for evt in completion_events(
&response_id, &model_name, &msg_id,
OUTPUT_IDX, CONTENT_IDX, &last_text, usage,
created_at, &seq, &params, ts, tc, td,
&response_id, &model_name, &msg_id, &reasoning_id,
msg_idx, CONTENT_IDX, &last_text, usage,
created_at, &seq, &params, ts, tc,
) {
yield Ok(evt);
}
return;
}
// IDLE fallback: check trajectory status periodically
// IDLE fallback
let step_count = steps.len();
if step_count > 4 && step_count % 5 == 0 {
if let Ok((ts, td)) = state.backend.get_trajectory(&cascade_id).await {
@@ -533,14 +626,16 @@ async fn handle_responses_stream(
if run_status.contains("IDLE") && !last_text.is_empty() {
debug!("Trajectory IDLE, text length={}", last_text.len());
let mu = extract_model_usage(steps);
let msg_idx: u32 = if thinking_emitted { 1 } else { 0 };
let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, &params.user_text, &last_text).await;
let ts = extract_thinking_signature(steps);
let tc = mitm_thinking.or_else(|| extract_thinking_content(steps));
let td = extract_thinking_duration(steps);
let tc = thinking_text.clone()
.or(mitm_thinking)
.or_else(|| extract_thinking_content(steps));
for evt in completion_events(
&response_id, &model_name, &msg_id,
OUTPUT_IDX, CONTENT_IDX, &last_text, usage,
created_at, &seq, &params, ts, tc, td,
&response_id, &model_name, &msg_id, &reasoning_id,
msg_idx, CONTENT_IDX, &last_text, usage,
created_at, &seq, &params, ts, tc,
) {
yield Ok(evt);
}
@@ -553,7 +648,7 @@ async fn handle_responses_stream(
}
}
let poll_ms: u64 = rand::thread_rng().gen_range(800..1200);
let poll_ms: u64 = rand::thread_rng().gen_range(150..250);
tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await;
}
@@ -592,17 +687,20 @@ async fn handle_responses_stream(
// ─── SSE completion events ───────────────────────────────────────────────────
/// Build the completion SSE events sequence matching the official protocol:
/// 1. response.output_text.done
/// 2. response.content_part.done
/// 3. response.output_item.done
/// 4. response.completed (with reasoning item prepended if present)
/// Build the final SSE events at completion time.
///
/// Reasoning events were already streamed during polling (when thinking
/// appeared in LS steps before response text). Message output_item.added
/// and content_part.added were also emitted when text first appeared.
///
/// This function emits only the "done" events plus the final response.completed.
#[allow(clippy::too_many_arguments)]
fn completion_events(
resp_id: &str,
model: &str,
msg_id: &str,
out_idx: u32,
reasoning_id: &str,
msg_output_index: u32,
content_idx: u32,
text: &str,
usage: Usage,
@@ -611,7 +709,6 @@ fn completion_events(
params: &RequestParams,
thinking_signature: Option<String>,
thinking: Option<String>,
_thinking_duration: Option<String>,
) -> Vec<Event> {
let next_seq = || seq.fetch_add(1, Ordering::Relaxed);
let completed_at = now_unix();
@@ -620,97 +717,18 @@ fn completion_events(
// Build output array: [reasoning (if present), message]
let mut output_items: Vec<serde_json::Value> = Vec::new();
let mut events: Vec<Event> = Vec::new();
if let Some(ref thinking_text) = thinking {
let reasoning_item = build_reasoning_output(thinking_text);
let reasoning_id = reasoning_item["id"].as_str().unwrap_or("rs_0").to_string();
output_items.push(reasoning_item.clone());
// ── Reasoning streaming events (OpenAI spec) ──
// 1. response.output_item.added (reasoning item)
events.push(responses_sse_event(
"response.output_item.added",
serde_json::json!({
"type": "response.output_item.added",
"sequence_number": next_seq(),
"output_index": 0,
"item": {
"id": reasoning_id,
"type": "reasoning",
"summary": [],
},
}),
));
// 2. response.reasoning_summary_part.added
events.push(responses_sse_event(
"response.reasoning_summary_part.added",
serde_json::json!({
"type": "response.reasoning_summary_part.added",
"sequence_number": next_seq(),
"item_id": reasoning_id,
"output_index": 0,
"summary_index": 0,
"part": {
"type": "summary_text",
"text": "",
},
}),
));
// 3. response.reasoning_summary_text.delta
events.push(responses_sse_event(
"response.reasoning_summary_text.delta",
serde_json::json!({
"type": "response.reasoning_summary_text.delta",
"sequence_number": next_seq(),
"item_id": reasoning_id,
"output_index": 0,
"summary_index": 0,
"delta": thinking_text,
}),
));
// 4. response.reasoning_summary_text.done
events.push(responses_sse_event(
"response.reasoning_summary_text.done",
serde_json::json!({
"type": "response.reasoning_summary_text.done",
"sequence_number": next_seq(),
"item_id": reasoning_id,
"output_index": 0,
"summary_index": 0,
output_items.push(serde_json::json!({
"id": reasoning_id,
"type": "reasoning",
"summary": [{
"type": "summary_text",
"text": thinking_text,
}),
));
// 5. response.reasoning_summary_part.done
events.push(responses_sse_event(
"response.reasoning_summary_part.done",
serde_json::json!({
"type": "response.reasoning_summary_part.done",
"sequence_number": next_seq(),
"item_id": reasoning_id,
"output_index": 0,
"summary_index": 0,
"part": {
"type": "summary_text",
"text": thinking_text,
},
}),
));
// 6. response.output_item.done (reasoning item complete)
events.push(responses_sse_event(
"response.output_item.done",
serde_json::json!({
"type": "response.output_item.done",
"sequence_number": next_seq(),
"output_index": 0,
"item": reasoning_item,
}),
));
}],
}));
}
output_items.push(build_message_output(msg_id, text));
let msg_output_index = if thinking.is_some() { 1 } else { 0 };
let completed_resp = build_response_object(
ResponseData {
id: resp_id.to_string(),
@@ -725,8 +743,9 @@ fn completion_events(
params,
);
// ── Message streaming events ──
// 1. response.output_text.done
let mut events: Vec<Event> = Vec::new();
// Message done events
events.push(responses_sse_event(
"response.output_text.done",
serde_json::json!({
@@ -738,7 +757,6 @@ fn completion_events(
"text": text,
}),
));
// 2. response.content_part.done
events.push(responses_sse_event(
"response.content_part.done",
serde_json::json!({
@@ -746,14 +764,9 @@ fn completion_events(
"sequence_number": next_seq(),
"output_index": msg_output_index,
"content_index": content_idx,
"part": {
"type": "output_text",
"text": text,
"annotations": [],
},
"part": { "type": "output_text", "text": text, "annotations": [] },
}),
));
// 3. response.output_item.done
events.push(responses_sse_event(
"response.output_item.done",
serde_json::json!({
@@ -763,7 +776,6 @@ fn completion_events(
"item": output_item,
}),
));
// 4. response.completed
events.push(responses_sse_event(
"response.completed",
serde_json::json!({
@@ -775,3 +787,4 @@ fn completion_events(
events
}