//! OpenAI Responses API (/v1/responses) handler. //! //! Strictly adheres to the official OpenAI Responses API protocol: //! https://platform.openai.com/docs/api-reference/responses use axum::{ extract::State, http::StatusCode, response::{sse::Event, IntoResponse, Json, Sse}, }; use rand::Rng; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use tracing::{debug, info, warn}; use super::models::{lookup_model, DEFAULT_MODEL, MODELS}; use super::polling::{ extract_model_usage, extract_response_text, extract_thinking_content, extract_thinking_signature, is_response_done, poll_for_response, }; use super::types::*; use super::util::{err_response, now_unix, responses_sse_event, upstream_err_response}; use super::AppState; use crate::mitm::modify::{openai_tool_choice_to_gemini, openai_tools_to_gemini}; use crate::mitm::store::PendingToolResult; // ─── Input extraction ──────────────────────────────────────────────────────── /// Parsed tool result from function_call_output items in input. struct ToolResultInput { call_id: String, output: String, } /// Extract user text from Responses API `input` field. /// Also extracts any function_call_output items for tool result handling, /// and the first inline image (base64 data URI) if present. 
///
/// How `input` is interpreted:
/// - JSON string: used verbatim as the user text.
/// - JSON array: every item is scanned for `function_call_output` entries and for an image;
///   the text is then taken from flat `input_text` / `text` items or, failing that, from the
///   last item whose `role` is `"user"`.
/// - Any other JSON value: the user text is empty.
///
/// Returns `(text, tool_results, image)`. A non-empty `instructions` string is prepended to
/// the text, separated by a blank line.
//
// NOTE(review): the generic arguments in this signature and in the `Vec` / `Option` /
// `collect` annotations below appear to be missing from this copy of the file — restore them
// from the canonical source before building.
// NOTE(review): with non-empty `instructions` the returned text is never empty, even when no
// user text was found, so a caller that rejects empty text will not reject such a request —
// confirm this is intended.
fn extract_responses_input(
    input: &serde_json::Value,
    instructions: Option<&str>,
) -> (
    String,
    Vec,
    Option,
) {
    let mut tool_results: Vec = Vec::new();
    let mut image: Option = None;

    let user_text = match input {
        // Plain string input is the user text as-is.
        serde_json::Value::String(s) => s.clone(),
        serde_json::Value::Array(items) => {
            // Check for function_call_output items
            for item in items {
                if item["type"].as_str() == Some("function_call_output") {
                    // Only items whose `call_id` and `output` are both JSON strings are kept.
                    // NOTE(review): an `output` that is not a JSON string (e.g. an array or
                    // object) causes the whole item to be skipped without any error — confirm
                    // clients only ever send strings here.
                    if let (Some(call_id), Some(output)) =
                        (item["call_id"].as_str(), item["output"].as_str())
                    {
                        tool_results.push(ToolResultInput {
                            call_id: call_id.to_string(),
                            output: output.to_string(),
                        });
                    }
                }
                // Extract first image from top-level input items
                // (once an image has been found, later items are not inspected for one).
                if image.is_none() {
                    image = super::util::extract_image_from_content(item);
                }
            }

            // If we have tool results but no text, generate a follow-up prompt
            if !tool_results.is_empty() {
                // Look for any text items alongside the tool results
                // (only top-level `input_text` / `text` items; `role: "user"` messages are not
                // consulted on this path).
                let text_items: String = items
                    .iter()
                    .filter(|item| {
                        let t = item["type"].as_str().unwrap_or("");
                        t == "input_text" || t == "text"
                    })
                    .filter_map(|p| p["text"].as_str())
                    .collect::>()
                    .join(" ");
                if text_items.is_empty() {
                    "Use the tool results to answer the original question.".to_string()
                } else {
                    text_items
                }
            } else {
                // Normal input extraction
                // First try: flat content parts (input_text / input_image)
                // Matching parts are joined with a single space.
                let flat_text: String = items
                    .iter()
                    .filter(|item| {
                        let t = item["type"].as_str().unwrap_or("");
                        t == "input_text" || t == "text"
                    })
                    .filter_map(|p| p["text"].as_str())
                    .collect::>()
                    .join(" ");

                if !flat_text.is_empty() {
                    flat_text
                } else {
                    // Fallback: conversation-style with role: "user"
                    // `.rev().find(..)` selects the LAST user item in the array.
                    items
                        .iter()
                        .rev()
                        .find(|item| item["role"].as_str() == Some("user"))
                        .and_then(|item| {
                            // Also scan content array for images
                            if image.is_none() {
                                image = super::util::extract_first_image(&item["content"]);
                            }
                            match &item["content"] {
                                serde_json::Value::String(s) => Some(s.clone()),
                                serde_json::Value::Array(parts) => Some(
                                    parts
                                        .iter()
                                        .filter(|p| {
                                            let t = p["type"].as_str().unwrap_or("");
                                            t == "input_text" || t == "text"
                                        })
                                        .filter_map(|p| p["text"].as_str())
                                        .collect::>()
                                        .join(" "),
                                ),
                                // `content` that is neither a string nor an array yields no text.
                                _ => None,
                            }
                        })
                        .unwrap_or_default()
                }
            }
        }
        // Any other JSON type (null, number, object, ...) yields empty user text.
        _ => String::new(),
    };

    // Prepend non-empty instructions, separated from the user text by a blank line.
    let final_text = match instructions {
        Some(inst) if !inst.is_empty() => format!("{inst}\n\n{user_text}"),
        _ => user_text,
    };

    (final_text, tool_results, image)
}

/// Response-specific data for building a Response object.
///
/// These are the values that vary per response; everything else in the final object is
/// echoed from `RequestParams` or fixed by `build_response_object`.
// NOTE(review): the type arguments of the `Option` / `Vec` fields below appear to be missing
// from this copy of the file.
struct ResponseData {
    /// Response identifier, copied to the `id` field.
    id: String,
    /// Model name reported back in the `model` field.
    model: String,
    /// Status string copied to the `status` field (callers in this file pass values such as
    /// `"in_progress"` and `"completed"`).
    status: &'static str,
    /// Creation timestamp, copied to `created_at`.
    created_at: u64,
    /// Completion timestamp; callers in this file pass `None` for the in-progress shell.
    completed_at: Option,
    /// Output items, copied to `output` in the given order.
    output: Vec,
    /// Token usage, when available.
    usage: Option,
    /// Optional thinking signature, copied through unchanged.
    thinking_signature: Option,
}

/// Build a full Response object matching the official OpenAI schema.
///
/// Per-response values come from `data`; request-echo values come from `params`. The
/// remaining fields are fixed here: `object` is `"response"`, `error` and
/// `incomplete_details` are `None`, `parallel_tool_calls` is `true`, `reasoning.summary` is
/// `None`, and `truncation` is `"disabled"`.
fn build_response_object(data: ResponseData, params: &RequestParams) -> ResponsesResponse {
    ResponsesResponse {
        id: data.id,
        object: "response",
        created_at: data.created_at,
        status: data.status,
        completed_at: data.completed_at,
        error: None,
        incomplete_details: None,
        instructions: params.instructions.clone(),
        max_output_tokens: params.max_output_tokens,
        model: data.model,
        output: data.output,
        parallel_tool_calls: true,
        previous_response_id: params.previous_response_id.clone(),
        reasoning: Reasoning {
            effort: params.reasoning_effort.clone(),
            summary: None,
        },
        store: params.store,
        temperature: params.temperature,
        text: params.text_format.clone(),
        tool_choice: params.tool_choice.clone(),
        tools: params.tools.clone(),
        top_p: params.top_p,
        truncation: "disabled",
        usage: data.usage,
        user: params.user.clone(),
        metadata: params.metadata.clone(),
        thinking_signature: data.thinking_signature,
    }
}

