diff --git a/src/api/responses.rs b/src/api/responses.rs index 90cfe88..969d79e 100644 --- a/src/api/responses.rs +++ b/src/api/responses.rs @@ -261,18 +261,21 @@ struct RequestParams { metadata: serde_json::Value, } -/// Build Usage from the best available source: -/// 1. MITM intercepted data (real API tokens, including cache stats) +/// Build Usage from the best available source, and extract thinking text from MITM: +/// 1. MITM intercepted data (real API tokens, including cache stats + thinking text) /// 2. LS trajectory data (real tokens, no cache info) /// 3. Estimation from text lengths (fallback) +/// +/// Returns (Usage, Option). The LS strips thinking text from steps, +/// so we capture it from the raw MITM-intercepted API response. async fn usage_from_poll( mitm_store: &crate::mitm::store::MitmStore, cascade_id: &str, model_usage: &Option, input_text: &str, output_text: &str, -) -> Usage { - // Priority 1: MITM intercepted data (most accurate — includes cache tokens) +) -> (Usage, Option) { + // Priority 1: MITM intercepted data (most accurate — includes cache tokens + thinking text) // Try exact cascade_id match first, then fall back to "_latest" (unmatched) let mitm_usage = match mitm_store.take_usage(cascade_id).await { Some(u) => Some(u), @@ -285,9 +288,11 @@ async fn usage_from_poll( cache_read = mitm_usage.cache_read_input_tokens, cache_create = mitm_usage.cache_creation_input_tokens, thinking = mitm_usage.thinking_output_tokens, + thinking_text_len = mitm_usage.thinking_text.as_ref().map_or(0, |t| t.len()), "Using MITM intercepted usage" ); - return Usage { + let thinking_text = mitm_usage.thinking_text; + let usage = Usage { input_tokens: mitm_usage.input_tokens, input_tokens_details: InputTokensDetails { cached_tokens: mitm_usage.cache_read_input_tokens, @@ -298,21 +303,22 @@ async fn usage_from_poll( }, total_tokens: mitm_usage.input_tokens + mitm_usage.output_tokens, }; + return (usage, thinking_text); } // Priority 2: LS trajectory data (from CHECKPOINT/metadata steps) if let Some(u) = model_usage { - return Usage { + return (Usage { input_tokens: u.input_tokens, input_tokens_details: InputTokensDetails { cached_tokens: 0 }, output_tokens: u.output_tokens, output_tokens_details: OutputTokensDetails { reasoning_tokens: 0 }, total_tokens: u.input_tokens + u.output_tokens, - }; + }, None); } // Priority 3: Estimate from text lengths - Usage::estimate(input_text, output_text) + (Usage::estimate(input_text, output_text), None) } // ─── Sync response ─────────────────────────────────────────────────────────── @@ -333,12 +339,15 @@ async fn handle_responses_sync( uuid::Uuid::new_v4().to_string().replace('-', "") ); - let usage = usage_from_poll(&state.mitm_store, &cascade_id, &poll_result.usage, ¶ms.user_text, &poll_result.text).await; + let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &poll_result.usage, ¶ms.user_text, &poll_result.text).await; + + // Thinking text priority: MITM-captured (raw API) > LS-extracted (steps) + let thinking_text = mitm_thinking.or(poll_result.thinking); // Build output array: [reasoning (if present), message] let mut output_items: Vec = Vec::new(); - if let Some(ref thinking_text) = poll_result.thinking { - output_items.push(build_reasoning_output(thinking_text)); + if let Some(ref thinking) = thinking_text { + output_items.push(build_reasoning_output(thinking)); } output_items.push(build_message_output(&msg_id, &poll_result.text)); @@ -479,9 +488,9 @@ async fn handle_responses_stream( if is_response_done(steps) && !last_text.is_empty() { debug!("Response done, text length={}", last_text.len()); let mu = extract_model_usage(steps); - let usage = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; + let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); - let tc = extract_thinking_content(steps); + let tc = mitm_thinking.or_else(|| extract_thinking_content(steps)); let td = extract_thinking_duration(steps); for evt in completion_events( &response_id, &model_name, &msg_id, @@ -502,9 +511,9 @@ async fn handle_responses_stream( if run_status.contains("IDLE") && !last_text.is_empty() { debug!("Trajectory IDLE, text length={}", last_text.len()); let mu = extract_model_usage(steps); - let usage = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; + let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); - let tc = extract_thinking_content(steps); + let tc = mitm_thinking.or_else(|| extract_thinking_content(steps)); let td = extract_thinking_duration(steps); for evt in completion_events( &response_id, &model_name, &msg_id, diff --git a/src/mitm/intercept.rs b/src/mitm/intercept.rs index 6d965a2..cd94941 100644 --- a/src/mitm/intercept.rs +++ b/src/mitm/intercept.rs @@ -57,6 +57,8 @@ pub struct StreamingAccumulator { pub cache_creation_input_tokens: u64, pub cache_read_input_tokens: u64, pub thinking_tokens: u64, + /// Accumulated thinking/reasoning text from the model. + pub thinking_text: String, pub model: Option, pub stop_reason: Option, pub is_complete: bool, @@ -81,9 +83,19 @@ impl StreamingAccumulator { if let Some(model) = response["modelVersion"].as_str() { self.model = Some(model.to_string()); } - // Check for completion in candidates + // Extract thinking text from parts with thought: true if let Some(candidates) = response.get("candidates").and_then(|c| c.as_array()) { for candidate in candidates { + if let Some(parts) = candidate["content"]["parts"].as_array() { + for part in parts { + if part["thought"].as_bool() == Some(true) { + if let Some(text) = part["text"].as_str() { + self.thinking_text.push_str(text); + } + } + } + } + // Check for completion if let Some(reason) = candidate["finishReason"].as_str() { self.stop_reason = Some(reason.to_string()); if reason == "STOP" { @@ -97,6 +109,7 @@ impl StreamingAccumulator { input = self.input_tokens, output = self.output_tokens, thinking = self.thinking_tokens, + thinking_text_len = self.thinking_text.len(), complete = self.is_complete, "SSE Google: usage update" ); @@ -136,7 +149,16 @@ impl StreamingAccumulator { "SSE Anthropic: stream complete" ); } - "content_block_start" | "content_block_delta" | "content_block_stop" | "ping" => {} + // Anthropic thinking content blocks + "content_block_delta" => { + // type: "thinking" delta contains thinking text + if event["delta"]["type"].as_str() == Some("thinking_delta") { + if let Some(text) = event["delta"]["thinking"].as_str() { + self.thinking_text.push_str(text); + } + } + } + "content_block_start" | "content_block_stop" | "ping" => {} _ => { trace!(event_type, "SSE: unknown event type"); } @@ -145,12 +167,18 @@ impl StreamingAccumulator { /// Convert accumulated data to an ApiUsage. pub fn into_usage(self) -> ApiUsage { + let thinking_text = if self.thinking_text.is_empty() { + None + } else { + Some(self.thinking_text) + }; ApiUsage { input_tokens: self.input_tokens, output_tokens: self.output_tokens, cache_creation_input_tokens: self.cache_creation_input_tokens, cache_read_input_tokens: self.cache_read_input_tokens, thinking_output_tokens: self.thinking_tokens, + thinking_text, response_output_tokens: 0, model: self.model, stop_reason: self.stop_reason, @@ -174,6 +202,7 @@ fn extract_usage_from_message(msg: &Value) -> Option { cache_creation_input_tokens: usage["cache_creation_input_tokens"].as_u64().unwrap_or(0), cache_read_input_tokens: usage["cache_read_input_tokens"].as_u64().unwrap_or(0), thinking_output_tokens: 0, + thinking_text: None, response_output_tokens: 0, model: msg["model"].as_str().map(|s| s.to_string()), stop_reason: msg["stop_reason"].as_str().map(|s| s.to_string()), diff --git a/src/mitm/proto.rs b/src/mitm/proto.rs index 3f1e958..5d016ad 100644 --- a/src/mitm/proto.rs +++ b/src/mitm/proto.rs @@ -79,6 +79,7 @@ impl GrpcUsage { input_tokens: self.input_tokens, output_tokens: self.output_tokens, thinking_output_tokens: self.thinking_output_tokens, + thinking_text: None, // gRPC proto doesn't carry thinking text response_output_tokens: self.response_output_tokens, cache_creation_input_tokens: self.cache_write_tokens, cache_read_input_tokens: self.cache_read_tokens, diff --git a/src/mitm/store.rs b/src/mitm/store.rs index 76e6ff2..71931d3 100644 --- a/src/mitm/store.rs +++ b/src/mitm/store.rs @@ -22,6 +22,10 @@ pub struct ApiUsage { pub cache_read_input_tokens: u64, /// Google-specific: thinking/reasoning output tokens (extended thinking) pub thinking_output_tokens: u64, + /// The actual thinking/reasoning text from the model. + /// Captured from Google SSE parts with `thought: true` or Anthropic thinking blocks. + #[serde(skip_serializing_if = "Option::is_none")] + pub thinking_text: Option, /// Google-specific: response output tokens (non-thinking portion) pub response_output_tokens: u64,