//! API response interceptor: parses Anthropic/Google API responses to extract usage data. //! //! Handles both streaming (SSE) and non-streaming (JSON) responses. use super::store::ApiUsage; use serde_json::Value; use tracing::{debug, trace}; /// Parse a complete (non-streaming) Anthropic Messages API response body. /// /// Response format: /// ```json /// { /// "id": "msg_...", /// "type": "message", /// "model": "claude-sonnet-4-20250514", /// "usage": { /// "input_tokens": 1234, /// "output_tokens": 567, /// "cache_creation_input_tokens": 0, /// "cache_read_input_tokens": 890 /// }, /// "stop_reason": "end_turn" /// } /// ``` pub fn parse_non_streaming_response(body: &[u8]) -> Option { let json: Value = serde_json::from_slice(body).ok()?; extract_usage_from_message(&json) } /// Parse SSE events from a streaming Anthropic response body chunk. /// /// Events of interest: /// - `message_start` — contains `message.usage.input_tokens` + cache tokens /// - `message_delta` — contains `usage.output_tokens` /// - `message_stop` — marks end (no usage data) /// /// Returns accumulated usage across all events in this chunk. pub fn parse_streaming_chunk(chunk: &str, accumulator: &mut StreamingAccumulator) { for line in chunk.lines() { if let Some(data) = line.strip_prefix("data: ") { if data.trim() == "[DONE]" { continue; } if let Ok(event) = serde_json::from_str::(data) { accumulator.process_event(&event); } } } } /// Accumulates usage data across streaming SSE events. #[derive(Debug, Default)] pub struct StreamingAccumulator { pub input_tokens: u64, pub output_tokens: u64, pub cache_creation_input_tokens: u64, pub cache_read_input_tokens: u64, pub thinking_tokens: u64, pub model: Option, pub stop_reason: Option, pub is_complete: bool, pub api_provider: Option, } impl StreamingAccumulator { pub fn new() -> Self { Self::default() } /// Process a single SSE event. pub fn process_event(&mut self, event: &Value) { // ── Google format: {"response": {"usageMetadata": {...}, "modelVersion": "..."}} ── if let Some(response) = event.get("response") { // Extract usage metadata (each event has cumulative counts) if let Some(usage) = response.get("usageMetadata") { self.input_tokens = usage["promptTokenCount"].as_u64().unwrap_or(self.input_tokens); self.output_tokens = usage["candidatesTokenCount"].as_u64().unwrap_or(self.output_tokens); self.thinking_tokens = usage["thoughtsTokenCount"].as_u64().unwrap_or(self.thinking_tokens); } if let Some(model) = response["modelVersion"].as_str() { self.model = Some(model.to_string()); } // Check for completion in candidates if let Some(candidates) = response.get("candidates").and_then(|c| c.as_array()) { for candidate in candidates { if let Some(reason) = candidate["finishReason"].as_str() { self.stop_reason = Some(reason.to_string()); if reason == "STOP" { self.is_complete = true; } } } } self.api_provider = Some("google".to_string()); trace!( input = self.input_tokens, output = self.output_tokens, thinking = self.thinking_tokens, complete = self.is_complete, "SSE Google: usage update" ); return; } // ── Anthropic format: {"type": "message_start"|"message_delta"|"message_stop"} ── let event_type = event["type"].as_str().unwrap_or(""); match event_type { "message_start" => { if let Some(usage) = event.get("message").and_then(|m| m.get("usage")) { self.input_tokens = usage["input_tokens"].as_u64().unwrap_or(0); self.cache_creation_input_tokens = usage["cache_creation_input_tokens"].as_u64().unwrap_or(0); self.cache_read_input_tokens = usage["cache_read_input_tokens"].as_u64().unwrap_or(0); } if let Some(model) = event.get("message").and_then(|m| m["model"].as_str()) { self.model = Some(model.to_string()); } self.api_provider = Some("anthropic".to_string()); trace!(input = self.input_tokens, "SSE Anthropic: message_start"); } "message_delta" => { if let Some(usage) = event.get("usage") { self.output_tokens = usage["output_tokens"].as_u64().unwrap_or(self.output_tokens); } if let Some(reason) = event["delta"]["stop_reason"].as_str() { self.stop_reason = Some(reason.to_string()); } } "message_stop" => { self.is_complete = true; debug!( input = self.input_tokens, output = self.output_tokens, model = ?self.model, "SSE Anthropic: stream complete" ); } "content_block_start" | "content_block_delta" | "content_block_stop" | "ping" => {} _ => { trace!(event_type, "SSE: unknown event type"); } } } /// Convert accumulated data to an ApiUsage. pub fn into_usage(self) -> ApiUsage { ApiUsage { input_tokens: self.input_tokens, output_tokens: self.output_tokens, cache_creation_input_tokens: self.cache_creation_input_tokens, cache_read_input_tokens: self.cache_read_input_tokens, thinking_output_tokens: self.thinking_tokens, response_output_tokens: 0, model: self.model, stop_reason: self.stop_reason, api_provider: self.api_provider.unwrap_or_else(|| "unknown".to_string()).into(), grpc_method: None, captured_at: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), } } } /// Extract usage from a complete Message JSON object. fn extract_usage_from_message(msg: &Value) -> Option { let usage = msg.get("usage")?; Some(ApiUsage { input_tokens: usage["input_tokens"].as_u64().unwrap_or(0), output_tokens: usage["output_tokens"].as_u64().unwrap_or(0), cache_creation_input_tokens: usage["cache_creation_input_tokens"].as_u64().unwrap_or(0), cache_read_input_tokens: usage["cache_read_input_tokens"].as_u64().unwrap_or(0), thinking_output_tokens: 0, response_output_tokens: 0, model: msg["model"].as_str().map(|s| s.to_string()), stop_reason: msg["stop_reason"].as_str().map(|s| s.to_string()), api_provider: Some("anthropic".to_string()), grpc_method: None, captured_at: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), }) } /// Try to identify a cascade ID from the request body. /// /// The LS includes cascade-related metadata in its API requests (as part of /// the system prompt or metadata field). We try to find it. pub fn extract_cascade_hint(request_body: &[u8]) -> Option { let json: Value = serde_json::from_slice(request_body).ok()?; // Check for metadata field (some API configurations include it) if let Some(metadata) = json.get("metadata") { if let Some(user_id) = metadata["user_id"].as_str() { // The LS often sets user_id to the cascadeId return Some(user_id.to_string()); } } // Check system prompt for cascade/workspace markers if let Some(system) = json.get("system") { let system_str = match system { Value::String(s) => s.clone(), Value::Array(arr) => { // Array of content blocks arr.iter() .filter_map(|b| b["text"].as_str()) .collect::>() .join(" ") } _ => return None, }; // Look for workspace_id or cascade_id patterns if let Some(pos) = system_str.find("workspace_id") { let rest = &system_str[pos..]; // Extract the value after workspace_id if let Some(val) = rest.split_whitespace().nth(1) { return Some(val.to_string()); } } } None } #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_non_streaming() { let body = r#"{ "id": "msg_123", "type": "message", "model": "claude-sonnet-4-20250514", "usage": { "input_tokens": 100, "output_tokens": 50, "cache_creation_input_tokens": 10, "cache_read_input_tokens": 30 }, "stop_reason": "end_turn" }"#; let usage = parse_non_streaming_response(body.as_bytes()).unwrap(); assert_eq!(usage.input_tokens, 100); assert_eq!(usage.output_tokens, 50); assert_eq!(usage.cache_creation_input_tokens, 10); assert_eq!(usage.cache_read_input_tokens, 30); assert_eq!(usage.model.as_deref(), Some("claude-sonnet-4-20250514")); } #[test] fn test_streaming_accumulator() { let mut acc = StreamingAccumulator::new(); // message_start let start = serde_json::json!({ "type": "message_start", "message": { "model": "claude-sonnet-4-20250514", "usage": { "input_tokens": 200, "cache_creation_input_tokens": 5, "cache_read_input_tokens": 50 } } }); acc.process_event(&start); assert_eq!(acc.input_tokens, 200); assert_eq!(acc.cache_read_input_tokens, 50); // message_delta let delta = serde_json::json!({ "type": "message_delta", "delta": { "stop_reason": "end_turn" }, "usage": { "output_tokens": 75 } }); acc.process_event(&delta); assert_eq!(acc.output_tokens, 75); // message_stop let stop = serde_json::json!({ "type": "message_stop" }); acc.process_event(&stop); assert!(acc.is_complete); let usage = acc.into_usage(); assert_eq!(usage.input_tokens, 200); assert_eq!(usage.output_tokens, 75); assert_eq!(usage.api_provider, Some("anthropic".to_string())); } #[test] fn test_google_sse_single_event() { let mut acc = StreamingAccumulator::new(); let event = serde_json::json!({ "response": { "candidates": [{"content": {"role": "model", "parts": [{"text": "4"}]}}], "usageMetadata": { "promptTokenCount": 1514, "candidatesTokenCount": 25, "totalTokenCount": 1539, "thoughtsTokenCount": 52 }, "modelVersion": "gemini-3-flash", "responseId": "abc123" }, "traceId": "trace456", "metadata": {} }); acc.process_event(&event); assert_eq!(acc.input_tokens, 1514); assert_eq!(acc.output_tokens, 25); assert_eq!(acc.thinking_tokens, 52); assert_eq!(acc.model, Some("gemini-3-flash".to_string())); assert!(!acc.is_complete); // no finishReason yet assert_eq!(acc.api_provider, Some("google".to_string())); } #[test] fn test_google_sse_multi_event_accumulation() { let mut acc = StreamingAccumulator::new(); // First event — partial response let event1 = serde_json::json!({ "response": { "candidates": [{"content": {"role": "model", "parts": [{"text": "Hello"}]}}], "usageMetadata": { "promptTokenCount": 1514, "candidatesTokenCount": 6, "totalTokenCount": 1520 }, "modelVersion": "gemini-2.5-flash-lite" }, "traceId": "t1", "metadata": {} }); acc.process_event(&event1); assert_eq!(acc.output_tokens, 6); assert!(!acc.is_complete); // Second event — more output let event2 = serde_json::json!({ "response": { "candidates": [{"content": {"role": "model", "parts": [{"text": " world"}]}}], "usageMetadata": { "promptTokenCount": 1514, "candidatesTokenCount": 22, "totalTokenCount": 1536 }, "modelVersion": "gemini-2.5-flash-lite" }, "traceId": "t1", "metadata": {} }); acc.process_event(&event2); assert_eq!(acc.output_tokens, 22); // cumulative, not additive // Third event — completion let event3 = serde_json::json!({ "response": { "candidates": [{"content": {"role": "model", "parts": [{"text": "!"}]}, "finishReason": "STOP"}], "usageMetadata": { "promptTokenCount": 1514, "candidatesTokenCount": 25, "totalTokenCount": 1539, "thoughtsTokenCount": 52 }, "modelVersion": "gemini-2.5-flash-lite" }, "traceId": "t1", "metadata": {} }); acc.process_event(&event3); assert!(acc.is_complete); assert_eq!(acc.output_tokens, 25); assert_eq!(acc.thinking_tokens, 52); assert_eq!(acc.stop_reason, Some("STOP".to_string())); let usage = acc.into_usage(); assert_eq!(usage.input_tokens, 1514); assert_eq!(usage.output_tokens, 25); assert_eq!(usage.thinking_output_tokens, 52); assert_eq!(usage.model, Some("gemini-2.5-flash-lite".to_string())); assert_eq!(usage.api_provider, Some("google".to_string())); } #[test] fn test_google_sse_parse_streaming_chunk() { // Simulates real SSE data with HTTP chunked framing (hex sizes on their own lines) let chunk = r#"150 data: {"response": {"candidates": [{"content": {"role": "model","parts": [{"text": "4"}]}}],"usageMetadata": {"promptTokenCount": 14615,"candidatesTokenCount": 1,"totalTokenCount": 14668,"thoughtsTokenCount": 52},"modelVersion": "gemini-3-flash","responseId": "agaRacPLC4WHz7IPreOl8QM"},"traceId": "8145be7112baf823","metadata": {}} 2f1 data: {"response": {"candidates": [{"content": {"role": "model","parts": [{"text": ""}]},"finishReason": "STOP"}],"usageMetadata": {"promptTokenCount": 14615,"candidatesTokenCount": 1,"totalTokenCount": 14668,"thoughtsTokenCount": 52},"modelVersion": "gemini-3-flash","responseId": "agaRacPLC4WHz7IPreOl8QM"},"traceId": "8145be7112baf823","metadata": {}} 0 "#; let mut acc = StreamingAccumulator::new(); parse_streaming_chunk(chunk, &mut acc); assert_eq!(acc.input_tokens, 14615); assert_eq!(acc.output_tokens, 1); assert_eq!(acc.thinking_tokens, 52); assert!(acc.is_complete); assert_eq!(acc.model, Some("gemini-3-flash".to_string())); assert_eq!(acc.stop_reason, Some("STOP".to_string())); } #[test] fn test_google_sse_no_thinking_tokens() { let mut acc = StreamingAccumulator::new(); let event = serde_json::json!({ "response": { "candidates": [{"content": {"role": "model", "parts": [{"text": "hi"}]}, "finishReason": "STOP"}], "usageMetadata": { "promptTokenCount": 100, "candidatesTokenCount": 5, "totalTokenCount": 105 }, "modelVersion": "gemini-2.5-flash-lite" }, "traceId": "t1", "metadata": {} }); acc.process_event(&event); assert_eq!(acc.thinking_tokens, 0); // no thoughtsTokenCount field assert!(acc.is_complete); let usage = acc.into_usage(); assert_eq!(usage.thinking_output_tokens, 0); } }