feat: sync all endpoints with MITM LS bypass + real-time thinking streaming

- Responses API (streaming): MITM bypass path polls MitmStore directly
  when custom tools are active, skipping LS step polling entirely.
  Streams thinking text deltas in real-time as they arrive from the MITM.
  Handles function calls, text response, and thinking/reasoning events.

- Responses API (sync): Same MITM bypass for non-streaming responses.
  Polls MitmStore for function calls or completed text before falling
  back to LS path.

- Gemini endpoint: MITM bypass polls MitmStore directly for tool call
  responses, eliminating LS overhead.

- MitmStore: Added captured_thinking_text field with set/peek/take methods
  for real-time thinking text capture from MITM SSE.

- MITM proxy: Now captures both thinking_text and response_text from
  StreamingAccumulator into MitmStore when bypass mode is active.
This commit is contained in:
Nikketryhard
2026-02-15 01:03:39 -06:00
parent 50b53097bc
commit b3af73cebd
5 changed files with 564 additions and 14 deletions

View File

@@ -0,0 +1,46 @@
# Sync All Endpoints + Latency + Thinking Streaming
## Phase 1: Sync Responses API (`/v1/responses`) with LS bypass
Current state:
- `handle_responses_stream` (line 529-859) polls LS steps for text
- Doesn't use MitmStore bypass at all
- Still suffers from LS multi-turn overhead when tools are active
Fix:
- Add MITM bypass path (same as completions) — check MitmStore for text + function calls
- For function calls: emit `response.output_item.added` (function_call type) + done events
- For text: stream from MitmStore `captured_response_text` + `response_complete`
## Phase 2: Sync Gemini endpoint (`/v1/gemini`) with LS bypass
Current state:
- `handle_gemini` (line 57-236) uses `poll_for_response` then checks MitmStore
- Already checks `take_any_function_calls()` after polling
- But `poll_for_response` still goes through LS steps
Fix:
- When tools are active, poll MitmStore directly instead of `poll_for_response`
## Phase 3: Latency improvements
- Reduce poll intervals across all handlers
- Add MITM store thinking_text capture for real-time streaming
## Phase 4: Real-time thinking streaming investigation
Current state:
- Google SSE includes `thought: true` parts with thinking text
- `streaming_acc.thinking_text` accumulates this
- Currently only used for final usage stats, not streamed in real-time
Investigation needed:
- The MITM intercept already captures thinking_text per-chunk
- Need to store thinking_text updates in MitmStore incrementally
- Responses handler can then stream thinking deltas in real-time

View File

