From b3af73cebd36684731fa77806685fac64f41ec27 Mon Sep 17 00:00:00 2001 From: Nikketryhard Date: Sun, 15 Feb 2026 01:03:39 -0600 Subject: [PATCH] feat: sync all endpoints with MITM LS bypass + real-time thinking streaming - Responses API (streaming): MITM bypass path polls MitmStore directly when custom tools are active, skipping LS step polling entirely. Streams thinking text deltas in real-time as they arrive from the MITM. Handles function calls, text response, and thinking/reasoning events. - Responses API (sync): Same MITM bypass for non-streaming responses. Polls MitmStore for function calls or completed text before falling back to LS path. - Gemini endpoint: MITM bypass polls MitmStore directly for tool call responses, eliminating LS overhead. - MitmStore: Added captured_thinking_text field with set/peek/take methods for real-time thinking text capture from MITM SSE. - MITM proxy: Now captures both thinking_text and response_text from StreamingAccumulator into MitmStore when bypass mode is active. --- .gemini/plans/sync-and-latency.md | 46 ++++ src/api/gemini.rs | 69 +++++ src/api/responses.rs | 406 +++++++++++++++++++++++++++++- src/mitm/proxy.rs | 34 ++- src/mitm/store.rs | 23 +- 5 files changed, 564 insertions(+), 14 deletions(-) create mode 100644 .gemini/plans/sync-and-latency.md diff --git a/.gemini/plans/sync-and-latency.md b/.gemini/plans/sync-and-latency.md new file mode 100644 index 0000000..d3b7c68 --- /dev/null +++ b/.gemini/plans/sync-and-latency.md @@ -0,0 +1,46 @@ +# Sync All Endpoints + Latency + Thinking Streaming + +## Phase 1: Sync Responses API (`/v1/responses`) with LS bypass + +Current state: + +- `handle_responses_stream` (line 529-859) polls LS steps for text +- Doesn't use MitmStore bypass at all +- Still suffers from LS multi-turn overhead when tools are active + +Fix: + +- Add MITM bypass path (same as completions) — check MitmStore for text + function calls +- For function calls: emit `response.output_item.added` (function_call type) + done events +- For text: stream from MitmStore `captured_response_text` + `response_complete` + +## Phase 2: Sync Gemini endpoint (`/v1/gemini`) with LS bypass + +Current state: + +- `handle_gemini` (line 57-236) uses `poll_for_response` then checks MitmStore +- Already checks `take_any_function_calls()` after polling +- But `poll_for_response` still goes through LS steps + +Fix: + +- When tools are active, poll MitmStore directly instead of `poll_for_response` + +## Phase 3: Latency improvements + +- Reduce poll intervals across all handlers +- Add MITM store thinking_text capture for real-time streaming + +## Phase 4: Real-time thinking streaming investigation + +Current state: + +- Google SSE includes `thought: true` parts with thinking text +- `streaming_acc.thinking_text` accumulates this +- Currently only used for final usage stats, not streamed in real-time + +Investigation needed: + +- The MITM intercept already captures thinking_text per-chunk +- Need to store thinking_text updates in MitmStore incrementally +- Responses handler can then stream thinking deltas in real-time diff --git a/src/api/gemini.rs b/src/api/gemini.rs index add9327..48ebd3a 100644 --- a/src/api/gemini.rs +++ b/src/api/gemini.rs @@ -183,6 +183,75 @@ pub(crate) async fn handle_gemini( } } + let has_custom_tools = state.mitm_store.get_tools().await.is_some(); + + // Clear stale response + state.mitm_store.clear_response_async().await; + + // ── MITM bypass: when tools active, poll MitmStore directly ── + if has_custom_tools { + let start = std::time::Instant::now(); + while start.elapsed().as_secs() < body.timeout { + // Check for function calls + let captured = state.mitm_store.take_any_function_calls().await; + if let Some(ref calls) = captured { + if !calls.is_empty() { + let parts: Vec = calls + .iter() + .map(|fc| { + serde_json::json!({ + "functionCall": { + "name": fc.name, + "args": fc.args, + } + }) + }) + .collect(); + + return Json(serde_json::json!({ + "candidates": [{ + "content": { + "parts": parts, + "role": "model", + }, + "finishReason": "STOP", + }], + "modelVersion": model_name, + })) + .into_response(); + } + } + + // Check for completed text response + if state.mitm_store.is_response_complete() { + let text = state.mitm_store.take_response_text().await.unwrap_or_default(); + return Json(serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": text}], + "role": "model", + }, + "finishReason": "STOP", + }], + "modelVersion": model_name, + })) + .into_response(); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + + // Timeout + return Json(serde_json::json!({ + "error": { + "message": "Request timed out", + "type": "timeout_error", + } + })) + .into_response(); + } + + // ── Normal LS path (no custom tools) ── // Poll for response let poll_result = poll_for_response(&state, &cascade_id, body.timeout).await; diff --git a/src/api/responses.rs b/src/api/responses.rs index 188df60..24b7af4 100644 --- a/src/api/responses.rs +++ b/src/api/responses.rs @@ -440,6 +440,103 @@ async fn handle_responses_sync( params: RequestParams, ) -> axum::response::Response { let created_at = now_unix(); + let has_custom_tools = state.mitm_store.get_tools().await.is_some(); + + // Clear stale captured response + state.mitm_store.clear_response_async().await; + + // ── MITM bypass: poll MitmStore directly when custom tools active ── + if has_custom_tools { + let start = std::time::Instant::now(); + while start.elapsed().as_secs() < timeout { + // Check for function calls + let captured = state.mitm_store.take_any_function_calls().await; + if let Some(ref calls) = captured { + if !calls.is_empty() { + let mut output_items: Vec = Vec::new(); + for fc in calls { + let call_id = format!( + "call_{}", + uuid::Uuid::new_v4().to_string().replace('-', "")[..24].to_string() + ); + state.mitm_store.register_call_id(call_id.clone(), fc.name.clone()).await; + let arguments = serde_json::to_string(&fc.args).unwrap_or_default(); + output_items.push(build_function_call_output(&call_id, &fc.name, &arguments)); + } + let (usage, _) = usage_from_poll( + &state.mitm_store, &cascade_id, &None, + ¶ms.user_text, "", + ).await; + let resp = build_response_object( + ResponseData { + id: response_id, + model: model_name, + status: "completed", + created_at, + completed_at: Some(now_unix()), + output: output_items, + usage: Some(usage), + thinking_signature: None, + }, + ¶ms, + ); + return Json(resp).into_response(); + } + } + + // Check for completed text response + if state.mitm_store.is_response_complete() { + let text = state.mitm_store.take_response_text().await.unwrap_or_default(); + let thinking = state.mitm_store.take_thinking_text().await; + let (usage, _) = usage_from_poll( + &state.mitm_store, &cascade_id, &None, + ¶ms.user_text, &text, + ).await; + + let mut output_items: Vec = Vec::new(); + if let Some(ref t) = thinking { + output_items.push(build_reasoning_output(t)); + } + let msg_id = format!("msg_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); + output_items.push(build_message_output(&msg_id, &text)); + + let resp = build_response_object( + ResponseData { + id: response_id, + model: model_name, + status: "completed", + created_at, + completed_at: Some(now_unix()), + output: output_items, + usage: Some(usage), + thinking_signature: None, + }, + ¶ms, + ); + return Json(resp).into_response(); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + + // Timeout + let resp = build_response_object( + ResponseData { + id: response_id, + model: model_name, + status: "incomplete", + created_at, + completed_at: None, + output: vec![], + usage: Some(Usage::estimate(¶ms.user_text, "")), + thinking_signature: None, + }, + ¶ms, + ); + return Json(resp).into_response(); + } + + // ── Normal LS path (no custom tools) ── let poll_result = poll_for_response(&state, &cascade_id, timeout).await; let completed_at = now_unix(); let msg_id = format!( @@ -584,9 +681,316 @@ async fn handle_responses_stream( let mut thinking_text: Option = None; let mut message_started = false; let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); + let has_custom_tools = state.mitm_store.get_tools().await.is_some(); + // Clear stale captured response + state.mitm_store.clear_response_async().await; + + // ── MITM bypass mode (when custom tools are active) ── + // Skip LS entirely — read text, thinking, and tool calls directly from MitmStore. + if has_custom_tools { + let mut last_thinking = String::new(); + + while start.elapsed().as_secs() < timeout { + // Check for function calls first + let captured = state.mitm_store.take_any_function_calls().await; + if let Some(ref calls) = captured { + if !calls.is_empty() { + let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 }; + for (i, fc) in calls.iter().enumerate() { + let call_id = format!( + "call_{}", + uuid::Uuid::new_v4().to_string().replace('-', "")[..24].to_string() + ); + let arguments = serde_json::to_string(&fc.args).unwrap_or_default(); + state.mitm_store.register_call_id(call_id.clone(), fc.name.clone()).await; + let fc_item_id = format!("fc_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); + + yield Ok(responses_sse_event( + "response.output_item.added", + serde_json::json!({ + "type": "response.output_item.added", + "sequence_number": next_seq(), + "output_index": msg_output_index as usize + i, + "item": { + "id": &fc_item_id, + "type": "function_call", + "call_id": &call_id, + "name": &fc.name, + "arguments": &arguments, + "status": "completed", + }, + }), + )); + yield Ok(responses_sse_event( + "response.output_item.done", + serde_json::json!({ + "type": "response.output_item.done", + "sequence_number": next_seq(), + "output_index": msg_output_index as usize + i, + "item": { + "id": &fc_item_id, + "type": "function_call", + "call_id": &call_id, + "name": &fc.name, + "arguments": &arguments, + "status": "completed", + }, + }), + )); + } + + // Build output for final response + let mut output_items: Vec = Vec::new(); + for fc in calls { + let call_id = format!( + "call_{}", + uuid::Uuid::new_v4().to_string().replace('-', "")[..24].to_string() + ); + let arguments = serde_json::to_string(&fc.args).unwrap_or_default(); + output_items.push(build_function_call_output(&call_id, &fc.name, &arguments)); + } + + let (usage, _) = usage_from_poll( + &state.mitm_store, &cascade_id, &None, + ¶ms.user_text, "", + ).await; + + let final_resp = build_response_object( + ResponseData { + id: response_id.clone(), + model: model_name.clone(), + status: "completed", + created_at, + completed_at: Some(now_unix()), + output: output_items, + usage: Some(usage), + thinking_signature: None, + }, + ¶ms, + ); + yield Ok(responses_sse_event( + "response.completed", + serde_json::json!({ + "type": "response.completed", + "sequence_number": next_seq(), + "response": response_to_json(&final_resp), + }), + )); + return; + } + } + + // Stream thinking text in real-time + if !thinking_emitted { + if let Some(thinking) = state.mitm_store.peek_thinking_text().await { + if !thinking.is_empty() && thinking != last_thinking { + // First thinking text — emit reasoning output_item.added + if last_thinking.is_empty() { + yield Ok(responses_sse_event( + "response.output_item.added", + serde_json::json!({ + "type": "response.output_item.added", + "sequence_number": next_seq(), + "output_index": 0, + "item": { + "id": &reasoning_id, + "type": "reasoning", + "summary": [], + }, + }), + )); + yield Ok(responses_sse_event( + "response.reasoning_summary_part.added", + serde_json::json!({ + "type": "response.reasoning_summary_part.added", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "part": { "type": "summary_text", "text": "" }, + }), + )); + } + + // Delta of new thinking text + let delta = if thinking.len() > last_thinking.len() + && thinking.starts_with(&*last_thinking) + { + thinking[last_thinking.len()..].to_string() + } else { + thinking.clone() + }; + + if !delta.is_empty() { + yield Ok(responses_sse_event( + "response.reasoning_summary_text.delta", + serde_json::json!({ + "type": "response.reasoning_summary_text.delta", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "delta": &delta, + }), + )); + } + last_thinking = thinking; + } + } + } + + // Stream response text + if let Some(text) = state.mitm_store.peek_response_text().await { + if !text.is_empty() && text != last_text { + // Finalize thinking if started but not done + if !thinking_emitted && !last_thinking.is_empty() { + thinking_emitted = true; + thinking_text = Some(last_thinking.clone()); + yield Ok(responses_sse_event( + "response.reasoning_summary_text.done", + serde_json::json!({ + "type": "response.reasoning_summary_text.done", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "text": &last_thinking, + }), + )); + yield Ok(responses_sse_event( + "response.reasoning_summary_part.done", + serde_json::json!({ + "type": "response.reasoning_summary_part.done", + "sequence_number": next_seq(), + "item_id": &reasoning_id, + "output_index": 0, + "summary_index": 0, + "part": { "type": "summary_text", "text": &last_thinking }, + }), + )); + yield Ok(responses_sse_event( + "response.output_item.done", + serde_json::json!({ + "type": "response.output_item.done", + "sequence_number": next_seq(), + "output_index": 0, + "item": { + "id": &reasoning_id, + "type": "reasoning", + "summary": [{ + "type": "summary_text", + "text": &last_thinking, + }], + }, + }), + )); + } + + let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 }; + + if !message_started { + message_started = true; + yield Ok(responses_sse_event( + "response.output_item.added", + serde_json::json!({ + "type": "response.output_item.added", + "sequence_number": next_seq(), + "output_index": msg_output_index, + "item": build_message_output_in_progress(&msg_id), + }), + )); + yield Ok(responses_sse_event( + "response.content_part.added", + serde_json::json!({ + "type": "response.content_part.added", + "sequence_number": next_seq(), + "output_index": msg_output_index, + "content_index": CONTENT_IDX, + "part": { + "type": "output_text", + "text": "", + "annotations": [], + } + }), + )); + } + + let new_content = if text.len() > last_text.len() + && text.starts_with(&*last_text) + { + text[last_text.len()..].to_string() + } else { + text.clone() + }; + + if !new_content.is_empty() { + yield Ok(responses_sse_event( + "response.output_text.delta", + serde_json::json!({ + "type": "response.output_text.delta", + "sequence_number": next_seq(), + "item_id": &msg_id, + "output_index": msg_output_index, + "content_index": CONTENT_IDX, + "delta": &new_content, + }), + )); + last_text = text; + } + } + + // Check if response is complete + if state.mitm_store.is_response_complete() && !last_text.is_empty() { + let msg_idx: u32 = if thinking_emitted { 1 } else { 0 }; + let (usage, _) = usage_from_poll( + &state.mitm_store, &cascade_id, &None, + ¶ms.user_text, &last_text, + ).await; + let tc = thinking_text.clone(); + for evt in completion_events( + &response_id, &model_name, &msg_id, &reasoning_id, + msg_idx, CONTENT_IDX, &last_text, usage, + created_at, &seq, ¶ms, None, tc, + ) { + yield Ok(evt); + } + return; + } + } + + // Poll interval + let poll_ms: u64 = rand::thread_rng().gen_range(150..300); + tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; + } + + // Timeout in bypass mode + let timeout_resp = build_response_object( + ResponseData { + id: response_id.clone(), + model: model_name.clone(), + status: "incomplete", + created_at, + completed_at: None, + output: vec![], + usage: Some(Usage::estimate(¶ms.user_text, "")), + thinking_signature: None, + }, + ¶ms, + ); + yield Ok(responses_sse_event( + "response.completed", + serde_json::json!({ + "type": "response.completed", + "sequence_number": next_seq(), + "response": response_to_json(&timeout_resp), + }), + )); + return; + } + + // ── Normal LS path (no custom tools) ── // Try to open a reactive streaming connection for real-time notifications. - // Falls back to timer-based polling if the streaming RPC is unavailable. let mut reactive_rx = match state.backend.stream_cascade_updates(&cascade_id).await { Ok(rx) => { debug!("Using reactive streaming for cascade updates"); diff --git a/src/mitm/proxy.rs b/src/mitm/proxy.rs index 3b09dc9..089e834 100644 --- a/src/mitm/proxy.rs +++ b/src/mitm/proxy.rs @@ -753,12 +753,17 @@ async fn handle_http_over_tls( info!("MITM: stored {} function call(s) from initial body", streaming_acc.function_calls.len()); } - // Capture response text directly into MitmStore - if bypass_ls && !streaming_acc.response_text.is_empty() { - store.set_response_text(&streaming_acc.response_text).await; - } - if bypass_ls && streaming_acc.is_complete { - store.mark_response_complete(); + // Capture response + thinking text directly into MitmStore + if bypass_ls { + if !streaming_acc.response_text.is_empty() { + store.set_response_text(&streaming_acc.response_text).await; + } + if !streaming_acc.thinking_text.is_empty() { + store.set_thinking_text(&streaming_acc.thinking_text).await; + } + if streaming_acc.is_complete { + store.mark_response_complete(); + } } } @@ -820,12 +825,17 @@ async fn handle_http_over_tls( info!("MITM: stored {} function call(s) from body chunk", streaming_acc.function_calls.len()); } - // Capture response text directly into MitmStore - if bypass_ls && !streaming_acc.response_text.is_empty() { - store.set_response_text(&streaming_acc.response_text).await; - } - if bypass_ls && streaming_acc.is_complete { - store.mark_response_complete(); + // Capture response + thinking text directly into MitmStore + if bypass_ls { + if !streaming_acc.response_text.is_empty() { + store.set_response_text(&streaming_acc.response_text).await; + } + if !streaming_acc.thinking_text.is_empty() { + store.set_thinking_text(&streaming_acc.thinking_text).await; + } + if streaming_acc.is_complete { + store.mark_response_complete(); + } } } diff --git a/src/mitm/store.rs b/src/mitm/store.rs index e0cbf29..a4dcaa4 100644 --- a/src/mitm/store.rs +++ b/src/mitm/store.rs @@ -91,8 +91,10 @@ pub struct MitmStore { // ── Direct response capture (bypasses LS) ──────────────────────────── /// Captured response text from MITM when custom tools are active. - /// The completions handler reads this instead of polling LS steps. + /// The completions/responses handler reads this instead of polling LS steps. captured_response_text: Arc>>, + /// Captured thinking/reasoning text from MITM (for real-time streaming). + captured_thinking_text: Arc>>, /// Whether the captured response is complete (finishReason received). response_complete: Arc, } @@ -134,6 +136,7 @@ impl MitmStore { call_id_to_name: Arc::new(RwLock::new(HashMap::new())), last_function_calls: Arc::new(RwLock::new(Vec::new())), captured_response_text: Arc::new(RwLock::new(None)), + captured_thinking_text: Arc::new(RwLock::new(None)), response_complete: Arc::new(AtomicBool::new(false)), } } @@ -414,5 +417,23 @@ impl MitmStore { pub async fn clear_response_async(&self) { self.response_complete.store(false, Ordering::SeqCst); *self.captured_response_text.write().await = None; + *self.captured_thinking_text.write().await = None; + } + + // ── Thinking text capture ──────────────────────────────────────────── + + /// Set (replace) the captured thinking text. + pub async fn set_thinking_text(&self, text: &str) { + *self.captured_thinking_text.write().await = Some(text.to_string()); + } + + /// Peek at the captured thinking text without consuming it. + pub async fn peek_thinking_text(&self) -> Option { + self.captured_thinking_text.read().await.clone() + } + + /// Take the captured thinking text (consumes it). + pub async fn take_thinking_text(&self) -> Option { + self.captured_thinking_text.write().await.take() } }