fix: block ALL LS follow-up requests, deduplicate function calls

- Add a request_in_flight flag to MitmStore; it is set as soon as the
  first LLM request is forwarded while custom tools are active
- Block ALL subsequent LS requests (agentic loop + internal flash-lite)
  with fake SSE responses instead of waiting for response_complete
- Fix function call deduplication: drain() the accumulator after storing
  to prevent the same tool call being emitted 3x across SSE chunks
- Clear all stale state (response, thinking, function calls, errors)
  at the start of each streaming request
- Handle response_complete with no content (thoughtSignature-only)
  gracefully: end the stream after a timeout instead of hanging forever
This commit is contained in:
Nikketryhard
2026-02-16 00:51:56 -06:00
parent 5f40385c8d
commit a8f3c8915f
6 changed files with 419 additions and 326 deletions

View File

@@ -488,9 +488,12 @@ async fn chat_completions_stream(
let mut last_text = String::new();
let has_custom_tools = state.mitm_store.get_tools().await.is_some();
// Clear any stale captured response and upstream errors from previous requests
// Clear ALL stale state from previous requests
state.mitm_store.clear_response_async().await;
state.mitm_store.clear_upstream_error().await;
state.mitm_store.clear_active_function_call();
// Drain any stale function calls from previous requests
let _ = state.mitm_store.take_any_function_calls().await;
// Initial role chunk
yield Ok::<_, std::convert::Infallible>(Event::default().data(chunk_json(
@@ -501,6 +504,7 @@ async fn chat_completions_stream(
let mut keepalive_counter: u64 = 0;
let mut last_thinking_len: usize = 0;
let mut complete_polls: u32 = 0;
// Helper: build usage JSON from MITM tokens
let build_usage = |pt: u64, ct: u64, crt: u64, tt: u64| -> serde_json::Value {
@@ -586,19 +590,13 @@ async fn chat_completions_stream(
}
}
// ── Check for MITM-captured response text (bypass LS) ──
// ── Primary: MITM-captured response (when custom tools are active) ──
// The MITM intercepts the real Google SSE stream and captures text,
// thinking, and function calls. This is the authoritative data source.
// The LS only gets rewritten responses (function calls → text placeholders)
// so it doesn't provide useful streaming data when MITM is active.
if has_custom_tools {
let peek = state.mitm_store.peek_response_text().await;
let complete = state.mitm_store.is_response_complete();
let has_fc = state.mitm_store.has_active_function_call();
if keepalive_counter % 10 == 0 || peek.is_some() || complete || has_fc {
debug!(
"Completions bypass poll: peek={}, complete={}, has_fc={}, last_text_len={}",
peek.as_ref().map(|t| t.len()).unwrap_or(0),
complete, has_fc, last_text.len()
);
}
// Stream thinking text as reasoning_content deltas (MITM bypass)
// Stream thinking text as reasoning_content deltas
if let Some(tc) = state.mitm_store.peek_thinking_text().await {
if tc.len() > last_thinking_len {
let delta = &tc[last_thinking_len..];
@@ -612,7 +610,8 @@ async fn chat_completions_stream(
}
}
if let Some(text) = peek {
// Stream response text as content deltas
if let Some(text) = state.mitm_store.peek_response_text().await {
if !text.is_empty() && text != last_text {
let delta = if text.len() > last_text.len() && text.starts_with(&*last_text) {
text[last_text.len()..].to_string()
@@ -629,139 +628,18 @@ async fn chat_completions_stream(
last_text = text;
}
}
// Check if MITM response is complete
if state.mitm_store.is_response_complete() && !last_text.is_empty() {
debug!("Completions: MITM response complete (bypass), text length={}", last_text.len());
// Take usage FIRST so we can read stop_reason for finish_reason
let mitm = state.mitm_store.take_usage(&cascade_id).await
.or(state.mitm_store.take_usage("_latest").await);
let fr = google_to_openai_finish_reason(mitm.as_ref().and_then(|u| u.stop_reason.as_deref()));
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({}), Some(fr))]),
None,
)));
if include_usage {
let (pt, ct, crt, tt) = if let Some(ref u) = mitm {
(u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens)
} else { (0, 0, 0, 0) };
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([]),
Some(build_usage(pt, ct, crt, tt)),
)));
}
yield Ok(Event::default().data("[DONE]"));
return;
}
} else if complete {
// Response complete but no text — might be a tool call arriving shortly,
// stale state from a previous request, or an empty response.
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
// Re-check function calls one more time
let final_check = state.mitm_store.take_any_function_calls().await;
if let Some(ref calls) = final_check {
if !calls.is_empty() {
let mut tool_calls = Vec::new();
for (i, fc) in calls.iter().enumerate() {
let call_id = format!(
"call_{}",
uuid::Uuid::new_v4().to_string().replace('-', "")[..24].to_string()
);
let arguments = serde_json::to_string(&fc.args).unwrap_or_default();
tool_calls.push(serde_json::json!({
"index": i,
"id": call_id,
"type": "function",
"function": {
"name": fc.name,
"arguments": arguments,
},
}));
}
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({"tool_calls": tool_calls}), None)]),
None,
)));
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({}), Some("tool_calls"))]),
None,
)));
if include_usage {
let mitm = state.mitm_store.take_usage(&cascade_id).await
.or(state.mitm_store.take_usage("_latest").await);
let (pt, ct, crt, tt) = if let Some(ref u) = mitm {
(u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens)
} else { (0, 0, 0, 0) };
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([]),
Some(build_usage(pt, ct, crt, tt)),
)));
}
yield Ok(Event::default().data("[DONE]"));
return;
}
}
// No text and no function calls but complete=true: stale state.
// Clear the flag so we wait for the real response from this request.
warn!("Completions: stale response_complete detected (no text, no FC) — clearing");
state.mitm_store.clear_response_async().await;
}
// When using bypass mode, skip LS step polling
keepalive_counter += 1;
if keepalive_counter % 10 == 0 {
yield Ok(Event::default().comment("keepalive"));
}
let poll_ms: u64 = rand::thread_rng().gen_range(200..350);
tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await;
continue;
}
// ── Check LS steps for text streaming ──
if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await {
if status == 200 {
if let Some(steps) = data["steps"].as_array() {
// Stream thinking deltas (reasoning_content)
if let Some(tc) = extract_thinking_content(steps) {
if tc.len() > last_thinking_len {
let delta = &tc[last_thinking_len..];
last_thinking_len = tc.len();
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({"reasoning_content": delta}), None)]),
None,
)));
}
}
let text = extract_response_text(steps);
if !text.is_empty() && text != last_text {
let delta = if text.len() > last_text.len() && text.starts_with(&*last_text) {
&text[last_text.len()..]
} else {
&text
};
if !delta.is_empty() {
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({"content": delta}), None)]),
None,
)));
last_text = text.to_string();
}
}
// Done check: need DONE status AND non-empty text
if is_response_done(steps) && !last_text.is_empty() {
debug!("Completions stream done, text length={}", last_text.len());
// Check if MITM response is complete
// Must have ACTUAL content (response text or function calls) — not just thinking.
// The LS makes multiple API calls and response_complete flips on each one,
// so we wait for it to be stable across 2+ polls with real content.
if state.mitm_store.is_response_complete() {
if !last_text.is_empty() {
// Have actual response text — done
complete_polls += 1;
if complete_polls >= 2 {
debug!("Completions: MITM response complete, text_len={}, thinking_len={}", last_text.len(), last_thinking_len);
let mitm = state.mitm_store.take_usage(&cascade_id).await
.or(state.mitm_store.take_usage("_latest").await);
let fr = google_to_openai_finish_reason(mitm.as_ref().and_then(|u| u.stop_reason.as_deref()));
@@ -783,35 +661,147 @@ async fn chat_completions_stream(
yield Ok(Event::default().data("[DONE]"));
return;
}
} else if last_thinking_len > 0 {
// Only thinking so far — wait for actual text/tools to arrive
// The LS may still be processing and will make follow-up API calls
complete_polls += 1;
if complete_polls >= 6 {
// Waited ~2s with no text/tools after complete — emit what we have
debug!("Completions: MITM thinking-only timeout, thinking_len={}", last_thinking_len);
let mitm = state.mitm_store.take_usage(&cascade_id).await
.or(state.mitm_store.take_usage("_latest").await);
let fr = google_to_openai_finish_reason(mitm.as_ref().and_then(|u| u.stop_reason.as_deref()));
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({}), Some(fr))]),
None,
)));
if include_usage {
let (pt, ct, crt, tt) = if let Some(ref u) = mitm {
(u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens)
} else { (0, 0, 0, 0) };
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([]),
Some(build_usage(pt, ct, crt, tt)),
)));
}
yield Ok(Event::default().data("[DONE]"));
return;
}
} else {
// response_complete but no text AND no thinking — might be
// a function-call-only response that was already consumed,
// or empty response. Wait a bit then give up.
complete_polls += 1;
if complete_polls >= 4 {
info!("Completions: MITM response complete but no content (text/thinking/tools all empty), ending stream");
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({}), Some("stop"))]),
None,
)));
yield Ok(Event::default().data("[DONE]"));
return;
}
}
} else {
complete_polls = 0; // Reset — not complete yet
}
} else {
// ── Fallback: LS steps (no MITM capture active) ──
if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await {
if status == 200 {
if let Some(steps) = data["steps"].as_array() {
// Stream thinking deltas (reasoning_content)
if let Some(tc) = extract_thinking_content(steps) {
if tc.len() > last_thinking_len {
let delta = &tc[last_thinking_len..];
last_thinking_len = tc.len();
// IDLE fallback
let step_count = steps.len();
if step_count > 4 && step_count % 5 == 0 {
if let Ok((ts, td)) = state.backend.get_trajectory(&cascade_id).await {
if ts == 200 {
let run_status = td["status"].as_str().unwrap_or("");
if run_status.contains("IDLE") && !last_text.is_empty() {
debug!("Completions IDLE, text length={}", last_text.len());
let mitm = state.mitm_store.take_usage(&cascade_id).await
.or(state.mitm_store.take_usage("_latest").await);
let fr = google_to_openai_finish_reason(mitm.as_ref().and_then(|u| u.stop_reason.as_deref()));
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({}), Some(fr))]),
None,
)));
if include_usage {
let (pt, ct, crt, tt) = if let Some(ref u) = mitm {
(u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens)
} else { (0, 0, 0, 0) };
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({"reasoning_content": delta}), None)]),
None,
)));
}
}
let text = extract_response_text(steps);
if !text.is_empty() && text != last_text {
let delta = if text.len() > last_text.len() && text.starts_with(&*last_text) {
&text[last_text.len()..]
} else {
&text
};
if !delta.is_empty() {
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({"content": delta}), None)]),
None,
)));
last_text = text.to_string();
}
}
// Done check
let has_content = !last_text.is_empty() || last_thinking_len > 0;
if is_response_done(steps) && has_content {
debug!("Completions stream done, text length={}, thinking_len={}", last_text.len(), last_thinking_len);
let mitm = state.mitm_store.take_usage(&cascade_id).await
.or(state.mitm_store.take_usage("_latest").await);
let fr = google_to_openai_finish_reason(mitm.as_ref().and_then(|u| u.stop_reason.as_deref()));
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([chunk_choice(0, serde_json::json!({}), Some(fr))]),
None,
)));
if include_usage {
let (pt, ct, crt, tt) = if let Some(ref u) = mitm {
(u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens)
} else { (0, 0, 0, 0) };
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([]),
Some(build_usage(pt, ct, crt, tt)),
)));
}
yield Ok(Event::default().data("[DONE]"));
return;
}
// IDLE fallback
let step_count = steps.len();
if step_count > 4 && step_count % 5 == 0 {
if let Ok((ts, td)) = state.backend.get_trajectory(&cascade_id).await {
if ts == 200 {
let run_status = td["status"].as_str().unwrap_or("");
let has_content_idle = !last_text.is_empty() || last_thinking_len > 0;
if run_status.contains("IDLE") && has_content_idle {
debug!("Completions IDLE, text length={}, thinking_len={}", last_text.len(), last_thinking_len);
let mitm = state.mitm_store.take_usage(&cascade_id).await
.or(state.mitm_store.take_usage("_latest").await);
let fr = google_to_openai_finish_reason(mitm.as_ref().and_then(|u| u.stop_reason.as_deref()));
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([]),
Some(build_usage(pt, ct, crt, tt)),
serde_json::json!([chunk_choice(0, serde_json::json!({}), Some(fr))]),
None,
)));
if include_usage {
let (pt, ct, crt, tt) = if let Some(ref u) = mitm {
(u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens)
} else { (0, 0, 0, 0) };
yield Ok(Event::default().data(chunk_json(
&completion_id, &model_name,
serde_json::json!([]),
Some(build_usage(pt, ct, crt, tt)),
)));
}
yield Ok(Event::default().data("[DONE]"));
return;
}
yield Ok(Event::default().data("[DONE]"));
return;
}
}
}