feat: capture thinking text via MITM dual-call merge
The LS makes TWO separate Google API calls for thinking models:

- Call 1: the response plus the thinking token count (no thinking text)
- Call 2: the thinking summary text (no thinking tokens)

Each call hits a different StreamingAccumulator, so we:

1. Capture response_text in StreamingAccumulator (non-thinking parts).
2. In MitmStore::record_usage, detect when Call 2 arrives for a cascade that already has thinking tokens from Call 1.
3. Merge Call 2's response_text as thinking_text on Call 1's usage.

This commit also injects includeThoughts into Google API requests via the MITM request modification (modify_request) to ensure the thinking text is available in SSE responses.
This commit is contained in:
@@ -59,6 +59,9 @@ pub struct StreamingAccumulator {
|
|||||||
pub thinking_tokens: u64,
|
pub thinking_tokens: u64,
|
||||||
/// Accumulated thinking/reasoning text from the model.
|
/// Accumulated thinking/reasoning text from the model.
|
||||||
pub thinking_text: String,
|
pub thinking_text: String,
|
||||||
|
/// Accumulated response text (non-thinking parts).
|
||||||
|
/// Used to identify "thinking summary" calls in the v1internal API.
|
||||||
|
pub response_text: String,
|
||||||
pub model: Option<String>,
|
pub model: Option<String>,
|
||||||
pub stop_reason: Option<String>,
|
pub stop_reason: Option<String>,
|
||||||
pub is_complete: bool,
|
pub is_complete: bool,
|
||||||
@@ -83,16 +86,24 @@ impl StreamingAccumulator {
|
|||||||
if let Some(model) = response["modelVersion"].as_str() {
|
if let Some(model) = response["modelVersion"].as_str() {
|
||||||
self.model = Some(model.to_string());
|
self.model = Some(model.to_string());
|
||||||
}
|
}
|
||||||
// Extract thinking text from parts with thought: true
|
|
||||||
if let Some(candidates) = response.get("candidates").and_then(|c| c.as_array()) {
|
if let Some(candidates) = response.get("candidates").and_then(|c| c.as_array()) {
|
||||||
for candidate in candidates {
|
for candidate in candidates {
|
||||||
if let Some(parts) = candidate["content"]["parts"].as_array() {
|
if let Some(parts) = candidate["content"]["parts"].as_array() {
|
||||||
for part in parts {
|
for part in parts {
|
||||||
|
// Public Gemini API: explicit thought flag
|
||||||
if part["thought"].as_bool() == Some(true) {
|
if part["thought"].as_bool() == Some(true) {
|
||||||
if let Some(text) = part["text"].as_str() {
|
if let Some(text) = part["text"].as_str() {
|
||||||
self.thinking_text.push_str(text);
|
self.thinking_text.push_str(text);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Capture non-thinking response text (skip thoughtSignature parts)
|
||||||
|
else if part.get("thoughtSignature").is_none() {
|
||||||
|
if let Some(text) = part["text"].as_str() {
|
||||||
|
if !text.is_empty() {
|
||||||
|
self.response_text.push_str(text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Check for completion
|
// Check for completion
|
||||||
@@ -172,6 +183,11 @@ impl StreamingAccumulator {
|
|||||||
} else {
|
} else {
|
||||||
Some(self.thinking_text)
|
Some(self.thinking_text)
|
||||||
};
|
};
|
||||||
|
let response_text = if self.response_text.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(self.response_text)
|
||||||
|
};
|
||||||
ApiUsage {
|
ApiUsage {
|
||||||
input_tokens: self.input_tokens,
|
input_tokens: self.input_tokens,
|
||||||
output_tokens: self.output_tokens,
|
output_tokens: self.output_tokens,
|
||||||
@@ -179,6 +195,7 @@ impl StreamingAccumulator {
|
|||||||
cache_read_input_tokens: self.cache_read_input_tokens,
|
cache_read_input_tokens: self.cache_read_input_tokens,
|
||||||
thinking_output_tokens: self.thinking_tokens,
|
thinking_output_tokens: self.thinking_tokens,
|
||||||
thinking_text,
|
thinking_text,
|
||||||
|
response_text,
|
||||||
response_output_tokens: 0,
|
response_output_tokens: 0,
|
||||||
model: self.model,
|
model: self.model,
|
||||||
stop_reason: self.stop_reason,
|
stop_reason: self.stop_reason,
|
||||||
@@ -203,6 +220,7 @@ fn extract_usage_from_message(msg: &Value) -> Option<ApiUsage> {
|
|||||||
cache_read_input_tokens: usage["cache_read_input_tokens"].as_u64().unwrap_or(0),
|
cache_read_input_tokens: usage["cache_read_input_tokens"].as_u64().unwrap_or(0),
|
||||||
thinking_output_tokens: 0,
|
thinking_output_tokens: 0,
|
||||||
thinking_text: None,
|
thinking_text: None,
|
||||||
|
response_text: None,
|
||||||
response_output_tokens: 0,
|
response_output_tokens: 0,
|
||||||
model: msg["model"].as_str().map(|s| s.to_string()),
|
model: msg["model"].as_str().map(|s| s.to_string()),
|
||||||
stop_reason: msg["stop_reason"].as_str().map(|s| s.to_string()),
|
stop_reason: msg["stop_reason"].as_str().map(|s| s.to_string()),
|
||||||
|
|||||||
@@ -152,6 +152,47 @@ pub fn modify_request(body: &[u8]) -> Option<Vec<u8>> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── 4. Inject includeThoughts to capture thinking text ───────────────
|
||||||
|
// Without this flag, Google only reports thinking token counts
|
||||||
|
// but doesn't send the thinking text in SSE parts.
|
||||||
|
{
|
||||||
|
// Ensure request.generationConfig.thinkingConfig.includeThoughts = true
|
||||||
|
let request = json.get_mut("request").and_then(|v| v.as_object_mut());
|
||||||
|
if let Some(req) = request {
|
||||||
|
let gen_config = req
|
||||||
|
.entry("generationConfig")
|
||||||
|
.or_insert_with(|| serde_json::json!({}));
|
||||||
|
if let Some(gc) = gen_config.as_object_mut() {
|
||||||
|
let thinking_config = gc
|
||||||
|
.entry("thinkingConfig")
|
||||||
|
.or_insert_with(|| serde_json::json!({}));
|
||||||
|
if let Some(tc) = thinking_config.as_object_mut() {
|
||||||
|
if !tc.contains_key("includeThoughts") {
|
||||||
|
tc.insert("includeThoughts".to_string(), Value::Bool(true));
|
||||||
|
changes.push("inject includeThoughts".to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Not wrapped in request — try top-level (public API format)
|
||||||
|
let gen_config = json.as_object_mut().and_then(|o| {
|
||||||
|
Some(o.entry("generationConfig")
|
||||||
|
.or_insert_with(|| serde_json::json!({})))
|
||||||
|
});
|
||||||
|
if let Some(gc) = gen_config.and_then(|v| v.as_object_mut()) {
|
||||||
|
let thinking_config = gc
|
||||||
|
.entry("thinkingConfig")
|
||||||
|
.or_insert_with(|| serde_json::json!({}));
|
||||||
|
if let Some(tc) = thinking_config.as_object_mut() {
|
||||||
|
if !tc.contains_key("includeThoughts") {
|
||||||
|
tc.insert("includeThoughts".to_string(), Value::Bool(true));
|
||||||
|
changes.push("inject includeThoughts (top-level)".to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if changes.is_empty() {
|
if changes.is_empty() {
|
||||||
return None; // Nothing modified
|
return None; // Nothing modified
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,6 +80,7 @@ impl GrpcUsage {
|
|||||||
output_tokens: self.output_tokens,
|
output_tokens: self.output_tokens,
|
||||||
thinking_output_tokens: self.thinking_output_tokens,
|
thinking_output_tokens: self.thinking_output_tokens,
|
||||||
thinking_text: None, // gRPC proto doesn't carry thinking text
|
thinking_text: None, // gRPC proto doesn't carry thinking text
|
||||||
|
response_text: None,
|
||||||
response_output_tokens: self.response_output_tokens,
|
response_output_tokens: self.response_output_tokens,
|
||||||
cache_creation_input_tokens: self.cache_write_tokens,
|
cache_creation_input_tokens: self.cache_write_tokens,
|
||||||
cache_read_input_tokens: self.cache_read_tokens,
|
cache_read_input_tokens: self.cache_read_tokens,
|
||||||
|
|||||||
@@ -26,6 +26,9 @@ pub struct ApiUsage {
|
|||||||
/// Captured from Google SSE parts with `thought: true` or Anthropic thinking blocks.
|
/// Captured from Google SSE parts with `thought: true` or Anthropic thinking blocks.
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub thinking_text: Option<String>,
|
pub thinking_text: Option<String>,
|
||||||
|
/// The response text captured from SSE parts (for merge detection).
|
||||||
|
#[serde(skip)]
|
||||||
|
pub response_text: Option<String>,
|
||||||
/// Google-specific: response output tokens (non-thinking portion)
|
/// Google-specific: response output tokens (non-thinking portion)
|
||||||
pub response_output_tokens: u64,
|
pub response_output_tokens: u64,
|
||||||
|
|
||||||
@@ -122,10 +125,36 @@ impl MitmStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store latest usage for the cascade (if we can identify it)
|
// Store latest usage for the cascade (if we can identify it).
|
||||||
|
//
|
||||||
|
// Merge logic for v1internal thinking summaries:
|
||||||
|
// The LS makes TWO Google API calls per thinking request:
|
||||||
|
// Call 1: response + thinking token count (thinking_output_tokens > 0, no thinking text)
|
||||||
|
// Call 2: thinking summary text (thinking_output_tokens == 0, response_text has the summary)
|
||||||
|
//
|
||||||
|
// When Call 2 arrives, we merge its response_text as thinking_text into Call 1's usage.
|
||||||
let key = cascade_id.map(|s| s.to_string()).unwrap_or_else(|| "_latest".to_string());
|
let key = cascade_id.map(|s| s.to_string()).unwrap_or_else(|| "_latest".to_string());
|
||||||
let mut latest = self.latest_usage.write().await;
|
let mut latest = self.latest_usage.write().await;
|
||||||
latest.insert(key, usage);
|
|
||||||
|
if let Some(existing) = latest.get_mut(&key) {
|
||||||
|
if existing.thinking_output_tokens > 0
|
||||||
|
&& existing.thinking_text.is_none()
|
||||||
|
&& usage.thinking_output_tokens == 0
|
||||||
|
&& usage.response_text.is_some()
|
||||||
|
{
|
||||||
|
// Call 2: thinking summary — merge into existing Call 1 usage
|
||||||
|
existing.thinking_text = usage.response_text;
|
||||||
|
debug!(
|
||||||
|
thinking_text_len = existing.thinking_text.as_ref().map_or(0, |t| t.len()),
|
||||||
|
"MITM: merged thinking summary text into existing usage"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// Normal case: replace existing usage
|
||||||
|
latest.insert(key, usage);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
latest.insert(key, usage);
|
||||||
|
}
|
||||||
|
|
||||||
// Evict old entries to prevent unbounded memory growth
|
// Evict old entries to prevent unbounded memory growth
|
||||||
const MAX_ENTRIES: usize = 500;
|
const MAX_ENTRIES: usize = 500;
|
||||||
|
|||||||
Reference in New Issue
Block a user