fix: store function calls in MitmStore immediately on detection
Previously, captured function calls were only stored in MitmStore after the response loop ended. The completions handler polls take_any_function_calls() during streaming, creating a race condition where the MitmStore was empty. Now function calls are stored immediately when parse_streaming_chunk detects them, in both the initial body and body chunk paths.
This commit is contained in:
@@ -740,13 +740,18 @@ async fn handle_http_over_tls(
|
|||||||
if is_streaming_response && hdr_end < header_buf.len() {
|
if is_streaming_response && hdr_end < header_buf.len() {
|
||||||
let body = String::from_utf8_lossy(&header_buf[hdr_end..]);
|
let body = String::from_utf8_lossy(&header_buf[hdr_end..]);
|
||||||
parse_streaming_chunk(&body, &mut streaming_acc);
|
parse_streaming_chunk(&body, &mut streaming_acc);
|
||||||
has_function_call = body.contains("functionCall");
|
has_function_call = !streaming_acc.function_calls.is_empty();
|
||||||
|
|
||||||
|
// Immediately store captured function calls
|
||||||
|
if has_function_call {
|
||||||
|
for fc in &streaming_acc.function_calls {
|
||||||
|
store.record_function_call(cascade_hint.as_deref(), fc.clone()).await;
|
||||||
|
}
|
||||||
|
store.set_last_function_calls(streaming_acc.function_calls.clone()).await;
|
||||||
|
info!("MITM: stored {} function call(s) from initial body", streaming_acc.function_calls.len());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we detected a functionCall AND custom tools are active,
|
|
||||||
// forge a dummy "STOP" response for the LS so it doesn't
|
|
||||||
// freak out and retry. The real function call data is already
|
|
||||||
// captured in MitmStore.
|
|
||||||
if has_function_call && modify_requests && store.get_tools().await.is_some() {
|
if has_function_call && modify_requests && store.get_tools().await.is_some() {
|
||||||
info!("MITM: functionCall detected → sending dummy STOP response to LS");
|
info!("MITM: functionCall detected → sending dummy STOP response to LS");
|
||||||
|
|
||||||
@@ -804,13 +809,21 @@ async fn handle_http_over_tls(
|
|||||||
if is_streaming_response {
|
if is_streaming_response {
|
||||||
let s = String::from_utf8_lossy(chunk);
|
let s = String::from_utf8_lossy(chunk);
|
||||||
parse_streaming_chunk(&s, &mut streaming_acc);
|
parse_streaming_chunk(&s, &mut streaming_acc);
|
||||||
chunk_has_fc = s.contains("functionCall");
|
chunk_has_fc = !streaming_acc.function_calls.is_empty();
|
||||||
|
|
||||||
|
// Immediately store captured function calls — don't wait for loop end
|
||||||
|
if chunk_has_fc {
|
||||||
|
for fc in &streaming_acc.function_calls {
|
||||||
|
store.record_function_call(cascade_hint.as_deref(), fc.clone()).await;
|
||||||
|
}
|
||||||
|
store.set_last_function_calls(streaming_acc.function_calls.clone()).await;
|
||||||
|
info!("MITM: stored {} function call(s) from body chunk", streaming_acc.function_calls.len());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If functionCall in body chunk + custom tools → send dummy + stop
|
// If functionCall detected + custom tools → send dummy + stop
|
||||||
if chunk_has_fc && modify_requests && store.get_tools().await.is_some() {
|
if chunk_has_fc && modify_requests && store.get_tools().await.is_some() {
|
||||||
info!("MITM: functionCall in body chunk → sending chunked terminator to LS");
|
info!("MITM: functionCall in body chunk → sending chunked terminator to LS");
|
||||||
// Send the chunked terminator to end the stream
|
|
||||||
let _ = client.write_all(b"0\r\n\r\n").await;
|
let _ = client.write_all(b"0\r\n\r\n").await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user