@@ -183,6 +183,75 @@ pub(crate) async fn handle_gemini(
}
}
let has_custom_tools = state.mitm_store.get_tools().await.is_some();
// Clear stale response
state.mitm_store.clear_response_async().await;
// ── MITM bypass: when tools active, poll MitmStore directly ──
if has_custom_tools {
let start = std::time::Instant::now();
while start.elapsed().as_secs() < body.timeout {
// Check for function calls
let captured = state.mitm_store.take_any_function_calls().await;
if let Some(ref calls) = captured {
if !calls.is_empty() {
let parts: Vec<serde_json::Value> = calls
.iter()
.map(|fc| {
serde_json::json!({
"functionCall": {
"name": fc.name,
"args": fc.args,
}
})
})
.collect();
return Json(serde_json::json!({
"candidates": [{
"content": {
"parts": parts,
"role": "model",
},
"finishReason": "STOP",
}],
"modelVersion": model_name,
}))
.into_response();
}
}
// Check for completed text response
if state.mitm_store.is_response_complete() {
let text = state.mitm_store.take_response_text().await.unwrap_or_default();
return Json(serde_json::json!({
"candidates": [{
"content": {
"parts": [{"text": text}],
"role": "model",
},
"finishReason": "STOP",
}],
"modelVersion": model_name,
}))
.into_response();
}
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
// Timeout
return Json(serde_json::json!({
"error": {
"message": "Request timed out",
"type": "timeout_error",
}
}))
.into_response();
}
// ── Normal LS path (no custom tools) ──
// Poll for response
let poll_result = poll_for_response(&state, &cascade_id, body.timeout).await;

View File

@@ -440,6 +440,103 @@ async fn handle_responses_sync(
params: RequestParams,
) -> axum::response::Response {
let created_at = now_unix();
let has_custom_tools = state.mitm_store.get_tools().await.is_some();
// Clear stale captured response
state.mitm_store.clear_response_async().await;
// ── MITM bypass: poll MitmStore directly when custom tools active ──
if has_custom_tools {
let start = std::time::Instant::now();
while start.elapsed().as_secs() < timeout {
// Check for function calls
let captured = state.mitm_store.take_any_function_calls().await;
if let Some(ref calls) = captured {
if !calls.is_empty() {
let mut output_items: Vec<serde_json::Value> = Vec::new();
for fc in calls {
let call_id = format!(
"call_{}",
uuid::Uuid::new_v4().to_string().replace('-', "")[..24].to_string()
);
state.mitm_store.register_call_id(call_id.clone(), fc.name.clone()).await;
let arguments = serde_json::to_string(&fc.args).unwrap_or_default();
output_items.push(build_function_call_output(&call_id, &fc.name, &arguments));
}
let (usage, _) = usage_from_poll(
&state.mitm_store, &cascade_id, &None,
&params.user_text, "",
).await;
let resp = build_response_object(
ResponseData {
id: response_id,
model: model_name,
status: "completed",
created_at,
completed_at: Some(now_unix()),
output: output_items,
usage: Some(usage),
thinking_signature: None,
},
&params,
);
return Json(resp).into_response();
}
}
// Check for completed text response
if state.mitm_store.is_response_complete() {
let text = state.mitm_store.take_response_text().await.unwrap_or_default();
let thinking = state.mitm_store.take_thinking_text().await;
let (usage, _) = usage_from_poll(
&state.mitm_store, &cascade_id, &None,
&params.user_text, &text,
).await;
let mut output_items: Vec<serde_json::Value> = Vec::new();
if let Some(ref t) = thinking {
output_items.push(build_reasoning_output(t));
}
let msg_id = format!("msg_{}", uuid::Uuid::new_v4().to_string().replace('-', ""));
output_items.push(build_message_output(&msg_id, &text));
let resp = build_response_object(
ResponseData {
id: response_id,
model: model_name,
status: "completed",
created_at,
completed_at: Some(now_unix()),
output: output_items,
usage: Some(usage),
thinking_signature: None,
},
&params,
);
return Json(resp).into_response();
}
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
// Timeout
let resp = build_response_object(
ResponseData {
id: response_id,
model: model_name,
status: "incomplete",
created_at,
completed_at: None,
output: vec![],
usage: Some(Usage::estimate(&params.user_text, "")),
thinking_signature: None,
},
&params,
);
return Json(resp).into_response();
}
// ── Normal LS path (no custom tools) ──
let poll_result = poll_for_response(&state, &cascade_id, timeout).await;
let completed_at = now_unix();
let msg_id = format!(
@@ -584,9 +681,316 @@ async fn handle_responses_stream(
let mut thinking_text: Option<String> = None;
let mut message_started = false;
let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', ""));
let has_custom_tools = state.mitm_store.get_tools().await.is_some();
// Clear stale captured response
state.mitm_store.clear_response_async().await;
// ── MITM bypass mode (when custom tools are active) ──
// Skip LS entirely — read text, thinking, and tool calls directly from MitmStore.
if has_custom_tools {
let mut last_thinking = String::new();
while start.elapsed().as_secs() < timeout {
// Check for function calls first
let captured = state.mitm_store.take_any_function_calls().await;
if let Some(ref calls) = captured {
if !calls.is_empty() {
let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 };
for (i, fc) in calls.iter().enumerate() {
let call_id = format!(
"call_{}",
uuid::Uuid::new_v4().to_string().replace('-', "")[..24].to_string()
);
let arguments = serde_json::to_string(&fc.args).unwrap_or_default();
state.mitm_store.register_call_id(call_id.clone(), fc.name.clone()).await;
let fc_item_id = format!("fc_{}", uuid::Uuid::new_v4().to_string().replace('-', ""));
yield Ok(responses_sse_event(
"response.output_item.added",
serde_json::json!({
"type": "response.output_item.added",
"sequence_number": next_seq(),
"output_index": msg_output_index as usize + i,
"item": {
"id": &fc_item_id,
"type": "function_call",
"call_id": &call_id,
"name": &fc.name,
"arguments": &arguments,
"status": "completed",
},
}),
));
yield Ok(responses_sse_event(
"response.output_item.done",
serde_json::json!({
"type": "response.output_item.done",
"sequence_number": next_seq(),
"output_index": msg_output_index as usize + i,
"item": {
"id": &fc_item_id,
"type": "function_call",
"call_id": &call_id,
"name": &fc.name,
"arguments": &arguments,
"status": "completed",
},
}),
));
}
// Build output for final response
let mut output_items: Vec<serde_json::Value> = Vec::new();
for fc in calls {
let call_id = format!(
"call_{}",
uuid::Uuid::new_v4().to_string().replace('-', "")[..24].to_string()
);
let arguments = serde_json::to_string(&fc.args).unwrap_or_default();
output_items.push(build_function_call_output(&call_id, &fc.name, &arguments));
}
let (usage, _) = usage_from_poll(
&state.mitm_store, &cascade_id, &None,
&params.user_text, "",
).await;
let final_resp = build_response_object(
ResponseData {
id: response_id.clone(),
model: model_name.clone(),
status: "completed",
created_at,
completed_at: Some(now_unix()),
output: output_items,
usage: Some(usage),
thinking_signature: None,
},
&params,
);
yield Ok(responses_sse_event(
"response.completed",
serde_json::json!({
"type": "response.completed",
"sequence_number": next_seq(),
"response": response_to_json(&final_resp),
}),
));
return;
}
}
// Stream thinking text in real-time
if !thinking_emitted {
if let Some(thinking) = state.mitm_store.peek_thinking_text().await {
if !thinking.is_empty() && thinking != last_thinking {
// First thinking text — emit reasoning output_item.added
if last_thinking.is_empty() {
yield Ok(responses_sse_event(
"response.output_item.added",
serde_json::json!({
"type": "response.output_item.added",
"sequence_number": next_seq(),
"output_index": 0,
"item": {
"id": &reasoning_id,
"type": "reasoning",
"summary": [],
},
}),
));
yield Ok(responses_sse_event(
"response.reasoning_summary_part.added",
serde_json::json!({
"type": "response.reasoning_summary_part.added",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"part": { "type": "summary_text", "text": "" },
}),
));
}
// Delta of new thinking text
let delta = if thinking.len() > last_thinking.len()
&& thinking.starts_with(&*last_thinking)
{
thinking[last_thinking.len()..].to_string()
} else {
thinking.clone()
};
if !delta.is_empty() {
yield Ok(responses_sse_event(
"response.reasoning_summary_text.delta",
serde_json::json!({
"type": "response.reasoning_summary_text.delta",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"delta": &delta,
}),
));
}
last_thinking = thinking;
}
}
}
// Stream response text
if let Some(text) = state.mitm_store.peek_response_text().await {
if !text.is_empty() && text != last_text {
// Finalize thinking if started but not done
if !thinking_emitted && !last_thinking.is_empty() {
thinking_emitted = true;
thinking_text = Some(last_thinking.clone());
yield Ok(responses_sse_event(
"response.reasoning_summary_text.done",
serde_json::json!({
"type": "response.reasoning_summary_text.done",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"text": &last_thinking,
}),
));
yield Ok(responses_sse_event(
"response.reasoning_summary_part.done",
serde_json::json!({
"type": "response.reasoning_summary_part.done",
"sequence_number": next_seq(),
"item_id": &reasoning_id,
"output_index": 0,
"summary_index": 0,
"part": { "type": "summary_text", "text": &last_thinking },
}),
));
yield Ok(responses_sse_event(
"response.output_item.done",
serde_json::json!({
"type": "response.output_item.done",
"sequence_number": next_seq(),
"output_index": 0,
"item": {
"id": &reasoning_id,
"type": "reasoning",
"summary": [{
"type": "summary_text",
"text": &last_thinking,
}],
},
}),
));
}
let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 };
if !message_started {
message_started = true;
yield Ok(responses_sse_event(
"response.output_item.added",
serde_json::json!({
"type": "response.output_item.added",
"sequence_number": next_seq(),
"output_index": msg_output_index,
"item": build_message_output_in_progress(&msg_id),
}),
));
yield Ok(responses_sse_event(
"response.content_part.added",
serde_json::json!({
"type": "response.content_part.added",
"sequence_number": next_seq(),
"output_index": msg_output_index,
"content_index": CONTENT_IDX,
"part": {
"type": "output_text",
"text": "",
"annotations": [],
}
}),
));
}
let new_content = if text.len() > last_text.len()
&& text.starts_with(&*last_text)
{
text[last_text.len()..].to_string()
} else {
text.clone()
};
if !new_content.is_empty() {
yield Ok(responses_sse_event(
"response.output_text.delta",
serde_json::json!({
"type": "response.output_text.delta",
"sequence_number": next_seq(),
"item_id": &msg_id,
"output_index": msg_output_index,
"content_index": CONTENT_IDX,
"delta": &new_content,
}),
));
last_text = text;
}
}
// Check if response is complete
if state.mitm_store.is_response_complete() && !last_text.is_empty() {
let msg_idx: u32 = if thinking_emitted { 1 } else { 0 };
let (usage, _) = usage_from_poll(
&state.mitm_store, &cascade_id, &None,
&params.user_text, &last_text,
).await;
let tc = thinking_text.clone();
for evt in completion_events(
&response_id, &model_name, &msg_id, &reasoning_id,
msg_idx, CONTENT_IDX, &last_text, usage,
created_at, &seq, &params, None, tc,
) {
yield Ok(evt);
}
return;
}
}
// Poll interval
let poll_ms: u64 = rand::thread_rng().gen_range(150..300);
tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await;
}
// Timeout in bypass mode
let timeout_resp = build_response_object(
ResponseData {
id: response_id.clone(),
model: model_name.clone(),
status: "incomplete",
created_at,
completed_at: None,
output: vec![],
usage: Some(Usage::estimate(&params.user_text, "")),
thinking_signature: None,
},
&params,
);
yield Ok(responses_sse_event(
"response.completed",
serde_json::json!({
"type": "response.completed",
"sequence_number": next_seq(),
"response": response_to_json(&timeout_resp),
}),
));
return;
}
// ── Normal LS path (no custom tools) ──
// Try to open a reactive streaming connection for real-time notifications.
// Falls back to timer-based polling if the streaming RPC is unavailable.
let mut reactive_rx = match state.backend.stream_cascade_updates(&cascade_id).await {
Ok(rx) => {
debug!("Using reactive streaming for cascade updates");

View File

@@ -753,12 +753,17 @@ async fn handle_http_over_tls(
info!("MITM: stored {} function call(s) from initial body", streaming_acc.function_calls.len());
}
// Capture response text directly into MitmStore
if bypass_ls && !streaming_acc.response_text.is_empty() {
store.set_response_text(&streaming_acc.response_text).await;
}
if bypass_ls && streaming_acc.is_complete {
store.mark_response_complete();
// Capture response + thinking text directly into MitmStore
if bypass_ls {
if !streaming_acc.response_text.is_empty() {
store.set_response_text(&streaming_acc.response_text).await;
}
if !streaming_acc.thinking_text.is_empty() {
store.set_thinking_text(&streaming_acc.thinking_text).await;
}
if streaming_acc.is_complete {
store.mark_response_complete();
}
}
}
@@ -820,12 +825,17 @@ async fn handle_http_over_tls(
info!("MITM: stored {} function call(s) from body chunk", streaming_acc.function_calls.len());
}
// Capture response text directly into MitmStore
if bypass_ls && !streaming_acc.response_text.is_empty() {
store.set_response_text(&streaming_acc.response_text).await;
}
if bypass_ls && streaming_acc.is_complete {
store.mark_response_complete();
// Capture response + thinking text directly into MitmStore
if bypass_ls {
if !streaming_acc.response_text.is_empty() {
store.set_response_text(&streaming_acc.response_text).await;
}
if !streaming_acc.thinking_text.is_empty() {
store.set_thinking_text(&streaming_acc.thinking_text).await;
}
if streaming_acc.is_complete {
store.mark_response_complete();
}
}
}

View File

@@ -91,8 +91,10 @@ pub struct MitmStore {
// ── Direct response capture (bypasses LS) ────────────────────────────
/// Captured response text from MITM when custom tools are active.
/// The completions handler reads this instead of polling LS steps.
/// The completions/responses handler reads this instead of polling LS steps.
captured_response_text: Arc<RwLock<Option<String>>>,
/// Captured thinking/reasoning text from MITM (for real-time streaming).
captured_thinking_text: Arc<RwLock<Option<String>>>,
/// Whether the captured response is complete (finishReason received).
response_complete: Arc<AtomicBool>,
}
@@ -134,6 +136,7 @@ impl MitmStore {
call_id_to_name: Arc::new(RwLock::new(HashMap::new())),
last_function_calls: Arc::new(RwLock::new(Vec::new())),
captured_response_text: Arc::new(RwLock::new(None)),
captured_thinking_text: Arc::new(RwLock::new(None)),
response_complete: Arc::new(AtomicBool::new(false)),
}
}
@@ -414,5 +417,23 @@ impl MitmStore {
pub async fn clear_response_async(&self) {
self.response_complete.store(false, Ordering::SeqCst);
*self.captured_response_text.write().await = None;
*self.captured_thinking_text.write().await = None;
}
// ── Thinking text capture ────────────────────────────────────────────
/// Set (replace) the captured thinking text.
pub async fn set_thinking_text(&self, text: &str) {
*self.captured_thinking_text.write().await = Some(text.to_string());
}
/// Peek at the captured thinking text without consuming it.
pub async fn peek_thinking_text(&self) -> Option<String> {
self.captured_thinking_text.read().await.clone()
}
/// Take the captured thinking text (consumes it).
pub async fn take_thinking_text(&self) -> Option<String> {
self.captured_thinking_text.write().await.take()
}
}