diff --git a/src/api/polling.rs b/src/api/polling.rs index 8910357..590123f 100644 --- a/src/api/polling.rs +++ b/src/api/polling.rs @@ -283,7 +283,7 @@ pub(crate) async fn poll_for_response( } } - let poll_ms: u64 = rand::thread_rng().gen_range(1000..1800); + let poll_ms: u64 = rand::thread_rng().gen_range(200..400); tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; } diff --git a/src/api/responses.rs b/src/api/responses.rs index 38ccd1f..3d364e2 100644 --- a/src/api/responses.rs +++ b/src/api/responses.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use tracing::{debug, info}; use super::models::{lookup_model, DEFAULT_MODEL, MODELS}; -use super::polling::{extract_response_text, is_response_done, poll_for_response, extract_model_usage, extract_thinking_signature, extract_thinking_content, extract_thinking_duration}; +use super::polling::{extract_response_text, is_response_done, poll_for_response, extract_model_usage, extract_thinking_signature, extract_thinking_content}; use super::types::*; use super::util::{err_response, now_unix, responses_sse_event}; use super::AppState; @@ -406,7 +406,6 @@ async fn handle_responses_stream( let seq = AtomicU32::new(0); let next_seq = || seq.fetch_add(1, Ordering::Relaxed); const CONTENT_IDX: u32 = 0; - const OUTPUT_IDX: u32 = 0; // Build the in-progress response shell (no output yet) let in_progress_resp = build_response_object( @@ -444,44 +443,135 @@ async fn handle_responses_stream( }), )); - // 3. response.output_item.added (message — reasoning added at completion) - yield Ok(responses_sse_event( - "response.output_item.added", - serde_json::json!({ - "type": "response.output_item.added", - "sequence_number": next_seq(), - "output_index": OUTPUT_IDX, - "item": build_message_output_in_progress(&msg_id), - }), - )); - - // 4. response.content_part.added - yield Ok(responses_sse_event( - "response.content_part.added", - serde_json::json!({ - "type": "response.content_part.added", - "sequence_number": next_seq(), - "output_index": OUTPUT_IDX, - "content_index": CONTENT_IDX, - "part": { - "type": "output_text", - "text": "", - "annotations": [], - } - }), - )); - - // 5. Poll and emit text deltas + // ── Phase 1: Poll for thinking content (arrives before response text) ── let start = std::time::Instant::now(); let mut last_text = String::new(); + let mut thinking_emitted = false; + let mut thinking_text: Option = None; + let mut message_started = false; + let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); while start.elapsed().as_secs() < timeout { if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await { if status == 200 { if let Some(steps) = data["steps"].as_array() { + + // Check for thinking content (appears before response text) + if !thinking_emitted { + if let Some(tc) = extract_thinking_content(steps) { + thinking_text = Some(tc.clone()); + thinking_emitted = true; + + // Emit full reasoning event sequence at output_index 0 + yield Ok(responses_sse_event( + "response.output_item.added", + serde_json::json!({ + "type": "response.output_item.added", + "sequence_number": next_seq(), + "output_index": 0, + "item": { + "id": &reasoning_id, + "type": "reasoning", + "summary": [], + }, + }), + )); + yield Ok(responses_sse_event( + "response.reasoning_summary_part.added", + serde_json::json!({ + "type": "response.reasoning_summary_part.added", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "part": { "type": "summary_text", "text": "" }, + }), + )); + yield Ok(responses_sse_event( + "response.reasoning_summary_text.delta", + serde_json::json!({ + "type": "response.reasoning_summary_text.delta", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "delta": &tc, + }), + )); + yield Ok(responses_sse_event( + "response.reasoning_summary_text.done", + serde_json::json!({ + "type": "response.reasoning_summary_text.done", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "text": &tc, + }), + )); + yield Ok(responses_sse_event( + "response.reasoning_summary_part.done", + serde_json::json!({ + "type": "response.reasoning_summary_part.done", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "part": { "type": "summary_text", "text": &tc }, + }), + )); + yield Ok(responses_sse_event( + "response.output_item.done", + serde_json::json!({ + "type": "response.output_item.done", + "sequence_number": next_seq(), + "output_index": 0, + "item": { + "id": &reasoning_id, + "type": "reasoning", + "summary": [{ + "type": "summary_text", + "text": &tc, + }], + }, + }), + )); + } + } + + // ── Phase 2: Stream text deltas ── let text = extract_response_text(steps); + let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 }; if !text.is_empty() && text != last_text { + // Emit message output_item.added on first text + if !message_started { + message_started = true; + yield Ok(responses_sse_event( + "response.output_item.added", + serde_json::json!({ + "type": "response.output_item.added", + "sequence_number": next_seq(), + "output_index": msg_output_index, + "item": build_message_output_in_progress(&msg_id), + }), + )); + yield Ok(responses_sse_event( + "response.content_part.added", + serde_json::json!({ + "type": "response.content_part.added", + "sequence_number": next_seq(), + "output_index": msg_output_index, + "content_index": CONTENT_IDX, + "part": { + "type": "output_text", + "text": "", + "annotations": [], + } + }), + )); + } + let new_content = if text.len() > last_text.len() && text.starts_with(&*last_text) { @@ -497,7 +587,7 @@ async fn handle_responses_stream( "type": "response.output_text.delta", "sequence_number": next_seq(), "item_id": &msg_id, - "output_index": OUTPUT_IDX, + "output_index": msg_output_index, "content_index": CONTENT_IDX, "delta": new_content, }), @@ -506,25 +596,28 @@ async fn handle_responses_stream( } } - // Check if response is done AND we have text + // ── Check completion ── if is_response_done(steps) && !last_text.is_empty() { debug!("Response done, text length={}", last_text.len()); let mu = extract_model_usage(steps); + let msg_idx: u32 = if thinking_emitted { 1 } else { 0 }; let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); - let tc = mitm_thinking.or_else(|| extract_thinking_content(steps)); - let td = extract_thinking_duration(steps); + // Use already-captured thinking, or MITM thinking, or LS thinking + let tc = thinking_text.clone() + .or(mitm_thinking) + .or_else(|| extract_thinking_content(steps)); for evt in completion_events( - &response_id, &model_name, &msg_id, - OUTPUT_IDX, CONTENT_IDX, &last_text, usage, - created_at, &seq, ¶ms, ts, tc, td, + &response_id, &model_name, &msg_id, &reasoning_id, + msg_idx, CONTENT_IDX, &last_text, usage, + created_at, &seq, ¶ms, ts, tc, ) { yield Ok(evt); } return; } - // IDLE fallback: check trajectory status periodically + // IDLE fallback let step_count = steps.len(); if step_count > 4 && step_count % 5 == 0 { if let Ok((ts, td)) = state.backend.get_trajectory(&cascade_id).await { @@ -533,14 +626,16 @@ async fn handle_responses_stream( if run_status.contains("IDLE") && !last_text.is_empty() { debug!("Trajectory IDLE, text length={}", last_text.len()); let mu = extract_model_usage(steps); + let msg_idx: u32 = if thinking_emitted { 1 } else { 0 }; let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); - let tc = mitm_thinking.or_else(|| extract_thinking_content(steps)); - let td = extract_thinking_duration(steps); + let tc = thinking_text.clone() + .or(mitm_thinking) + .or_else(|| extract_thinking_content(steps)); for evt in completion_events( - &response_id, &model_name, &msg_id, - OUTPUT_IDX, CONTENT_IDX, &last_text, usage, - created_at, &seq, ¶ms, ts, tc, td, + &response_id, &model_name, &msg_id, &reasoning_id, + msg_idx, CONTENT_IDX, &last_text, usage, + created_at, &seq, ¶ms, ts, tc, ) { yield Ok(evt); } @@ -553,7 +648,7 @@ async fn handle_responses_stream( } } - let poll_ms: u64 = rand::thread_rng().gen_range(800..1200); + let poll_ms: u64 = rand::thread_rng().gen_range(150..250); tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; } @@ -592,17 +687,20 @@ async fn handle_responses_stream( // ─── SSE completion events ─────────────────────────────────────────────────── -/// Build the completion SSE events sequence matching the official protocol: -/// 1. response.output_text.done -/// 2. response.content_part.done -/// 3. response.output_item.done -/// 4. response.completed (with reasoning item prepended if present) +/// Build the final SSE events at completion time. +/// +/// Reasoning events were already streamed during polling (when thinking +/// appeared in LS steps before response text). Message output_item.added +/// and content_part.added were also emitted when text first appeared. +/// +/// This function emits only the "done" events plus the final response.completed. #[allow(clippy::too_many_arguments)] fn completion_events( resp_id: &str, model: &str, msg_id: &str, - out_idx: u32, + reasoning_id: &str, + msg_output_index: u32, content_idx: u32, text: &str, usage: Usage, @@ -611,7 +709,6 @@ fn completion_events( params: &RequestParams, thinking_signature: Option, thinking: Option, - _thinking_duration: Option, ) -> Vec { let next_seq = || seq.fetch_add(1, Ordering::Relaxed); let completed_at = now_unix(); @@ -620,97 +717,18 @@ fn completion_events( // Build output array: [reasoning (if present), message] let mut output_items: Vec = Vec::new(); - let mut events: Vec = Vec::new(); - if let Some(ref thinking_text) = thinking { - let reasoning_item = build_reasoning_output(thinking_text); - let reasoning_id = reasoning_item["id"].as_str().unwrap_or("rs_0").to_string(); - output_items.push(reasoning_item.clone()); - - // ── Reasoning streaming events (OpenAI spec) ── - // 1. response.output_item.added (reasoning item) - events.push(responses_sse_event( - "response.output_item.added", - serde_json::json!({ - "type": "response.output_item.added", - "sequence_number": next_seq(), - "output_index": 0, - "item": { - "id": reasoning_id, - "type": "reasoning", - "summary": [], - }, - }), - )); - // 2. response.reasoning_summary_part.added - events.push(responses_sse_event( - "response.reasoning_summary_part.added", - serde_json::json!({ - "type": "response.reasoning_summary_part.added", - "sequence_number": next_seq(), - "item_id": reasoning_id, - "output_index": 0, - "summary_index": 0, - "part": { - "type": "summary_text", - "text": "", - }, - }), - )); - // 3. response.reasoning_summary_text.delta - events.push(responses_sse_event( - "response.reasoning_summary_text.delta", - serde_json::json!({ - "type": "response.reasoning_summary_text.delta", - "sequence_number": next_seq(), - "item_id": reasoning_id, - "output_index": 0, - "summary_index": 0, - "delta": thinking_text, - }), - )); - // 4. response.reasoning_summary_text.done - events.push(responses_sse_event( - "response.reasoning_summary_text.done", - serde_json::json!({ - "type": "response.reasoning_summary_text.done", - "sequence_number": next_seq(), - "item_id": reasoning_id, - "output_index": 0, - "summary_index": 0, + output_items.push(serde_json::json!({ + "id": reasoning_id, + "type": "reasoning", + "summary": [{ + "type": "summary_text", "text": thinking_text, - }), - )); - // 5. response.reasoning_summary_part.done - events.push(responses_sse_event( - "response.reasoning_summary_part.done", - serde_json::json!({ - "type": "response.reasoning_summary_part.done", - "sequence_number": next_seq(), - "item_id": reasoning_id, - "output_index": 0, - "summary_index": 0, - "part": { - "type": "summary_text", - "text": thinking_text, - }, - }), - )); - // 6. response.output_item.done (reasoning item complete) - events.push(responses_sse_event( - "response.output_item.done", - serde_json::json!({ - "type": "response.output_item.done", - "sequence_number": next_seq(), - "output_index": 0, - "item": reasoning_item, - }), - )); + }], + })); } output_items.push(build_message_output(msg_id, text)); - let msg_output_index = if thinking.is_some() { 1 } else { 0 }; - let completed_resp = build_response_object( ResponseData { id: resp_id.to_string(), @@ -725,8 +743,9 @@ fn completion_events( params, ); - // ── Message streaming events ── - // 1. response.output_text.done + let mut events: Vec = Vec::new(); + + // Message done events events.push(responses_sse_event( "response.output_text.done", serde_json::json!({ @@ -738,7 +757,6 @@ fn completion_events( "text": text, }), )); - // 2. response.content_part.done events.push(responses_sse_event( "response.content_part.done", serde_json::json!({ @@ -746,14 +764,9 @@ fn completion_events( "sequence_number": next_seq(), "output_index": msg_output_index, "content_index": content_idx, - "part": { - "type": "output_text", - "text": text, - "annotations": [], - }, + "part": { "type": "output_text", "text": text, "annotations": [] }, }), )); - // 3. response.output_item.done events.push(responses_sse_event( "response.output_item.done", serde_json::json!({ @@ -763,7 +776,6 @@ fn completion_events( "item": output_item, }), )); - // 4. response.completed events.push(responses_sse_event( "response.completed", serde_json::json!({ @@ -775,3 +787,4 @@ fn completion_events( events } +