/// Serialize a ResponsesResponse to serde_json::Value for SSE embedding.
///
/// Falls back to an empty JSON object (`{}`) when serialization returns an error.
fn response_to_json(resp: &ResponsesResponse) -> serde_json::Value {
    serde_json::to_value(resp).unwrap_or(serde_json::json!({}))
}

// ─── Handler ─────────────────────────────────────────────────────────────────

/// Handles `POST /v1/responses`.
///
/// Steps, in order:
/// 1. Resolve the model (`DEFAULT_MODEL` when the request names none); an unknown model
///    returns 400 `invalid_request_error`.
/// 2. Require a non-empty OAuth token from the backend; otherwise 401 `authentication_error`.
/// 3. Extract user text, tool results and an optional image from `input`; a request with
///    neither text nor tool results returns 400 `invalid_request_error`.
/// 4. Translate tools, `tool_choice`, `text.format` and sampling options into per-request
///    state.
/// 5. Create a new cascade, register a `RequestContext` with the MITM store and send the
///    message to the backend; a failed cascade creation or send returns 502 `server_error`.
/// 6. Wait up to 5 seconds for the MITM gate. On timeout: 502 `mitm_timeout` when
///    `state.mitm_enabled` is set, otherwise continue after logging a warning.
/// 7. Dispatch to `handle_responses_stream` or `handle_responses_sync` based on `body.stream`.
//
// NOTE(review): the generic arguments of `State` / `Json` in the signature, and of several
// `Vec` / `from_str` annotations in the body, appear to be missing from this copy of the file.
pub(crate) async fn handle_responses(
    State(state): State>,
    Json(body): Json,
) -> axum::response::Response {
    info!(
        "POST /v1/responses model={} stream={}",
        body.model.as_deref().unwrap_or(DEFAULT_MODEL),
        body.stream
    );

    let model_name = body.model.as_deref().unwrap_or(DEFAULT_MODEL);
    let model = match lookup_model(model_name) {
        Some(m) => m,
        None => {
            // List the known model names in the error so the client can correct the request.
            let names: Vec<&str> = MODELS.iter().map(|m| m.name).collect();
            return err_response(
                StatusCode::BAD_REQUEST,
                format!("Unknown model: {model_name}. Available: {names:?}"),
                "invalid_request_error",
            );
        }
    };

    let token = state.backend.oauth_token().await;
    if token.is_empty() {
        return err_response(
            StatusCode::UNAUTHORIZED,
            "No OAuth token. POST to /v1/token or set ZEROGRAVITY_TOKEN env var.".into(),
            "authentication_error",
        );
    }

    let (user_text, tool_results, image) =
        extract_responses_input(&body.input, body.instructions.as_deref());

    // Handle tool result submission (function_call_output in input)
    let is_tool_result_turn = !tool_results.is_empty();
    let mut pending_tool_results: Vec = Vec::new();
    if is_tool_result_turn {
        for tr in &tool_results {
            // For tool result turns, we use the call_id as the name directly.
            // The proxy captured function calls (with real names) are paired in
            // the ToolRound when we know the cascade_id later.
            let name = tr.call_id.clone();
            // Tool output that is not valid JSON is wrapped as `{"result": <raw string>}`.
            let result_value = serde_json::from_str::(&tr.output)
                .unwrap_or_else(|_| serde_json::json!({"result": tr.output}));
            pending_tool_results.push(PendingToolResult {
                name,
                result: result_value,
            });
        }
        info!(
            count = tool_results.len(),
            "Tool results for MITM injection (will build tool round after cascade_id)"
        );
    }

    // Reject only when there is neither text nor any tool result to forward.
    if user_text.is_empty() && !is_tool_result_turn {
        return err_response(
            StatusCode::BAD_REQUEST,
            "No user input found".to_string(),
            "invalid_request_error",
        );
    }

    // ── Build per-request state locally ──────────────────────────────────

    // Detect web_search_preview tool (OpenAI spec) → enable Google Search grounding
    let has_web_search = body.tools.as_ref().is_some_and(|tools| {
        tools.iter().any(|t| {
            let t_type = t["type"].as_str().unwrap_or("");
            t_type == "web_search_preview" || t_type == "web_search"
        })
    });

    // Convert OpenAI tools to Gemini format
    // (an empty conversion result is treated the same as "no tools").
    let tools = body.tools.as_ref().and_then(|t| {
        let gemini_tools = openai_tools_to_gemini(t);
        if gemini_tools.is_empty() {
            None
        } else {
            info!(count = t.len(), "Client tools for MITM injection");
            Some(gemini_tools)
        }
    });
    let tool_config = body.tool_choice.as_ref().map(openai_tool_choice_to_gemini);

    // Build generation params locally
    // Only `text.format.type == "json_schema"` produces a MIME type / schema; every other
    // format type (or an absent `text`) falls back to `TextFormat::default()`.
    let (response_mime_type, response_schema, text_format) = if let Some(ref text_val) = body.text
    {
        let fmt_type = text_val["format"]["type"].as_str().unwrap_or("text");
        if fmt_type == "json_schema" {
            let name = text_val["format"]["name"].as_str().map(|s| s.to_string());
            let schema = text_val["format"]["schema"]
                .as_object()
                .map(|o| serde_json::Value::Object(o.clone()));
            let strict = text_val["format"]["strict"].as_bool();
            let tf = TextFormat {
                format: TextFormatInner {
                    format_type: "json_schema".to_string(),
                    name: name.clone(),
                    schema: schema.clone(),
                    strict,
                },
            };
            (Some("application/json".to_string()), schema, tf)
        } else {
            (None, None, TextFormat::default())
        }
    } else {
        (None, None, TextFormat::default())
    };

    use crate::mitm::store::GenerationParams;
    let gp = GenerationParams {
        temperature: body.temperature,
        top_p: body.top_p,
        top_k: None,
        max_output_tokens: body.max_output_tokens,
        stop_sequences: None,
        frequency_penalty: None,
        presence_penalty: None,
        reasoning_effort: body.reasoning_effort.clone(),
        response_mime_type,
        response_schema,
        google_search: has_web_search,
    };
    // Pass generation params along only when the client set at least one of them.
    let generation_params = if gp.temperature.is_some()
        || gp.top_p.is_some()
        || gp.max_output_tokens.is_some()
        || gp.reasoning_effort.is_some()
        || gp.response_mime_type.is_some()
        || gp.response_schema.is_some()
        || gp.google_search
    {
        Some(gp)
    } else {
        None
    };

    // Response id: "resp_" followed by a v4 UUID with the dashes removed.
    let response_id = format!("resp_{}", uuid::Uuid::new_v4().to_string().replace('-', ""));

    // Always create a new cascade for every request
    let cascade_id = match state.backend.create_cascade().await {
        Ok(cid) => cid,
        Err(e) => {
            return err_response(
                StatusCode::BAD_GATEWAY,
                format!("StartCascade failed: {e}"),
                "server_error",
            );
        }
    };

    // Image for MITM injection
    // (the image bytes are base64-encoded with the standard alphabet).
    let pending_image = image.as_ref().map(|img| {
        use base64::Engine;
        crate::mitm::store::PendingImage {
            base64_data: base64::engine::general_purpose::STANDARD.encode(&img.data),
            mime_type: img.mime_type.clone(),
        }
    });

    // Build event channel — always created for MITM response path
    // `mitm_rx` is therefore always `Some` when handed to the sync/stream handlers.
    let (tx, rx) = tokio::sync::mpsc::channel(64);
    let (mitm_rx, event_tx) = (Some(rx), tx);

    // Build tool rounds now that cascade_id is known
    let mut tool_rounds: Vec = Vec::new();
    if is_tool_result_turn && !pending_tool_results.is_empty() {
        // Get last captured function calls from the previous request context
        // NOTE(review): `cascade_id` was created a few lines above in this same request —
        // confirm the store can already hold captured calls under it; otherwise `calls` is
        // always empty here.
        let last_calls = state
            .mitm_store
            .take_function_calls(&cascade_id)
            .await
            .unwrap_or_default();
        tool_rounds.push(crate::mitm::store::ToolRound {
            calls: last_calls,
            results: pending_tool_results.clone(),
        });
    }

    // Start debug trace
    let trace = state
        .trace
        .start(&cascade_id, "POST /v1/responses", model.name, body.stream);
    if let Some(ref t) = trace {
        t.set_client_request(crate::trace::ClientRequestSummary {
            message_count: if is_tool_result_turn { 0 } else { 1 },
            tool_count: body.tools.as_ref().map_or(0, |t| t.len()),
            tool_round_count: tool_rounds.len(),
            user_text_len: user_text.len(),
            // Preview is limited to the first 200 characters of the user text.
            user_text_preview: user_text.chars().take(200).collect(),
            system_prompt: body.instructions.is_some(),
            has_image: image.is_some(),
        })
        .await;
        t.start_turn().await;
    }

    // The gate is shared with the request context registered below; this handler waits on
    // it after sending the message.
    let mitm_gate = std::sync::Arc::new(tokio::sync::Notify::new());
    let mitm_gate_clone = mitm_gate.clone();
    state
        .mitm_store
        .register_request(crate::mitm::store::RequestContext {
            cascade_id: cascade_id.clone(),
            pending_user_text: user_text.clone(),
            event_channel: event_tx,
            generation_params,
            pending_image,
            tools,
            tool_config,
            pending_tool_results,
            tool_rounds,
            last_function_calls: Vec::new(),
            call_id_to_name: std::collections::HashMap::new(),
            created_at: std::time::Instant::now(),
            gate: mitm_gate_clone,
            trace_handle: trace.clone(),
            trace_turn: 0,
        })
        .await;

    // Send REAL user text to LS
    // NOTE(review): the format string below has no placeholder for `cascade_id`; part of the
    // literal looks to be missing from this copy of the file — restore it from the canonical
    // source.
    match state
        .backend
        .send_message_with_image(
            &cascade_id,
            &format!(".", cascade_id),
            model.model_enum,
            image.as_ref(),
        )
        .await
    {
        Ok((200, _)) => {
            // Fire-and-forget: the annotation update runs in a background task and its
            // result is ignored.
            let bg = Arc::clone(&state.backend);
            let cid = cascade_id.clone();
            tokio::spawn(async move {
                let _ = bg.update_annotations(&cid).await;
            });
        }
        // NOTE(review): both failure arms below remove the request context but, unlike the
        // gate-timeout path further down, never call `record_error` / `finish` on `trace`.
        Ok((status, _)) => {
            state.mitm_store.remove_request(&cascade_id).await;
            return err_response(
                StatusCode::BAD_GATEWAY,
                format!("Antigravity returned {status}"),
                "server_error",
            );
        }
        Err(e) => {
            state.mitm_store.remove_request(&cascade_id).await;
            return err_response(
                StatusCode::BAD_GATEWAY,
                format!("Send message failed: {e}"),
                "server_error",
            );
        }
    }

    // Wait for MITM gate: 5s → 502 if MITM enabled
    let gate_start = std::time::Instant::now();
    let gate_matched =
        tokio::time::timeout(std::time::Duration::from_secs(5), mitm_gate.notified()).await;
    let gate_wait_ms = gate_start.elapsed().as_millis() as u64;
    if gate_matched.is_err() {
        if state.mitm_enabled {
            state.mitm_store.remove_request(&cascade_id).await;
            if let Some(ref t) = trace {
                t.record_error("MITM gate timeout (5s)".to_string()).await;
                t.finish("mitm_timeout").await;
            }
            return err_response(
                StatusCode::BAD_GATEWAY,
                "MITM proxy did not match request within 5s".to_string(),
                "mitm_timeout",
            );
        }
        // With MITM disabled the timeout is only logged and processing continues.
        warn!(cascade = %cascade_id, "MITM gate timeout (--no-mitm mode)");
    } else {
        debug!(cascade = %cascade_id, gate_wait_ms, "MITM gate signaled — request matched");
        if let Some(ref t) = trace {
            t.record_mitm_match(0, gate_wait_ms).await;
        }
    }

    // Capture request params for response building
    // Unset values are echoed with defaults: temperature 1.0, top_p 1.0, metadata `{}`,
    // tool_choice "auto", tools empty.
    let req_params = RequestParams {
        user_text: user_text.clone(),
        instructions: body.instructions.clone(),
        store: body.store,
        temperature: body.temperature.unwrap_or(1.0),
        top_p: body.top_p.unwrap_or(1.0),
        max_output_tokens: body.max_output_tokens,
        previous_response_id: body.previous_response_id.clone(),
        user: body.user.clone(),
        metadata: body.metadata.clone().unwrap_or(serde_json::json!({})),
        max_tool_calls: body.max_tool_calls,
        reasoning_effort: body.reasoning_effort.clone(),
        tool_choice: body
            .tool_choice
            .clone()
            .unwrap_or(serde_json::json!("auto")),
        tools: body.tools.clone().unwrap_or_default(),
        text_format,
    };

    // Both handlers take the same arguments; only the response framing differs.
    if body.stream {
        handle_responses_stream(
            state,
            response_id,
            model_name.to_string(),
            cascade_id,
            body.timeout,
            req_params,
            mitm_rx,
            trace,
        )
        .await
    } else {
        handle_responses_sync(
            state,
            response_id,
            model_name.to_string(),
            cascade_id,
            body.timeout,
            req_params,
            mitm_rx,
            trace,
        )
        .await
    }
}

