fix: add retry logic for MITM thinking text merge race condition
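The LS makes two Google API calls for thinking models: Call 1 carries the response and the thinking token count, and Call 2 carries the thinking summary text. Call 2 may not have arrived by the time usage_from_poll runs after Call 1. usage_from_poll now peeks at the stored usage first; if thinking tokens are present but the thinking text is missing, it polls up to 10 times at 100 ms intervals (about 1s in total) for the merge to happen before taking the usage. This commit also adds a peek_usage method to MitmStore for non-consuming reads.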
This commit is contained in:
@@ -276,11 +276,33 @@ async fn usage_from_poll(
|
|||||||
output_text: &str,
|
output_text: &str,
|
||||||
) -> (Usage, Option<String>) {
|
) -> (Usage, Option<String>) {
|
||||||
// Priority 1: MITM intercepted data (most accurate — includes cache tokens + thinking text)
|
// Priority 1: MITM intercepted data (most accurate — includes cache tokens + thinking text)
|
||||||
// Try exact cascade_id match first, then fall back to "_latest" (unmatched)
|
// Try exact cascade_id match first, then fall back to "_latest" (unmatched).
|
||||||
let mitm_usage = match mitm_store.take_usage(cascade_id).await {
|
//
|
||||||
Some(u) => Some(u),
|
// Race condition: The LS makes TWO Google API calls for thinking models:
|
||||||
None => mitm_store.take_usage("_latest").await,
|
// Call 1: response + thinking token count (recorded first)
|
||||||
};
|
// Call 2: thinking summary text (merged into Call 1 by the store)
|
||||||
|
// We may read the usage after Call 1 but before Call 2 arrives.
|
||||||
|
// If we see thinking tokens but no text, wait briefly for the merge.
|
||||||
|
let keys_to_try: Vec<&str> = vec![cascade_id, "_latest"];
|
||||||
|
let mut mitm_usage = None;
|
||||||
|
for key in &keys_to_try {
|
||||||
|
if let Some(u) = mitm_store.peek_usage(key).await {
|
||||||
|
if u.thinking_output_tokens > 0 && u.thinking_text.is_none() {
|
||||||
|
// Call 2 hasn't arrived yet — wait briefly for the merge
|
||||||
|
tracing::debug!("MITM: thinking tokens found but no text, waiting for summary merge...");
|
||||||
|
for _ in 0..10 {
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
if let Some(u2) = mitm_store.peek_usage(key).await {
|
||||||
|
if u2.thinking_text.is_some() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mitm_usage = mitm_store.take_usage(key).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
if let Some(mitm_usage) = mitm_usage {
|
if let Some(mitm_usage) = mitm_usage {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
input = mitm_usage.input_tokens,
|
input = mitm_usage.input_tokens,
|
||||||
|
|||||||
@@ -172,6 +172,13 @@ impl MitmStore {
|
|||||||
|
|
||||||
/// Get the latest usage for a cascade, consuming it (one-shot read).
|
/// Get the latest usage for a cascade, consuming it (one-shot read).
|
||||||
///
|
///
|
||||||
|
/// Peek at usage data for a cascade without consuming it.
|
||||||
|
/// Used to check if thinking text has been merged before taking.
|
||||||
|
pub async fn peek_usage(&self, cascade_id: &str) -> Option<ApiUsage> {
|
||||||
|
let latest = self.latest_usage.read().await;
|
||||||
|
latest.get(cascade_id).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
/// Only returns exact cascade_id matches — no cross-cascade fallback.
|
/// Only returns exact cascade_id matches — no cross-cascade fallback.
|
||||||
/// The `_latest` key is only consumed when the caller explicitly requests it
|
/// The `_latest` key is only consumed when the caller explicitly requests it
|
||||||
/// (i.e., when the MITM couldn't identify the cascade).
|
/// (i.e., when the MITM couldn't identify the cascade).
|
||||||
|
|||||||
Reference in New Issue
Block a user