feat: capture thinking text from MITM-intercepted API responses
The LS strips thinking/reasoning text from plannerResponse steps — only the thinkingSignature (opaque verification blob) is preserved. The actual thinking text flows through the MITM proxy in the raw Google SSE response (parts with thought: true) and Anthropic SSE (thinking_delta content blocks). Changes: - StreamingAccumulator now accumulates thinking text from SSE events - ApiUsage gains thinking_text: Option<String> - usage_from_poll returns (Usage, Option<thinking_text>) - Thinking text priority: MITM-captured > LS-extracted (fallback) - Reasoning output item now populated from real API data - Removed debug dump code
This commit is contained in:
@@ -261,18 +261,21 @@ struct RequestParams {
|
|||||||
metadata: serde_json::Value,
|
metadata: serde_json::Value,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build Usage from the best available source:
|
/// Build Usage from the best available source, and extract thinking text from MITM:
|
||||||
/// 1. MITM intercepted data (real API tokens, including cache stats)
|
/// 1. MITM intercepted data (real API tokens, including cache stats + thinking text)
|
||||||
/// 2. LS trajectory data (real tokens, no cache info)
|
/// 2. LS trajectory data (real tokens, no cache info)
|
||||||
/// 3. Estimation from text lengths (fallback)
|
/// 3. Estimation from text lengths (fallback)
|
||||||
|
///
|
||||||
|
/// Returns (Usage, Option<thinking_text>). The LS strips thinking text from steps,
|
||||||
|
/// so we capture it from the raw MITM-intercepted API response.
|
||||||
async fn usage_from_poll(
|
async fn usage_from_poll(
|
||||||
mitm_store: &crate::mitm::store::MitmStore,
|
mitm_store: &crate::mitm::store::MitmStore,
|
||||||
cascade_id: &str,
|
cascade_id: &str,
|
||||||
model_usage: &Option<super::polling::ModelUsage>,
|
model_usage: &Option<super::polling::ModelUsage>,
|
||||||
input_text: &str,
|
input_text: &str,
|
||||||
output_text: &str,
|
output_text: &str,
|
||||||
) -> Usage {
|
) -> (Usage, Option<String>) {
|
||||||
// Priority 1: MITM intercepted data (most accurate — includes cache tokens)
|
// Priority 1: MITM intercepted data (most accurate — includes cache tokens + thinking text)
|
||||||
// Try exact cascade_id match first, then fall back to "_latest" (unmatched)
|
// Try exact cascade_id match first, then fall back to "_latest" (unmatched)
|
||||||
let mitm_usage = match mitm_store.take_usage(cascade_id).await {
|
let mitm_usage = match mitm_store.take_usage(cascade_id).await {
|
||||||
Some(u) => Some(u),
|
Some(u) => Some(u),
|
||||||
@@ -285,9 +288,11 @@ async fn usage_from_poll(
|
|||||||
cache_read = mitm_usage.cache_read_input_tokens,
|
cache_read = mitm_usage.cache_read_input_tokens,
|
||||||
cache_create = mitm_usage.cache_creation_input_tokens,
|
cache_create = mitm_usage.cache_creation_input_tokens,
|
||||||
thinking = mitm_usage.thinking_output_tokens,
|
thinking = mitm_usage.thinking_output_tokens,
|
||||||
|
thinking_text_len = mitm_usage.thinking_text.as_ref().map_or(0, |t| t.len()),
|
||||||
"Using MITM intercepted usage"
|
"Using MITM intercepted usage"
|
||||||
);
|
);
|
||||||
return Usage {
|
let thinking_text = mitm_usage.thinking_text;
|
||||||
|
let usage = Usage {
|
||||||
input_tokens: mitm_usage.input_tokens,
|
input_tokens: mitm_usage.input_tokens,
|
||||||
input_tokens_details: InputTokensDetails {
|
input_tokens_details: InputTokensDetails {
|
||||||
cached_tokens: mitm_usage.cache_read_input_tokens,
|
cached_tokens: mitm_usage.cache_read_input_tokens,
|
||||||
@@ -298,21 +303,22 @@ async fn usage_from_poll(
|
|||||||
},
|
},
|
||||||
total_tokens: mitm_usage.input_tokens + mitm_usage.output_tokens,
|
total_tokens: mitm_usage.input_tokens + mitm_usage.output_tokens,
|
||||||
};
|
};
|
||||||
|
return (usage, thinking_text);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Priority 2: LS trajectory data (from CHECKPOINT/metadata steps)
|
// Priority 2: LS trajectory data (from CHECKPOINT/metadata steps)
|
||||||
if let Some(u) = model_usage {
|
if let Some(u) = model_usage {
|
||||||
return Usage {
|
return (Usage {
|
||||||
input_tokens: u.input_tokens,
|
input_tokens: u.input_tokens,
|
||||||
input_tokens_details: InputTokensDetails { cached_tokens: 0 },
|
input_tokens_details: InputTokensDetails { cached_tokens: 0 },
|
||||||
output_tokens: u.output_tokens,
|
output_tokens: u.output_tokens,
|
||||||
output_tokens_details: OutputTokensDetails { reasoning_tokens: 0 },
|
output_tokens_details: OutputTokensDetails { reasoning_tokens: 0 },
|
||||||
total_tokens: u.input_tokens + u.output_tokens,
|
total_tokens: u.input_tokens + u.output_tokens,
|
||||||
};
|
}, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Priority 3: Estimate from text lengths
|
// Priority 3: Estimate from text lengths
|
||||||
Usage::estimate(input_text, output_text)
|
(Usage::estimate(input_text, output_text), None)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── Sync response ───────────────────────────────────────────────────────────
|
// ─── Sync response ───────────────────────────────────────────────────────────
|
||||||
@@ -333,12 +339,15 @@ async fn handle_responses_sync(
|
|||||||
uuid::Uuid::new_v4().to_string().replace('-', "")
|
uuid::Uuid::new_v4().to_string().replace('-', "")
|
||||||
);
|
);
|
||||||
|
|
||||||
let usage = usage_from_poll(&state.mitm_store, &cascade_id, &poll_result.usage, ¶ms.user_text, &poll_result.text).await;
|
let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &poll_result.usage, ¶ms.user_text, &poll_result.text).await;
|
||||||
|
|
||||||
|
// Thinking text priority: MITM-captured (raw API) > LS-extracted (steps)
|
||||||
|
let thinking_text = mitm_thinking.or(poll_result.thinking);
|
||||||
|
|
||||||
// Build output array: [reasoning (if present), message]
|
// Build output array: [reasoning (if present), message]
|
||||||
let mut output_items: Vec<serde_json::Value> = Vec::new();
|
let mut output_items: Vec<serde_json::Value> = Vec::new();
|
||||||
if let Some(ref thinking_text) = poll_result.thinking {
|
if let Some(ref thinking) = thinking_text {
|
||||||
output_items.push(build_reasoning_output(thinking_text));
|
output_items.push(build_reasoning_output(thinking));
|
||||||
}
|
}
|
||||||
output_items.push(build_message_output(&msg_id, &poll_result.text));
|
output_items.push(build_message_output(&msg_id, &poll_result.text));
|
||||||
|
|
||||||
@@ -479,9 +488,9 @@ async fn handle_responses_stream(
|
|||||||
if is_response_done(steps) && !last_text.is_empty() {
|
if is_response_done(steps) && !last_text.is_empty() {
|
||||||
debug!("Response done, text length={}", last_text.len());
|
debug!("Response done, text length={}", last_text.len());
|
||||||
let mu = extract_model_usage(steps);
|
let mu = extract_model_usage(steps);
|
||||||
let usage = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await;
|
let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await;
|
||||||
let ts = extract_thinking_signature(steps);
|
let ts = extract_thinking_signature(steps);
|
||||||
let tc = extract_thinking_content(steps);
|
let tc = mitm_thinking.or_else(|| extract_thinking_content(steps));
|
||||||
let td = extract_thinking_duration(steps);
|
let td = extract_thinking_duration(steps);
|
||||||
for evt in completion_events(
|
for evt in completion_events(
|
||||||
&response_id, &model_name, &msg_id,
|
&response_id, &model_name, &msg_id,
|
||||||
@@ -502,9 +511,9 @@ async fn handle_responses_stream(
|
|||||||
if run_status.contains("IDLE") && !last_text.is_empty() {
|
if run_status.contains("IDLE") && !last_text.is_empty() {
|
||||||
debug!("Trajectory IDLE, text length={}", last_text.len());
|
debug!("Trajectory IDLE, text length={}", last_text.len());
|
||||||
let mu = extract_model_usage(steps);
|
let mu = extract_model_usage(steps);
|
||||||
let usage = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await;
|
let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await;
|
||||||
let ts = extract_thinking_signature(steps);
|
let ts = extract_thinking_signature(steps);
|
||||||
let tc = extract_thinking_content(steps);
|
let tc = mitm_thinking.or_else(|| extract_thinking_content(steps));
|
||||||
let td = extract_thinking_duration(steps);
|
let td = extract_thinking_duration(steps);
|
||||||
for evt in completion_events(
|
for evt in completion_events(
|
||||||
&response_id, &model_name, &msg_id,
|
&response_id, &model_name, &msg_id,
|
||||||
|
|||||||
@@ -57,6 +57,8 @@ pub struct StreamingAccumulator {
|
|||||||
pub cache_creation_input_tokens: u64,
|
pub cache_creation_input_tokens: u64,
|
||||||
pub cache_read_input_tokens: u64,
|
pub cache_read_input_tokens: u64,
|
||||||
pub thinking_tokens: u64,
|
pub thinking_tokens: u64,
|
||||||
|
/// Accumulated thinking/reasoning text from the model.
|
||||||
|
pub thinking_text: String,
|
||||||
pub model: Option<String>,
|
pub model: Option<String>,
|
||||||
pub stop_reason: Option<String>,
|
pub stop_reason: Option<String>,
|
||||||
pub is_complete: bool,
|
pub is_complete: bool,
|
||||||
@@ -81,9 +83,19 @@ impl StreamingAccumulator {
|
|||||||
if let Some(model) = response["modelVersion"].as_str() {
|
if let Some(model) = response["modelVersion"].as_str() {
|
||||||
self.model = Some(model.to_string());
|
self.model = Some(model.to_string());
|
||||||
}
|
}
|
||||||
// Check for completion in candidates
|
// Extract thinking text from parts with thought: true
|
||||||
if let Some(candidates) = response.get("candidates").and_then(|c| c.as_array()) {
|
if let Some(candidates) = response.get("candidates").and_then(|c| c.as_array()) {
|
||||||
for candidate in candidates {
|
for candidate in candidates {
|
||||||
|
if let Some(parts) = candidate["content"]["parts"].as_array() {
|
||||||
|
for part in parts {
|
||||||
|
if part["thought"].as_bool() == Some(true) {
|
||||||
|
if let Some(text) = part["text"].as_str() {
|
||||||
|
self.thinking_text.push_str(text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check for completion
|
||||||
if let Some(reason) = candidate["finishReason"].as_str() {
|
if let Some(reason) = candidate["finishReason"].as_str() {
|
||||||
self.stop_reason = Some(reason.to_string());
|
self.stop_reason = Some(reason.to_string());
|
||||||
if reason == "STOP" {
|
if reason == "STOP" {
|
||||||
@@ -97,6 +109,7 @@ impl StreamingAccumulator {
|
|||||||
input = self.input_tokens,
|
input = self.input_tokens,
|
||||||
output = self.output_tokens,
|
output = self.output_tokens,
|
||||||
thinking = self.thinking_tokens,
|
thinking = self.thinking_tokens,
|
||||||
|
thinking_text_len = self.thinking_text.len(),
|
||||||
complete = self.is_complete,
|
complete = self.is_complete,
|
||||||
"SSE Google: usage update"
|
"SSE Google: usage update"
|
||||||
);
|
);
|
||||||
@@ -136,7 +149,16 @@ impl StreamingAccumulator {
|
|||||||
"SSE Anthropic: stream complete"
|
"SSE Anthropic: stream complete"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
"content_block_start" | "content_block_delta" | "content_block_stop" | "ping" => {}
|
// Anthropic thinking content blocks
|
||||||
|
"content_block_delta" => {
|
||||||
|
// type: "thinking" delta contains thinking text
|
||||||
|
if event["delta"]["type"].as_str() == Some("thinking_delta") {
|
||||||
|
if let Some(text) = event["delta"]["thinking"].as_str() {
|
||||||
|
self.thinking_text.push_str(text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"content_block_start" | "content_block_stop" | "ping" => {}
|
||||||
_ => {
|
_ => {
|
||||||
trace!(event_type, "SSE: unknown event type");
|
trace!(event_type, "SSE: unknown event type");
|
||||||
}
|
}
|
||||||
@@ -145,12 +167,18 @@ impl StreamingAccumulator {
|
|||||||
|
|
||||||
/// Convert accumulated data to an ApiUsage.
|
/// Convert accumulated data to an ApiUsage.
|
||||||
pub fn into_usage(self) -> ApiUsage {
|
pub fn into_usage(self) -> ApiUsage {
|
||||||
|
let thinking_text = if self.thinking_text.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(self.thinking_text)
|
||||||
|
};
|
||||||
ApiUsage {
|
ApiUsage {
|
||||||
input_tokens: self.input_tokens,
|
input_tokens: self.input_tokens,
|
||||||
output_tokens: self.output_tokens,
|
output_tokens: self.output_tokens,
|
||||||
cache_creation_input_tokens: self.cache_creation_input_tokens,
|
cache_creation_input_tokens: self.cache_creation_input_tokens,
|
||||||
cache_read_input_tokens: self.cache_read_input_tokens,
|
cache_read_input_tokens: self.cache_read_input_tokens,
|
||||||
thinking_output_tokens: self.thinking_tokens,
|
thinking_output_tokens: self.thinking_tokens,
|
||||||
|
thinking_text,
|
||||||
response_output_tokens: 0,
|
response_output_tokens: 0,
|
||||||
model: self.model,
|
model: self.model,
|
||||||
stop_reason: self.stop_reason,
|
stop_reason: self.stop_reason,
|
||||||
@@ -174,6 +202,7 @@ fn extract_usage_from_message(msg: &Value) -> Option<ApiUsage> {
|
|||||||
cache_creation_input_tokens: usage["cache_creation_input_tokens"].as_u64().unwrap_or(0),
|
cache_creation_input_tokens: usage["cache_creation_input_tokens"].as_u64().unwrap_or(0),
|
||||||
cache_read_input_tokens: usage["cache_read_input_tokens"].as_u64().unwrap_or(0),
|
cache_read_input_tokens: usage["cache_read_input_tokens"].as_u64().unwrap_or(0),
|
||||||
thinking_output_tokens: 0,
|
thinking_output_tokens: 0,
|
||||||
|
thinking_text: None,
|
||||||
response_output_tokens: 0,
|
response_output_tokens: 0,
|
||||||
model: msg["model"].as_str().map(|s| s.to_string()),
|
model: msg["model"].as_str().map(|s| s.to_string()),
|
||||||
stop_reason: msg["stop_reason"].as_str().map(|s| s.to_string()),
|
stop_reason: msg["stop_reason"].as_str().map(|s| s.to_string()),
|
||||||
|
|||||||
@@ -79,6 +79,7 @@ impl GrpcUsage {
|
|||||||
input_tokens: self.input_tokens,
|
input_tokens: self.input_tokens,
|
||||||
output_tokens: self.output_tokens,
|
output_tokens: self.output_tokens,
|
||||||
thinking_output_tokens: self.thinking_output_tokens,
|
thinking_output_tokens: self.thinking_output_tokens,
|
||||||
|
thinking_text: None, // gRPC proto doesn't carry thinking text
|
||||||
response_output_tokens: self.response_output_tokens,
|
response_output_tokens: self.response_output_tokens,
|
||||||
cache_creation_input_tokens: self.cache_write_tokens,
|
cache_creation_input_tokens: self.cache_write_tokens,
|
||||||
cache_read_input_tokens: self.cache_read_tokens,
|
cache_read_input_tokens: self.cache_read_tokens,
|
||||||
|
|||||||
@@ -22,6 +22,10 @@ pub struct ApiUsage {
|
|||||||
pub cache_read_input_tokens: u64,
|
pub cache_read_input_tokens: u64,
|
||||||
/// Google-specific: thinking/reasoning output tokens (extended thinking)
|
/// Google-specific: thinking/reasoning output tokens (extended thinking)
|
||||||
pub thinking_output_tokens: u64,
|
pub thinking_output_tokens: u64,
|
||||||
|
/// The actual thinking/reasoning text from the model.
|
||||||
|
/// Captured from Google SSE parts with `thought: true` or Anthropic thinking blocks.
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub thinking_text: Option<String>,
|
||||||
/// Google-specific: response output tokens (non-thinking portion)
|
/// Google-specific: response output tokens (non-thinking portion)
|
||||||
pub response_output_tokens: u64,
|
pub response_output_tokens: u64,
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user