/// Captured request parameters needed to echo back in the response.
// NOTE(review): the type arguments of the `Option` / `Vec` fields below appear to be missing
// from this copy of the file.
struct RequestParams {
    /// Text extracted from the request input (instructions included); also passed as the
    /// input text when usage has to be estimated.
    user_text: String,
    /// The request's `instructions`, echoed unchanged.
    instructions: Option,
    /// The request's `store` flag, echoed unchanged.
    store: bool,
    /// Request temperature; `handle_responses` substitutes 1.0 when the client sent none.
    temperature: f64,
    /// Request top_p; `handle_responses` substitutes 1.0 when the client sent none.
    top_p: f64,
    /// The request's `max_output_tokens`, echoed unchanged.
    max_output_tokens: Option,
    /// The request's `previous_response_id`, echoed unchanged.
    previous_response_id: Option,
    /// The request's `user`, echoed unchanged.
    user: Option,
    /// Request metadata; `handle_responses` substitutes `{}` when the client sent none.
    metadata: serde_json::Value,
    /// Upper bound on the number of function calls returned to the client, when set.
    max_tool_calls: Option,
    /// The request's reasoning effort, echoed inside `reasoning.effort`.
    reasoning_effort: Option,
    /// Request tool_choice; `handle_responses` substitutes `"auto"` when the client sent none.
    tool_choice: serde_json::Value,
    /// Request tools; empty when the client sent none.
    tools: Vec,
    /// Parsed `text.format` settings, echoed in the response's `text` field.
    text_format: TextFormat,
}

/// Build Usage from the best available source, and extract thinking text from MITM:
/// 1. MITM intercepted data (real API tokens, including cache stats + thinking text)
/// 2. LS trajectory data (real tokens, no cache info)
/// 3.
Estimation from text lengths (fallback) /// /// Returns (Usage, Option). The LS strips thinking text from steps, /// so we capture it from the raw MITM-intercepted API response. async fn usage_from_poll( mitm_store: &crate::mitm::store::MitmStore, cascade_id: &str, model_usage: &Option, input_text: &str, output_text: &str, ) -> (Usage, Option) { // Priority 1: MITM intercepted data (most accurate — includes cache tokens + thinking text) // Try exact cascade_id match first, then fall back to "_latest" (unmatched). // // Race condition: The LS makes TWO Google API calls for thinking models: // Call 1: response + thinking token count (recorded first) // Call 2: thinking summary text (merged into Call 1 by the store) // We may read the usage after Call 1 but before Call 2 arrives. // If we see thinking tokens but no text, wait briefly for the merge. let keys_to_try: Vec<&str> = vec![cascade_id, "_latest"]; let mut mitm_usage = None; for key in &keys_to_try { if let Some(u) = mitm_store.peek_usage(key).await { if u.thinking_output_tokens > 0 && u.thinking_text.is_none() { // Call 2 hasn't arrived yet — wait briefly for the merge tracing::debug!( "MITM: thinking tokens found but no text, waiting for summary merge..." 
); for _ in 0..10 { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; if let Some(u2) = mitm_store.peek_usage(key).await { if u2.thinking_text.is_some() { break; } } } } mitm_usage = mitm_store.take_usage(key).await; break; } } if let Some(mitm_usage) = mitm_usage { tracing::debug!( input = mitm_usage.input_tokens, output = mitm_usage.output_tokens, cache_read = mitm_usage.cache_read_input_tokens, cache_create = mitm_usage.cache_creation_input_tokens, thinking = mitm_usage.thinking_output_tokens, thinking_text_len = mitm_usage.thinking_text.as_ref().map_or(0, |t| t.len()), "Using MITM intercepted usage" ); let thinking_text = mitm_usage.thinking_text; let usage = Usage { input_tokens: mitm_usage.input_tokens, input_tokens_details: InputTokensDetails { cached_tokens: mitm_usage.cache_read_input_tokens, }, output_tokens: mitm_usage.output_tokens, output_tokens_details: OutputTokensDetails { reasoning_tokens: mitm_usage.thinking_output_tokens, }, total_tokens: mitm_usage.input_tokens + mitm_usage.output_tokens, }; return (usage, thinking_text); } // Priority 2: LS trajectory data (from CHECKPOINT/metadata steps) if let Some(u) = model_usage { return ( Usage { input_tokens: u.input_tokens, input_tokens_details: InputTokensDetails { cached_tokens: 0 }, output_tokens: u.output_tokens, output_tokens_details: OutputTokensDetails { reasoning_tokens: 0, }, total_tokens: u.input_tokens + u.output_tokens, }, None, ); } // Priority 3: Estimate from text lengths (Usage::estimate(input_text, output_text), None) } // ─── Sync response ─────────────────────────────────────────────────────────── #[allow(clippy::too_many_arguments)] async fn handle_responses_sync( state: Arc, response_id: String, model_name: String, cascade_id: String, timeout: u64, params: RequestParams, mitm_rx: Option>, trace: Option, ) -> axum::response::Response { let created_at = now_unix(); // Clear stale captured response and upstream errors (only if no pre-installed channel) if 
mitm_rx.is_none() { state.mitm_store.clear_response_async().await; state.mitm_store.clear_upstream_error().await; } // ── MITM bypass: channel-based pipeline when custom tools active ── if let Some(mut rx) = mitm_rx { let start = std::time::Instant::now(); let mut acc_text = String::new(); let mut acc_thinking: Option = None; let mut _last_usage: Option = None; while let Some(event) = tokio::time::timeout( std::time::Duration::from_secs(timeout.saturating_sub(start.elapsed().as_secs())), rx.recv(), ) .await .ok() .flatten() { use crate::mitm::store::MitmEvent; match event { MitmEvent::ThinkingDelta(t) => { acc_thinking = Some(t); } MitmEvent::TextDelta(t) => { acc_text = t; } MitmEvent::Usage(u) => { _last_usage = Some(u); } MitmEvent::Grounding(_) => {} // stored by proxy directly MitmEvent::FunctionCall(raw_calls) => { let calls: Vec<_> = if let Some(max) = params.max_tool_calls { raw_calls.iter().take(max as usize).collect() } else { raw_calls.iter().collect() }; let mut output_items: Vec = Vec::new(); for fc in &calls { let call_id = format!( "call_{}", &uuid::Uuid::new_v4().to_string().replace('-', "")[..24] ); state .mitm_store .register_call_id(&cascade_id, call_id.clone(), fc.name.clone()) .await; let arguments = serde_json::to_string(&fc.args).unwrap_or_default(); output_items .push(build_function_call_output(&call_id, &fc.name, &arguments)); } let (usage, _) = usage_from_poll( &state.mitm_store, &cascade_id, &None, ¶ms.user_text, "", ) .await; state.mitm_store.remove_request(&cascade_id).await; // Record trace before usage is moved if let Some(ref t) = trace { let fc_summaries: Vec = calls .iter() .map(|fc| crate::trace::FunctionCallSummary { name: fc.name.clone(), args_preview: serde_json::to_string(&fc.args) .unwrap_or_default() .chars() .take(200) .collect(), }) .collect(); t.record_response( 0, crate::trace::ResponseSummary { text_len: 0, thinking_len: 0, text_preview: String::new(), finish_reason: Some("tool_calls".to_string()), function_calls: 
fc_summaries, grounding: false, }, ) .await; t.set_usage(crate::trace::TrackedUsage { input_tokens: usage.input_tokens, output_tokens: usage.output_tokens, thinking_tokens: usage.output_tokens_details.reasoning_tokens, cache_read: usage.input_tokens_details.cached_tokens, }) .await; t.finish("tool_call").await; } let resp = build_response_object( ResponseData { id: response_id, model: model_name, status: "completed", created_at, completed_at: Some(now_unix()), output: output_items, usage: Some(usage), thinking_signature: None, }, ¶ms, ); return Json(resp).into_response(); } MitmEvent::ResponseComplete => { if acc_text.is_empty() && acc_thinking.is_none() { // Empty response — continue waiting continue; } if acc_text.is_empty() && acc_thinking.is_some() { // Thinking-only — LS needs to make a follow-up request. // Reinstall channel and unblock gate. let (new_tx, new_rx) = tokio::sync::mpsc::channel(64); state.mitm_store.set_channel(&cascade_id, new_tx).await; let _ = state.mitm_store.take_any_function_calls().await; rx = new_rx; debug!( "Responses sync: thinking-only — new channel for follow-up, thinking_len={}", acc_thinking.as_ref().map(|t| t.len()).unwrap_or(0) ); continue; } let (usage, _) = usage_from_poll( &state.mitm_store, &cascade_id, &None, ¶ms.user_text, &acc_text, ) .await; state.mitm_store.remove_request(&cascade_id).await; let mut output_items: Vec = Vec::new(); if let Some(ref t) = acc_thinking { output_items.push(build_reasoning_output(t)); } let msg_id = format!("msg_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); output_items.push(build_message_output(&msg_id, &acc_text)); // Record trace before usage is moved if let Some(ref t) = trace { t.record_response( 0, crate::trace::ResponseSummary { text_len: acc_text.len(), thinking_len: acc_thinking.as_ref().map_or(0, |s| s.len()), text_preview: acc_text.chars().take(200).collect(), finish_reason: Some("stop".to_string()), function_calls: Vec::new(), grounding: false, }, ) .await; 
t.set_usage(crate::trace::TrackedUsage { input_tokens: usage.input_tokens, output_tokens: usage.output_tokens, thinking_tokens: usage.output_tokens_details.reasoning_tokens, cache_read: usage.input_tokens_details.cached_tokens, }) .await; t.finish("completed").await; } let resp = build_response_object( ResponseData { id: response_id, model: model_name, status: "completed", created_at, completed_at: Some(now_unix()), output: output_items, usage: Some(usage), thinking_signature: None, }, ¶ms, ); return Json(resp).into_response(); } MitmEvent::UpstreamError(err) => { state.mitm_store.remove_request(&cascade_id).await; if let Some(ref t) = trace { t.record_error(format!( "Upstream: {}", err.message.as_deref().unwrap_or("unknown") )) .await; t.finish("upstream_error").await; } return upstream_err_response(&err); } } } // Timeout state.mitm_store.remove_request(&cascade_id).await; if let Some(ref t) = trace { t.record_error(format!("Timeout: {}s", timeout)).await; t.finish("timeout").await; } return err_response( StatusCode::GATEWAY_TIMEOUT, format!("Timeout: no response from Google API after {timeout}s"), "upstream_error", ); } // ── Normal LS path (no custom tools) ── let poll_result = poll_for_response(&state, &cascade_id, timeout).await; if let Some(ref err) = poll_result.upstream_error { return upstream_err_response(err); } let completed_at = now_unix(); let msg_id = format!("msg_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); // Check for captured function calls from MITM (clears the active flag) let captured_tool_calls = state.mitm_store.take_function_calls(&cascade_id).await; // Enforce max_tool_calls limit let captured_tool_calls = captured_tool_calls.map(|mut calls| { if let Some(max) = params.max_tool_calls { calls.truncate(max as usize); } calls }); // If we have captured tool calls, return them as function_call output items if let Some(ref calls) = captured_tool_calls { info!( count = calls.len(), tools = ?calls.iter().map(|c| &c.name).collect::>(), 
"Returning captured function calls to client" ); let mut output_items: Vec = Vec::new(); for fc in calls { let call_id = format!( "call_{}", &uuid::Uuid::new_v4().to_string().replace('-', "")[..24] ); // Register call_id → name mapping for tool result routing state .mitm_store .register_call_id(&cascade_id, call_id.clone(), fc.name.clone()) .await; // Stringify args (OpenAI sends arguments as JSON string) let arguments = serde_json::to_string(&fc.args).unwrap_or_default(); output_items.push(build_function_call_output(&call_id, &fc.name, &arguments)); } let (usage, _) = usage_from_poll( &state.mitm_store, &cascade_id, &poll_result.usage, ¶ms.user_text, &poll_result.text, ) .await; // Record trace before usage is moved if let Some(ref t) = trace { let fc_summaries: Vec = calls .iter() .map(|fc| crate::trace::FunctionCallSummary { name: fc.name.clone(), args_preview: serde_json::to_string(&fc.args) .unwrap_or_default() .chars() .take(200) .collect(), }) .collect(); t.record_response( 0, crate::trace::ResponseSummary { text_len: poll_result.text.len(), thinking_len: poll_result.thinking.as_ref().map_or(0, |s| s.len()), text_preview: String::new(), finish_reason: Some("tool_calls".to_string()), function_calls: fc_summaries, grounding: false, }, ) .await; t.set_usage(crate::trace::TrackedUsage { input_tokens: usage.input_tokens, output_tokens: usage.output_tokens, thinking_tokens: usage.output_tokens_details.reasoning_tokens, cache_read: usage.input_tokens_details.cached_tokens, }) .await; t.finish("tool_call").await; } let resp = build_response_object( ResponseData { id: response_id, model: model_name, status: "completed", created_at, completed_at: Some(completed_at), output: output_items, usage: Some(usage), thinking_signature: poll_result.thinking_signature, }, ¶ms, ); return Json(resp).into_response(); } // Normal text response (no tool calls) let (usage, mitm_thinking) = usage_from_poll( &state.mitm_store, &cascade_id, &poll_result.usage, ¶ms.user_text, 
&poll_result.text, ) .await; // Thinking text priority: MITM-captured (raw API) > LS-extracted (steps) let thinking_text = mitm_thinking.or(poll_result.thinking); // Build output array: [reasoning (if present), message] let mut output_items: Vec = Vec::new(); if let Some(ref thinking) = thinking_text { output_items.push(build_reasoning_output(thinking)); } output_items.push(build_message_output(&msg_id, &poll_result.text)); // Record trace before usage is moved if let Some(ref t) = trace { t.record_response( 0, crate::trace::ResponseSummary { text_len: poll_result.text.len(), thinking_len: thinking_text.as_ref().map_or(0, |s| s.len()), text_preview: poll_result.text.chars().take(200).collect(), finish_reason: Some("stop".to_string()), function_calls: Vec::new(), grounding: false, }, ) .await; t.set_usage(crate::trace::TrackedUsage { input_tokens: usage.input_tokens, output_tokens: usage.output_tokens, thinking_tokens: usage.output_tokens_details.reasoning_tokens, cache_read: usage.input_tokens_details.cached_tokens, }) .await; t.finish("completed").await; } let resp = build_response_object( ResponseData { id: response_id, model: model_name, status: "completed", created_at, completed_at: Some(completed_at), output: output_items, usage: Some(usage), thinking_signature: poll_result.thinking_signature, }, ¶ms, ); Json(resp).into_response() } // ─── Streaming response ───────────────────────────────────────────────────── #[allow(clippy::too_many_arguments)] async fn handle_responses_stream( state: Arc, response_id: String, model_name: String, cascade_id: String, timeout: u64, params: RequestParams, mitm_rx: Option>, trace: Option, ) -> axum::response::Response { let stream = async_stream::stream! 
{ let msg_id = format!("msg_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); let created_at = now_unix(); let seq = AtomicU32::new(0); let next_seq = || seq.fetch_add(1, Ordering::Relaxed); const CONTENT_IDX: u32 = 0; // Build the in-progress response shell (no output yet) let in_progress_resp = build_response_object( ResponseData { id: response_id.clone(), model: model_name.clone(), status: "in_progress", created_at, completed_at: None, output: vec![], usage: None, thinking_signature: None, }, ¶ms, ); let resp_json = response_to_json(&in_progress_resp); // 1. response.created yield Ok::<_, std::convert::Infallible>(responses_sse_event( "response.created", serde_json::json!({ "type": "response.created", "sequence_number": next_seq(), "response": resp_json, }), )); // 2. response.in_progress yield Ok(responses_sse_event( "response.in_progress", serde_json::json!({ "type": "response.in_progress", "sequence_number": next_seq(), "response": resp_json, }), )); // ── Stream cascade updates: event-driven instead of timer-based polling ── let start = std::time::Instant::now(); let mut last_text = String::new(); let mut thinking_emitted = false; let mut thinking_text: Option = None; let mut message_started = false; let reasoning_id = format!("rs_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); // Clear stale response (only if no pre-installed channel) if mitm_rx.is_none() { state.mitm_store.clear_response_async().await; state.mitm_store.clear_upstream_error().await; } // ── MITM bypass mode (when custom tools are active) ── // Channel-based pipeline: read events directly from MITM proxy. // Channel is pre-installed before send_message to avoid race conditions. 
if let Some(mut rx) = mitm_rx { let mut last_thinking = String::new(); while let Some(event) = tokio::time::timeout( std::time::Duration::from_secs(timeout.saturating_sub(start.elapsed().as_secs())), rx.recv(), ).await.ok().flatten() { use crate::mitm::store::MitmEvent; match event { MitmEvent::ThinkingDelta(full_thinking) => { if !thinking_emitted && full_thinking.len() > last_thinking.len() { // First thinking text — emit reasoning output_item.added if last_thinking.is_empty() { yield Ok(responses_sse_event( "response.output_item.added", serde_json::json!({ "type": "response.output_item.added", "sequence_number": next_seq(), "output_index": 0, "item": { "id": &reasoning_id, "type": "reasoning", "summary": [], }, }), )); yield Ok(responses_sse_event( "response.reasoning_summary_part.added", serde_json::json!({ "type": "response.reasoning_summary_part.added", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "part": { "type": "summary_text", "text": "" }, }), )); } let delta = &full_thinking[last_thinking.len()..]; if !delta.is_empty() { yield Ok(responses_sse_event( "response.reasoning_summary_text.delta", serde_json::json!({ "type": "response.reasoning_summary_text.delta", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "delta": delta, }), )); } last_thinking = full_thinking; } } MitmEvent::TextDelta(full_text) => { if full_text.len() > last_text.len() { // Finalize thinking if started but not done if !thinking_emitted && !last_thinking.is_empty() { thinking_emitted = true; thinking_text = Some(last_thinking.clone()); yield Ok(responses_sse_event( "response.reasoning_summary_text.done", serde_json::json!({ "type": "response.reasoning_summary_text.done", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "text": &last_thinking, }), )); yield Ok(responses_sse_event( "response.reasoning_summary_part.done", serde_json::json!({ 
"type": "response.reasoning_summary_part.done", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "part": { "type": "summary_text", "text": &last_thinking }, }), )); yield Ok(responses_sse_event( "response.output_item.done", serde_json::json!({ "type": "response.output_item.done", "sequence_number": next_seq(), "output_index": 0, "item": { "id": &reasoning_id, "type": "reasoning", "summary": [{ "type": "summary_text", "text": &last_thinking, }], }, }), )); } let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 }; if !message_started { message_started = true; yield Ok(responses_sse_event( "response.output_item.added", serde_json::json!({ "type": "response.output_item.added", "sequence_number": next_seq(), "output_index": msg_output_index, "item": build_message_output_in_progress(&msg_id), }), )); yield Ok(responses_sse_event( "response.content_part.added", serde_json::json!({ "type": "response.content_part.added", "sequence_number": next_seq(), "output_index": msg_output_index, "content_index": CONTENT_IDX, "part": { "type": "output_text", "text": "", "annotations": [], } }), )); } let delta = &full_text[last_text.len()..]; if !delta.is_empty() { let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 }; yield Ok(responses_sse_event( "response.output_text.delta", serde_json::json!({ "type": "response.output_text.delta", "sequence_number": next_seq(), "item_id": &msg_id, "output_index": msg_output_index, "content_index": CONTENT_IDX, "delta": delta, }), )); last_text = full_text; } } } MitmEvent::FunctionCall(raw_calls) => { let calls: Vec<_> = if let Some(max) = params.max_tool_calls { raw_calls.iter().take(max as usize).collect() } else { raw_calls.iter().collect() }; let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 }; for (i, fc) in calls.iter().enumerate() { let call_id = format!( "call_{}", &uuid::Uuid::new_v4().to_string().replace('-', "")[..24] ); let arguments = 
serde_json::to_string(&fc.args).unwrap_or_default(); state.mitm_store.register_call_id(&cascade_id, call_id.clone(), fc.name.clone()).await; let fc_item_id = format!("fc_{}", uuid::Uuid::new_v4().to_string().replace('-', "")); yield Ok(responses_sse_event( "response.output_item.added", serde_json::json!({ "type": "response.output_item.added", "sequence_number": next_seq(), "output_index": msg_output_index as usize + i, "item": { "id": &fc_item_id, "type": "function_call", "call_id": &call_id, "name": &fc.name, "arguments": &arguments, "status": "completed", }, }), )); yield Ok(responses_sse_event( "response.output_item.done", serde_json::json!({ "type": "response.output_item.done", "sequence_number": next_seq(), "output_index": msg_output_index as usize + i, "item": { "id": &fc_item_id, "type": "function_call", "call_id": &call_id, "name": &fc.name, "arguments": &arguments, "status": "completed", }, }), )); } // Build output for final response let mut output_items: Vec = Vec::new(); for fc in &calls { let call_id = format!( "call_{}", &uuid::Uuid::new_v4().to_string().replace('-', "")[..24] ); let arguments = serde_json::to_string(&fc.args).unwrap_or_default(); output_items.push(build_function_call_output(&call_id, &fc.name, &arguments)); } let (usage, _) = usage_from_poll( &state.mitm_store, &cascade_id, &None, ¶ms.user_text, "", ).await; // Save trace usage before move let trace_usage = crate::trace::TrackedUsage { input_tokens: usage.input_tokens, output_tokens: usage.output_tokens, thinking_tokens: usage.output_tokens_details.reasoning_tokens, cache_read: usage.input_tokens_details.cached_tokens, }; let final_resp = build_response_object( ResponseData { id: response_id.clone(), model: model_name.clone(), status: "completed", created_at, completed_at: Some(now_unix()), output: output_items, usage: Some(usage), thinking_signature: None, }, ¶ms, ); yield Ok(responses_sse_event( "response.completed", serde_json::json!({ "type": "response.completed", 
"sequence_number": next_seq(), "response": response_to_json(&final_resp), }), )); if let Some(ref t) = trace { let fc_summaries: Vec = calls.iter().map(|fc| crate::trace::FunctionCallSummary { name: fc.name.clone(), args_preview: serde_json::to_string(&fc.args).unwrap_or_default().chars().take(200).collect(), }).collect(); t.record_response(0, crate::trace::ResponseSummary { text_len: 0, thinking_len: last_thinking.len(), text_preview: String::new(), finish_reason: Some("tool_calls".to_string()), function_calls: fc_summaries, grounding: false, }).await; t.set_usage(trace_usage).await; t.finish("tool_call").await; } state.mitm_store.remove_request(&cascade_id).await; return; } MitmEvent::ResponseComplete => { if !last_text.is_empty() { let msg_idx: u32 = if thinking_emitted { 1 } else { 0 }; let (usage, _) = usage_from_poll( &state.mitm_store, &cascade_id, &None, ¶ms.user_text, &last_text, ).await; let tc = thinking_text.clone(); for evt in completion_events( &response_id, &model_name, &msg_id, &reasoning_id, msg_idx, CONTENT_IDX, &last_text, usage, created_at, &seq, ¶ms, None, tc, ) { yield Ok(evt); } if let Some(ref t) = trace { t.record_response(0, crate::trace::ResponseSummary { text_len: last_text.len(), thinking_len: thinking_text.as_ref().map_or(0, |s| s.len()), text_preview: last_text.chars().take(200).collect(), finish_reason: Some("stop".to_string()), function_calls: Vec::new(), grounding: false, }).await; t.finish("completed").await; } state.mitm_store.remove_request(&cascade_id).await; return; } else if !last_thinking.is_empty() { // Thinking-only response — LS needs follow-up API calls. // Create a new channel and unblock the gate. 
let (new_tx, new_rx) = tokio::sync::mpsc::channel(64); state.mitm_store.set_channel(&cascade_id, new_tx).await; let _ = state.mitm_store.take_any_function_calls().await; rx = new_rx; debug!( "Responses stream: thinking-only — new channel for follow-up, thinking_len={}", last_thinking.len() ); } // ResponseComplete with no text and no thinking — continue waiting } MitmEvent::UpstreamError(err) => { let error_msg = super::util::upstream_error_message(&err); let error_type = super::util::upstream_error_type(&err); yield Ok(responses_sse_event( "response.failed", serde_json::json!({ "type": "response.failed", "sequence_number": next_seq(), "response": { "id": &response_id, "status": "failed", "error": { "type": error_type, "message": error_msg, "code": err.status, }, }, }), )); if let Some(ref t) = trace { t.record_error(format!("Upstream: {}", error_msg)).await; t.finish("upstream_error").await; } state.mitm_store.remove_request(&cascade_id).await; return; } MitmEvent::Usage(_) | MitmEvent::Grounding(_) => { // Usage/grounding stored by proxy, consumed via usage_from_poll } } } // Timeout in channel mode state.mitm_store.remove_request(&cascade_id).await; yield Ok(responses_sse_event( "response.failed", serde_json::json!({ "type": "response.failed", "sequence_number": next_seq(), "response": { "id": &response_id, "status": "failed", "error": { "type": "upstream_error", "message": format!("Timeout: no response from Google API after {timeout}s"), "code": 504, }, }, }), )); if let Some(ref t) = trace { t.record_error(format!("Timeout: {timeout}s")).await; t.finish("timeout").await; } return; } // ── Normal LS path (no custom tools) ── // Try to open a reactive streaming connection for real-time notifications. 
let mut reactive_rx = match state.backend.stream_cascade_updates(&cascade_id).await { Ok(rx) => { debug!("Using reactive streaming for cascade updates"); Some(rx) } Err(e) => { debug!("Reactive streaming unavailable, falling back to polling: {e}"); None } }; let mut thinking_started = false; let mut thinking_done = false; let mut last_thinking_len: usize = 0; while start.elapsed().as_secs() < timeout { // Check for upstream errors from MITM (Google API errors) if let Some(err) = state.mitm_store.take_upstream_error().await { let error_msg = super::util::upstream_error_message(&err); let error_type = super::util::upstream_error_type(&err); yield Ok(responses_sse_event( "response.failed", serde_json::json!({ "type": "response.failed", "sequence_number": next_seq(), "response": { "id": &response_id, "status": "failed", "error": { "type": error_type, "message": error_msg, "code": err.status, }, }, }), )); break; } if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await { if status == 200 { if let Some(steps) = data["steps"].as_array() { // ── Phase 1: Stream thinking deltas progressively ── if let Some(tc) = extract_thinking_content(steps) { if !thinking_started { // First time we see thinking — emit structure events thinking_started = true; yield Ok(responses_sse_event( "response.output_item.added", serde_json::json!({ "type": "response.output_item.added", "sequence_number": next_seq(), "output_index": 0, "item": { "id": &reasoning_id, "type": "reasoning", "summary": [], }, }), )); yield Ok(responses_sse_event( "response.reasoning_summary_part.added", serde_json::json!({ "type": "response.reasoning_summary_part.added", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "part": { "type": "summary_text", "text": "" }, }), )); } // Emit delta if thinking text has grown if tc.len() > last_thinking_len { let delta = &tc[last_thinking_len..]; last_thinking_len = tc.len(); thinking_text = Some(tc.clone()); yield 
Ok(responses_sse_event( "response.reasoning_summary_text.delta", serde_json::json!({ "type": "response.reasoning_summary_text.delta", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "delta": delta, }), )); } } // ── Phase 2: Stream text deltas ── let text = extract_response_text(steps); let msg_output_index: u32 = if thinking_started { 1 } else { 0 }; if !text.is_empty() && text != last_text { // Finalize thinking when response text first appears if thinking_started && !thinking_done { thinking_done = true; let final_thinking = thinking_text.clone().unwrap_or_default(); yield Ok(responses_sse_event( "response.reasoning_summary_text.done", serde_json::json!({ "type": "response.reasoning_summary_text.done", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "text": &final_thinking, }), )); yield Ok(responses_sse_event( "response.reasoning_summary_part.done", serde_json::json!({ "type": "response.reasoning_summary_part.done", "sequence_number": next_seq(), "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, "part": { "type": "summary_text", "text": &final_thinking }, }), )); yield Ok(responses_sse_event( "response.output_item.done", serde_json::json!({ "type": "response.output_item.done", "sequence_number": next_seq(), "output_index": 0, "item": { "id": &reasoning_id, "type": "reasoning", "summary": [{ "type": "summary_text", "text": &final_thinking, }], }, }), )); } // Emit message output_item.added on first text if !message_started { message_started = true; yield Ok(responses_sse_event( "response.output_item.added", serde_json::json!({ "type": "response.output_item.added", "sequence_number": next_seq(), "output_index": msg_output_index, "item": build_message_output_in_progress(&msg_id), }), )); yield Ok(responses_sse_event( "response.content_part.added", serde_json::json!({ "type": "response.content_part.added", "sequence_number": next_seq(), "output_index": 
msg_output_index, "content_index": CONTENT_IDX, "part": { "type": "output_text", "text": "", "annotations": [], } }), )); } let new_content = if text.len() > last_text.len() && text.starts_with(&*last_text) { &text[last_text.len()..] } else { &text }; if !new_content.is_empty() { yield Ok(responses_sse_event( "response.output_text.delta", serde_json::json!({ "type": "response.output_text.delta", "sequence_number": next_seq(), "item_id": &msg_id, "output_index": msg_output_index, "content_index": CONTENT_IDX, "delta": new_content, }), )); last_text = text.to_string(); } } // ── Check completion ── if is_response_done(steps) && !last_text.is_empty() { debug!("Response done, text length={}", last_text.len()); let mu = extract_model_usage(steps); let msg_idx: u32 = if thinking_started { 1 } else { 0 }; let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); // Use already-captured thinking, or MITM thinking, or LS thinking let tc = thinking_text.clone() .or(mitm_thinking) .or_else(|| extract_thinking_content(steps)); for evt in completion_events( &response_id, &model_name, &msg_id, &reasoning_id, msg_idx, CONTENT_IDX, &last_text, usage, created_at, &seq, ¶ms, ts, tc, ) { yield Ok(evt); } return; } // IDLE fallback let step_count = steps.len(); if step_count > 4 && step_count % 5 == 0 { if let Ok((ts, td)) = state.backend.get_trajectory(&cascade_id).await { if ts == 200 { let run_status = td["status"].as_str().unwrap_or(""); if run_status.contains("IDLE") && !last_text.is_empty() { debug!("Trajectory IDLE, text length={}", last_text.len()); let mu = extract_model_usage(steps); let msg_idx: u32 = if thinking_started { 1 } else { 0 }; let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); let tc = thinking_text.clone() .or(mitm_thinking) .or_else(|| 
extract_thinking_content(steps)); for evt in completion_events( &response_id, &model_name, &msg_id, &reasoning_id, msg_idx, CONTENT_IDX, &last_text, usage, created_at, &seq, ¶ms, ts, tc, ) { yield Ok(evt); } return; } } } } } } } // Wait for next update: either reactive notification or fallback timer match reactive_rx { Some(ref mut rx) => { // Wait for reactive notification with a safety timeout let timeout = tokio::time::timeout( tokio::time::Duration::from_millis(500), rx.recv(), ).await; match timeout { Ok(Some(_diff)) => { // Drain any additional queued notifications (coalesce) while rx.try_recv().is_ok() {} } Ok(None) => { // Stream closed — fall back to polling debug!("Reactive stream closed, falling back to polling"); reactive_rx = None; } Err(_) => {} // timeout — fetch anyway as safety net } } None => { // Fallback: timer-based polling let poll_ms: u64 = rand::thread_rng().gen_range(150..250); tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; } } } // Timeout — emit error, not fake incomplete response yield Ok(responses_sse_event( "response.failed", serde_json::json!({ "type": "response.failed", "sequence_number": next_seq(), "response": { "id": &response_id, "status": "failed", "error": { "type": "upstream_error", "message": format!("Timeout: no response from Google API after {timeout}s"), "code": 504, }, }, }), )); }; Sse::new(stream) .keep_alive( axum::response::sse::KeepAlive::new() .interval(std::time::Duration::from_secs(15)) .text(""), ) .into_response() } // ─── SSE completion events ─────────────────────────────────────────────────── /// Build the final SSE events at completion time. /// /// Reasoning events were already streamed during polling (when thinking /// appeared in LS steps before response text). Message output_item.added /// and content_part.added were also emitted when text first appeared. /// /// This function emits only the "done" events plus the final response.completed. 
///
/// Returns the closing events for the message item, in the order they are
/// meant to be sent:
///
/// 1. `response.output_text.done`
/// 2. `response.content_part.done`
/// 3. `response.output_item.done`
/// 4. `response.completed`
///
/// Each event draws its `sequence_number` from `seq`, so the numbers continue
/// from whatever the caller has already handed out on the same counter.
///
/// # Arguments
///
/// * `resp_id`, `model` - copied into the `id` and `model` fields of the
///   completed response object.
/// * `msg_id` - id of the message item; used as `item_id` and to build the
///   message output item.
/// * `reasoning_id` - id given to the reasoning item in the final output
///   array; only used when `thinking` is present.
/// * `msg_output_index`, `content_idx` - echoed as `output_index` and
///   `content_index` in the "done" events.
/// * `text` - the full response text placed in the "done" events and in the
///   message output item.
/// * `usage` - stored in the completed response object.
/// * `created_at` - creation timestamp forwarded to the response object; the
///   completion timestamp is taken from `now_unix()` inside this function.
/// * `seq` - shared counter that supplies `sequence_number` values.
/// * `params` - forwarded to `build_response_object`.
/// * `thinking_signature` - forwarded unchanged into the response data.
/// * `thinking` - when present, a reasoning item holding this text as a single
///   `summary_text` entry is placed before the message in the output array.
#[allow(clippy::too_many_arguments)]
fn completion_events(
    resp_id: &str,
    model: &str,
    msg_id: &str,
    reasoning_id: &str,
    msg_output_index: u32,
    content_idx: u32,
    text: &str,
    usage: Usage,
    created_at: u64,
    seq: &AtomicU32,
    params: &RequestParams,
    thinking_signature: Option,
    thinking: Option,
) -> Vec {
    // Each call returns the counter's current value and advances it by one,
    // so the order of the `next_seq()` calls below fixes the numbering.
    let next_seq = || seq.fetch_add(1, Ordering::Relaxed);
    let completed_at = now_unix();
    // Message item carried by the `response.output_item.done` event below.
    let output_item = build_message_output(msg_id, text);

    // Build output array: [reasoning (if present), message]
    let mut output_items: Vec = Vec::new();
    if let Some(ref thinking_text) = thinking {
        output_items.push(serde_json::json!({
            "id": reasoning_id,
            "type": "reasoning",
            "summary": [{
                "type": "summary_text",
                "text": thinking_text,
            }],
        }));
    }
    // A second message item is built here for the final response object;
    // `output_item` above is moved into the `output_item.done` event.
    output_items.push(build_message_output(msg_id, text));

    // Final response object embedded in the `response.completed` event.
    let completed_resp = build_response_object(
        ResponseData {
            id: resp_id.to_string(),
            model: model.to_string(),
            status: "completed",
            created_at,
            completed_at: Some(completed_at),
            output: output_items,
            usage: Some(usage),
            thinking_signature,
        },
        params,
    );

    let mut events: Vec = Vec::new();

    // Message done events
    // 1) The text of the content part is complete.
    events.push(responses_sse_event(
        "response.output_text.done",
        serde_json::json!({
            "type": "response.output_text.done",
            "sequence_number": next_seq(),
            "item_id": msg_id,
            "output_index": msg_output_index,
            "content_index": content_idx,
            "text": text,
        }),
    ));
    // 2) The content part itself is complete.
    events.push(responses_sse_event(
        "response.content_part.done",
        serde_json::json!({
            "type": "response.content_part.done",
            "sequence_number": next_seq(),
            "output_index": msg_output_index,
            "content_index": content_idx,
            "part": { "type": "output_text", "text": text, "annotations": [] },
        }),
    ));
    // 3) The message output item is complete.
    events.push(responses_sse_event(
        "response.output_item.done",
        serde_json::json!({
            "type": "response.output_item.done",
            "sequence_number": next_seq(),
            "output_index": msg_output_index,
            "item": output_item,
        }),
    ));
    // 4) Terminal event carrying the whole response object.
    events.push(responses_sse_event(
        "response.completed",
        serde_json::json!({
            "type": "response.completed",
            "sequence_number": next_seq(),
            "response": response_to_json(&completed_resp),
        }),
    ));
    events
}