From b1bd57ab5edb5946bcd73a55698ba516acc4f44b Mon Sep 17 00:00:00 2001 From: Nikketryhard Date: Sun, 15 Feb 2026 14:23:05 -0600 Subject: [PATCH] feat: forward generation params via MITM + add usageMetadata to Gemini - Add GenerationParams struct to MitmStore for temperature, top_p, top_k, max_output_tokens, stop_sequences, frequency/presence_penalty - MITM modify_request injects params into request.generationConfig - All 3 endpoints (Completions, Responses, Gemini) store client params - Add usageMetadata to Gemini sync responses (promptTokenCount, candidatesTokenCount, totalTokenCount, thoughtsTokenCount) - Add generation param fields to GeminiRequest (temperature, topP, etc.) - Completions stream_options.include_usage emits final usage chunk - Completions reasoning_tokens in completion_tokens_details - Update endpoint gap analysis doc (all high-priority gaps resolved) --- KNOWN_ISSUES.md | 26 +- docs/endpoint-gap-analysis.md | 464 ++++++++++++++++++++++++++++++++++ src/api/completions.rs | 242 +++++++++++++++++- src/api/gemini.rs | 327 +++++++++++++++++++++++- src/api/responses.rs | 72 ++++-- src/api/types.rs | 32 +++ src/mitm/modify.rs | 58 +++++ src/mitm/proxy.rs | 4 +- src/mitm/store.rs | 37 +++ 9 files changed, 1216 insertions(+), 46 deletions(-) create mode 100644 docs/endpoint-gap-analysis.md diff --git a/KNOWN_ISSUES.md b/KNOWN_ISSUES.md index 24c2c64..f76de83 100644 --- a/KNOWN_ISSUES.md +++ b/KNOWN_ISSUES.md @@ -72,11 +72,27 @@ heuristic hint is absent, properly correlating usage to cascades. **Status: SOLVED (2026-02-15)** -The MITM proxy now captures `thinking_text` from `StreamingAccumulator` into -`MitmStore` as SSE chunks arrive. The Responses API streaming handler reads -thinking deltas from MitmStore and emits `response.reasoning_summary_text.delta` -events in real-time. This works for both Google (`thought: true` parts) and -Anthropic (`thinking_delta`) formats. +Thinking text now streams progressively as delta events. The implementation: + +1. 
**LS cascade steps** — `plannerResponse.thinking` (field 3) grows progressively + as the LS receives data. For Opus 4.6, thinking text builds up word-by-word + over ~1-2s. For Gemini Flash, thinking arrives in 1-2 larger chunks. +2. **Delta tracking** — `last_thinking_len` tracks the previously emitted length. + Each poll compares current thinking length and emits only the new characters + as `response.reasoning_summary_text.delta` events. +3. **Lifecycle** — Structure events (`output_item.added`, `summary_part.added`) + emit on first thinking appearance. `done` events emit when response text + first appears (indicating thinking phase completed). + +**Verified with Opus 4.6:** (2026-02-15 13:22 UTC) + +``` +delta_len=24 "The user is asking about" +delta_len=61 " the Collatz conjecture..." +delta_len=5 " This" +delta_len=10 " is a pure" +... (11 progressive deltas over ~850ms) +``` --- diff --git a/docs/endpoint-gap-analysis.md b/docs/endpoint-gap-analysis.md new file mode 100644 index 0000000..dd1928f --- /dev/null +++ b/docs/endpoint-gap-analysis.md @@ -0,0 +1,464 @@ +# Endpoint Gap Analysis + +> **Generated:** 2026-02-15 (updated) +> **Proxy Version:** 3.1.0 +> **Scope:** All three API endpoints vs official OpenAI / Gemini specifications + +--- + +## Table of Contents + +- [Endpoint Overview](#endpoint-overview) +- [Feature Parity Matrix](#feature-parity-matrix) +- [Detailed Endpoint Analysis](#detailed-endpoint-analysis) + - [Responses API (`/v1/responses`)](#responses-api-v1responses) + - [Chat Completions API (`/v1/chat/completions`)](#chat-completions-api-v1chatcompletions) + - [Gemini API (`/v1/gemini`)](#gemini-api-v1gemini) +- [Priority Gaps](#priority-gaps) +- [Architecture Notes](#architecture-notes) + +--- + +## Endpoint Overview + +The proxy exposes three main API endpoints, each serving different client ecosystems: + +| Endpoint | Protocol | Primary Clients | Spec Reference | +| --------------------------- | --------------------------- | 
----------------------------------------------------- | ------------------------------------------------------------------------------------------------------------ | +| `POST /v1/responses` | OpenAI Responses API | Claude Code, Antigravity-native clients | [platform.openai.com/docs/api-reference/responses](https://platform.openai.com/docs/api-reference/responses) | +| `POST /v1/chat/completions` | OpenAI Chat Completions API | OpenCode, Vercel AI SDK, any OpenAI-compatible client | [platform.openai.com/docs/api-reference/chat](https://platform.openai.com/docs/api-reference/chat/create) | +| `POST /v1/gemini` | Custom Gemini-native API | Direct Gemini-format consumers | [ai.google.dev/api](https://ai.google.dev/api) (loosely based) | + +All three endpoints share the same backend pipeline: + +``` +Client Request → Proxy Endpoint → LS (Language Server) → Google API + ↓ + MITM Proxy (captures real usage + injects generation params + tool calls) +``` + +--- + +## Feature Parity Matrix + +### Core Features + +| Feature | Responses | Completions | Gemini | +| -------------------- | :-------: | :---------: | :----: | +| Sync mode | ✅ | ✅ | ✅ | +| Streaming mode (SSE) | ✅ | ✅ | ✅ | +| Model selection | ✅ | ✅ | ✅ | +| Model validation | ✅ | ✅ | ✅ | +| Auth check (OAuth) | ✅ | ✅ | ✅ | +| Timeout control | ✅ | ✅ | ✅ | + +### Generation Parameters (MITM-injected) + +| Feature | Responses | Completions | Gemini | +| ------------------- | :-------: | :---------: | :----: | +| `temperature` | ✅ | ✅ | ✅ | +| `top_p` / `topP` | ✅ | ✅ | ✅ | +| `top_k` / `topK` | ❌ | ❌ | ✅ | +| `max_output_tokens` | ✅ | ✅ | ✅ | +| `stop_sequences` | ❌ | ❌ | ✅ | +| `frequency_penalty` | ❌ | ✅ | ❌ | +| `presence_penalty` | ❌ | ✅ | ❌ | + +> **Note:** All generation parameters are forwarded to Google's API via MITM injection into `request.generationConfig`. They override the LS defaults. 
+ +### Thinking / Reasoning + +| Feature | Responses | Completions | Gemini | +| ---------------------------------- | :-------------------------------: | :-------------------------------: | :---------------------: | +| Thinking — LS path (streaming) | ✅ `reasoning_summary_text.delta` | ✅ `reasoning_content` delta | ✅ `thought: true` part | +| Thinking — LS path (sync) | ✅ `reasoning` output item | ✅ `reasoning_content` in message | ✅ `thought: true` part | +| Thinking — Bypass path (streaming) | ✅ | ✅ | ✅ | +| Thinking — Bypass path (sync) | ✅ | ✅ | ✅ | +| Thinking signature (multi-turn) | ✅ `thinking_signature` field | ❌ Not applicable | ❌ Not applicable | + +### Tool Calls + +| Feature | Responses | Completions | Gemini | +| ---------------------------- | :-----------------------------: | :------------------------: | :-------------------------------------: | +| Tool definitions input | ✅ OpenAI format → Gemini | ✅ OpenAI format → Gemini | ✅ Native Gemini format | +| Tool choice control | ✅ `tool_choice` | ✅ `tool_choice` | ✅ `tool_config` | +| Tool call output (streaming) | ✅ `function_call` items | ✅ `tool_calls` in delta | ✅ `functionCall` parts | +| Tool call output (sync) | ✅ `function_call` items | ✅ `tool_calls` in message | ✅ `functionCall` parts | +| Tool result input | ✅ `function_call_output` items | ✅ `tool` role messages | ✅ `functionResponse` in `tool_results` | +| MITM bypass (custom tools) | ✅ | ✅ | ✅ | +| Stale state protection | ✅ | ✅ | ✅ | + +### Session Management + +| Feature | Responses | Completions | Gemini | +| ------------------------------------ | :---------------------: | :--------------: | :---------------------: | +| Session/conversation reuse | ✅ `conversation` field | ❌ Not supported | ✅ `conversation` field | +| Session listing (`GET /v1/sessions`) | ✅ Shared | ✅ Shared | ✅ Shared | +| Session deletion | ✅ Shared | ✅ Shared | ✅ Shared | + +### Usage / Token Tracking + +| Feature | Responses | Completions | Gemini | +| 
-------------------------------- | :---------------------------: | :-------------------------------: | :--------------------------: | +| Usage in sync response | ✅ MITM real tokens | ✅ MITM real tokens | ✅ `usageMetadata` | +| Usage in streaming (final chunk) | ❌ Not emitted | ✅ `stream_options.include_usage` | ❌ Not emitted | +| `reasoning_tokens` in usage | ✅ In `output_tokens_details` | ✅ In `completion_tokens_details` | ✅ `thoughtsTokenCount` | +| Cache tokens | ✅ `cached_tokens` | ✅ `cached_tokens` | ✅ `cachedContentTokenCount` | + +--- + +## Detailed Endpoint Analysis + +### Responses API (`/v1/responses`) + +**Spec:** [OpenAI Responses API](https://platform.openai.com/docs/api-reference/responses) + +#### Request Fields + +| Field | Spec | Status | Implementation Details | +| ---------------------------- | ------------- | :----: | -------------------------------------------------------------------------------------------------------------------------------- | +| `model` | Required | ✅ | Mapped to internal model enum via `lookup_model()` | +| `input` | Required | ✅ | String or array. Array supports `message` items and `function_call_output` items | +| `instructions` | Optional | ✅ | Prepended to user text as system instructions | +| `stream` | Optional | ✅ | SSE stream with `response.*` events | +| `tools` | Optional | ✅ | OpenAI function format → auto-converted to Gemini `functionDeclarations` via `openai_tools_to_gemini()` | +| `tool_choice` | Optional | ✅ | `"auto"`, `"required"`, `"none"`, or `{"type":"function","function":{"name":"X"}}` → converted to Gemini `functionCallingConfig` | +| `store` | Optional | ✅ | Accepted, echoed in response. Not actually persisted. | +| `temperature` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig` injection. | +| `top_p` | Optional | ✅ | **Forwarded** to Google via MITM. | +| `max_output_tokens` | Optional | ✅ | **Forwarded** to Google via MITM. 
| +| `previous_response_id` | Optional | ✅ | Accepted, echoed. Not used for chaining (use `conversation` instead). | +| `metadata` | Optional | ✅ | Accepted, echoed back in response. | +| `user` | Optional | ✅ | Accepted, echoed. | +| `conversation` | **Extension** | ✅ | Proxy-specific: session ID for multi-turn cascade reuse. | +| `timeout` | **Extension** | ✅ | Proxy-specific: request timeout in seconds (default 120). | +| `reasoning.effort` | Optional | ❌ | Could map to model variant selection (e.g., `"high"` → Opus, `"low"` → Flash). | +| `reasoning.generate_summary` | Optional | ❌ | Not implemented. Could control thinking output inclusion. | +| `truncation` | Optional | ❌ | Not applicable — LS manages context window. | +| `parallel_tool_calls` | Optional | ✅ | Hardcoded `true` in response. | + +#### Response Object + +| Field | Spec | Status | Notes | +| ---------------------- | ------------- | :----: | ---------------------------------------------------------------- | +| `id` | Required | ✅ | `resp_` + UUID | +| `object` | Required | ✅ | Always `"response"` | +| `created_at` | Required | ✅ | Unix timestamp | +| `status` | Required | ✅ | `"completed"` or `"incomplete"` | +| `completed_at` | Required | ✅ | Unix timestamp or null | +| `error` | Required | ✅ | null on success | +| `incomplete_details` | Required | ✅ | null | +| `instructions` | Required | ✅ | Echoed from request | +| `max_output_tokens` | Required | ✅ | Echoed or null | +| `model` | Required | ✅ | Model name string | +| `output` | Required | ✅ | Array of `reasoning` and/or `message` items | +| `parallel_tool_calls` | Required | ✅ | `true` | +| `previous_response_id` | Required | ✅ | Echoed or null | +| `reasoning` | Required | ✅ | `{effort: null, summary: null}` | +| `store` | Required | ✅ | Echoed | +| `temperature` | Required | ✅ | Echoed (default 1.0) | +| `text` | Required | ✅ | `{format: {type: "text"}}` | +| `tool_choice` | Required | ✅ | `"auto"` | +| `tools` | Required | ✅ | Echoed or 
`[]` | +| `top_p` | Required | ✅ | Echoed (default 1.0) | +| `truncation` | Required | ✅ | `"disabled"` | +| `usage` | Required | ✅ | MITM-intercepted real tokens when available, estimated otherwise | +| `user` | Required | ✅ | Echoed or null | +| `metadata` | Required | ✅ | Echoed or `{}` | +| `thinking_signature` | **Extension** | ✅ | Proxy-specific: opaque blob for multi-turn thinking chain | + +#### Streaming Events + +| Event | Spec | Status | Notes | +| ---------------------------------------- | --------- | :----: | ------------------------------ | +| `response.created` | Required | ✅ | Initial response shell | +| `response.in_progress` | Required | ✅ | | +| `response.output_item.added` | Required | ✅ | For reasoning + message items | +| `response.content_part.added` | Required | ✅ | | +| `response.output_text.delta` | Required | ✅ | Progressive text deltas | +| `response.output_text.done` | Required | ✅ | | +| `response.content_part.done` | Required | ✅ | | +| `response.output_item.done` | Required | ✅ | | +| `response.completed` | Required | ✅ | Final event with full response | +| `response.reasoning_summary_text.delta` | Required | ✅ | Progressive thinking deltas | +| `response.reasoning_summary_text.done` | Required | ✅ | | +| `response.function_call_arguments.delta` | For tools | ✅ | Tool call argument streaming | +| `response.function_call_arguments.done` | For tools | ✅ | | + +--- + +### Chat Completions API (`/v1/chat/completions`) + +**Spec:** [OpenAI Chat Completions API](https://platform.openai.com/docs/api-reference/chat/create) + +#### Request Fields + +| Field | Spec | Status | Implementation Details | +| ------------------------- | ------------- | :----: | -------------------------------------------------------------------- | +| `model` | Required | ✅ | Mapped to internal model enum | +| `messages` | Required | ✅ | Supports `system`, `developer`, `user`, `assistant`, `tool` roles | +| `messages[].content` | Required | ✅ | String or array of 
`{type: "text", text: "..."}` objects | +| `messages[].tool_calls` | Optional | ✅ | For assistant messages with tool calls | +| `messages[].tool_call_id` | Optional | ✅ | For tool result messages | +| `stream` | Optional | ✅ | SSE with `chat.completion.chunk` events | +| `stream_options` | Optional | ✅ | `include_usage: true` emits final usage chunk before `[DONE]` | +| `tools` | Optional | ✅ | OpenAI function format → auto-converted to Gemini | +| `tool_choice` | Optional | ✅ | `"auto"`, `"none"`, `"required"`, or specific function | +| `timeout` | **Extension** | ✅ | Proxy-specific (default 120s) | +| `temperature` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.temperature` | +| `top_p` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.topP` | +| `max_tokens` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.maxOutputTokens` | +| `max_completion_tokens` | Optional | ✅ | **Forwarded** (same as `max_tokens`, newer OpenAI param) | +| `frequency_penalty` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.frequencyPenalty` | +| `presence_penalty` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.presencePenalty` | +| `user` | Optional | ✅ | Accepted, not used | +| `n` | Optional | ❌ | N/A — single generation only | +| `logprobs` | Optional | ❌ | N/A | +| `top_logprobs` | Optional | ❌ | N/A | +| `logit_bias` | Optional | ❌ | N/A | +| `response_format` | Optional | ❌ | Could be useful for JSON mode | +| `seed` | Optional | ❌ | N/A | +| `stop` | Optional | ❌ | Could be forwarded as `stopSequences` | + +#### Sync Response Object + +| Field | Spec | Status | Notes | +| ------------------------------------------------------------ | ----------- | :----: | -------------------------------------------- | +| `id` | Required | ✅ | `chatcmpl-` + UUID | +| `object` | Required | ✅ | `"chat.completion"` | +| `created` | Required | ✅ | Unix timestamp | +| `model` | Required | ✅ | Model name | 
+| `choices[0].index` | Required | ✅ | `0` | +| `choices[0].message.role` | Required | ✅ | `"assistant"` | +| `choices[0].message.content` | Required | ✅ | Response text | +| `choices[0].message.reasoning_content` | Extension | ✅ | Thinking text (when model produces thinking) | +| `choices[0].message.tool_calls` | Conditional | ✅ | When model returns tool calls | +| `choices[0].message.refusal` | Optional | ❌ | Not implemented | +| `choices[0].message.annotations` | Optional | ❌ | Not implemented | +| `choices[0].logprobs` | Optional | ❌ | Not implemented | +| `choices[0].finish_reason` | Required | ✅ | `"stop"` or `"tool_calls"` | +| `usage.prompt_tokens` | Required | ✅ | MITM real or estimated | +| `usage.completion_tokens` | Required | ✅ | MITM real or estimated | +| `usage.total_tokens` | Required | ✅ | Sum | +| `usage.prompt_tokens_details.cached_tokens` | Optional | ✅ | MITM cache read tokens | +| `usage.completion_tokens_details.reasoning_tokens` | Optional | ✅ | MITM thinking token count | +| `usage.completion_tokens_details.accepted_prediction_tokens` | Optional | ❌ | N/A | +| `usage.completion_tokens_details.rejected_prediction_tokens` | Optional | ❌ | N/A | +| `system_fingerprint` | Deprecated | ❌ | Cosmetic, not needed | +| `service_tier` | Optional | ❌ | Cosmetic, not needed | + +#### Streaming Chunk Object + +| Field | Spec | Status | Notes | +| ------------------------------------ | --------------- | :----: | ----------------------------------------------------- | +| `id` | Required | ✅ | Same across all chunks | +| `object` | Required | ✅ | `"chat.completion.chunk"` | +| `created` | Required | ✅ | Same across all chunks | +| `model` | Required | ✅ | | +| `choices[0].index` | Required | ✅ | `0` | +| `choices[0].delta.role` | First chunk | ✅ | `"assistant"` in first chunk | +| `choices[0].delta.content` | Text chunks | ✅ | Progressive text deltas | +| `choices[0].delta.reasoning_content` | Thinking chunks | ✅ | Progressive thinking deltas | +| 
`choices[0].delta.tool_calls` | Tool chunks | ✅ | Tool call data | +| `choices[0].delta` | Final chunk | ✅ | Empty `{}` | +| `choices[0].finish_reason` | Final chunk | ✅ | `"stop"` or `"tool_calls"` | +| `choices[0].logprobs` | Optional | ❌ | Not implemented | +| `usage` (final chunk) | Optional | ✅ | Emitted when `stream_options.include_usage` is `true` | +| `data: [DONE]` | Required | ✅ | Stream termination signal | + +--- + +### Gemini API (`/v1/gemini`) + +**Spec:** Custom endpoint loosely based on [Gemini REST API](https://ai.google.dev/api/generate-content) + +> **Note:** This is NOT a 1:1 Gemini API replica. It's a simplified proxy-native endpoint that uses Gemini's `functionDeclarations` / `functionCall` / `functionResponse` format directly, avoiding OpenAI ↔ Gemini format conversion overhead. + +#### Request Fields + +| Field | Spec | Status | Implementation Details | +| --------------------------------------- | -------- | :----: | --------------------------------------------------------------- | +| `model` | Required | ✅ | Mapped to internal model enum | +| `input` | Required | ✅ | String only (no array/multipart) | +| `tools` | Optional | ✅ | Native Gemini `[{functionDeclarations: [...]}]` format | +| `tool_config` | Optional | ✅ | Native Gemini `{functionCallingConfig: {mode: "AUTO"}}` | +| `tool_results` | Optional | ✅ | Array of `{functionResponse: {name, response}}` | +| `conversation` | Optional | ✅ | Session ID for cascade reuse | +| `stream` | Optional | ✅ | SSE streaming | +| `timeout` | Optional | ✅ | Default 120s | +| `temperature` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.temperature` | +| `top_p` / `topP` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.topP` | +| `top_k` / `topK` | Optional | ✅ | **Forwarded** to Google via MITM `generationConfig.topK` | +| `max_output_tokens` / `maxOutputTokens` | Optional | ✅ | **Forwarded** via MITM `generationConfig.maxOutputTokens` | +| `stop_sequences` / 
`stopSequences` | Optional | ✅ | **Forwarded** via MITM `generationConfig.stopSequences` | + +#### Sync Response Object + +| Field | Spec | Status | Notes | +| -------------------------------------------- | --------- | :----: | -------------------------------- | +| `candidates[0].content.parts` | Required | ✅ | Array of text/functionCall parts | +| `candidates[0].content.parts[].text` | Text | ✅ | Response text | +| `candidates[0].content.parts[].thought` | Extension | ✅ | `true` for thinking parts | +| `candidates[0].content.parts[].functionCall` | Tool call | ✅ | `{name, args}` | +| `candidates[0].content.role` | Required | ✅ | `"model"` | +| `candidates[0].finishReason` | Required | ✅ | `"STOP"` | +| `modelVersion` | Required | ✅ | Model name string | +| `usageMetadata` | Optional | ✅ | MITM-intercepted token counts | + +`usageMetadata` fields: + +| Field | Status | Notes | +| ------------------------- | :----: | ------------------------- | +| `promptTokenCount` | ✅ | Input tokens | +| `candidatesTokenCount` | ✅ | Output tokens | +| `totalTokenCount` | ✅ | Input + output | +| `thoughtsTokenCount` | ✅ | Thinking/reasoning tokens | +| `cachedContentTokenCount` | ✅ | Cache read tokens | + +#### Streaming Format + +Each SSE `data:` chunk is a complete Gemini-format JSON object with progressive `candidates[0].content.parts`: + +``` +data: {"candidates":[{"content":{"parts":[{"text":"thinking...","thought":true}],"role":"model"}}],"modelVersion":"opus-4.6"} + +data: {"candidates":[{"content":{"parts":[{"text":"Hello!"}],"role":"model"}}],"modelVersion":"opus-4.6"} + +data: {"candidates":[{"content":{"parts":[{"text":""}],"role":"model"},"finishReason":"STOP"}],"modelVersion":"opus-4.6"} + +data: [DONE] +``` + +--- + +## Priority Gaps + +### 🔴 High Priority — RESOLVED ✅ + +All high-priority gaps have been addressed: + +1. ~~**Completions: `stream_options.include_usage`**~~ → ✅ Implemented +2. 
~~**Completions: `completion_tokens_details.reasoning_tokens`**~~ → ✅ Implemented +3. ~~**Completions: Accept `temperature`, `top_p`, `max_tokens`**~~ → ✅ Forwarded via MITM +4. ~~**Gemini: `usageMetadata`**~~ → ✅ Implemented + +### 🟡 Medium Priority + +5. **Responses: `reasoning.effort`** + - **What:** Map reasoning effort levels (`"high"`, `"medium"`, `"low"`) to model variant selection + - **Why:** Could automatically select Opus vs Flash based on reasoning needs + - **Effort:** Medium — needs model selection logic changes + +6. **Completions: Session/conversation support** + - **What:** Add session reuse similar to Responses and Gemini endpoints + - **Why:** Would allow multi-turn conversations via the completions API + - **Effort:** Medium — need a way to pass session ID (maybe via `user` field or custom header) + +7. **Completions: `stop` sequences** + - **What:** Forward `stop` to Google as `stopSequences` in `generationConfig` + - **Why:** Some clients use stop sequences to control generation + - **Effort:** Trivial — add `stop` to `CompletionRequest`, map it to the existing `GenerationParams.stop_sequences` field (currently hard-coded to `None` in the completions handler), and include it in the "at least one param is set" check + +8. **Completions: `response_format` (JSON mode)** + - **What:** Forward `response_format: {"type": "json_object"}` to Google's `responseMimeType` + - **Why:** Useful for structured output + - **Effort:** Low — inject `responseMimeType: "application/json"` in generationConfig + +### 🟢 Low Priority + +Cosmetic or not applicable to our architecture: + +9. **`system_fingerprint`** — OpenAI-specific field, meaningless for our proxy +10. **`service_tier`** — OpenAI billing concept, not applicable +11. **`n` > 1** — Multiple completions per request; our backend only generates one +12. **`logprobs`** — Would require token-level access we don't have +13. 
**`seed`** — Deterministic sampling not controllable through our proxy + +--- + +## Architecture Notes + +### Generation Parameter Injection + +Client-specified sampling parameters are forwarded to Google's API via the MITM request modification pipeline: + +``` +Client sends temperature=0.5 → API handler stores in MitmStore.generation_params + ↓ + LS sends request to Google API + ↓ + MITM intercepts request + ↓ + modify_request() reads generation_params + ↓ + Injects into request.generationConfig: + temperature, topP, topK, maxOutputTokens, + stopSequences, frequencyPenalty, presencePenalty + ↓ + Forwards modified request to Google +``` + +This approach overrides whatever defaults the LS sets, giving clients direct control over sampling parameters. + +### Dual Path Architecture + +All three endpoints share a dual-path architecture: + +``` + ┌─────────────────┐ + │ Has custom │ +Request ────────────► │ tools? │ + └────────┬────────┘ + │ + ┌──── Yes ──┴── No ────┐ + │ │ + ┌─────▼─────┐ ┌─────▼─────┐ + │ MITM │ │ LS Steps │ + │ Bypass │ │ Polling │ + │ Path │ │ Path │ + └─────┬─────┘ └─────┬─────┘ + │ │ + ┌─────▼─────┐ ┌─────▼─────┐ + │ Poll │ │ Poll │ + │ MitmStore │ │ get_steps │ + │ directly │ │ from LS │ + └─────┬─────┘ └─────┬─────┘ + │ │ + └──────────┬────────────┘ + │ + ┌─────▼─────┐ + │ Response │ + │ to client │ + └───────────┘ +``` + +- **Bypass Path:** When custom tools are present, the handler polls `MitmStore` directly for response text, thinking text, and function calls. The MITM proxy captures these from the Google API response before the LS processes them. + +- **LS Path:** When no custom tools are present, the handler polls the LS's `get_steps` API for progressive response data (text, thinking, status). 
+ +### Stale State Protection + +All bypass paths include protection against stale `response_complete` flags from previous requests: + +```rust +if complete && text.is_empty() && thinking.is_none() { + warn!("stale response_complete detected — clearing"); + state.mitm_store.clear_response_async().await; + continue; // or retry +} +``` + +This handles the race condition where a previous request's MITM handler calls `mark_response_complete()` after the new request has already called `clear_response_async()`. + +### Tool Format Conversion + +``` +OpenAI tools ──► openai_tools_to_gemini() ──► Gemini functionDeclarations + │ + MitmStore.set_tools() + │ + MITM proxy injects into + outgoing LS request +``` + +The Gemini endpoint skips this conversion entirely — tools are stored in native Gemini format. diff --git a/src/api/completions.rs b/src/api/completions.rs index 5669790..0b8d547 100644 --- a/src/api/completions.rs +++ b/src/api/completions.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use tracing::{debug, info, warn}; use super::models::{lookup_model, DEFAULT_MODEL, MODELS}; -use super::polling::{extract_response_text, is_response_done, poll_for_response}; +use super::polling::{extract_response_text, extract_thinking_content, is_response_done, poll_for_response}; use super::types::*; use super::util::{err_response, now_unix}; use super::AppState; @@ -176,6 +176,28 @@ pub(crate) async fn handle_completions( } state.mitm_store.clear_active_function_call(); + // Store generation parameters for MITM injection + { + use crate::mitm::store::GenerationParams; + let gp = GenerationParams { + temperature: body.temperature, + top_p: body.top_p, + top_k: None, // OpenAI doesn't have top_k + max_output_tokens: body.max_tokens.or(body.max_completion_tokens), + stop_sequences: None, // TODO: body.stop + frequency_penalty: body.frequency_penalty, + presence_penalty: body.presence_penalty, + }; + // Only store if at least one param is set + if gp.temperature.is_some() || 
gp.top_p.is_some() || gp.max_output_tokens.is_some() + || gp.frequency_penalty.is_some() || gp.presence_penalty.is_some() + { + state.mitm_store.set_generation_params(gp).await; + } else { + state.mitm_store.clear_generation_params().await; + } + } + let token = state.backend.oauth_token().await; if token.is_empty() { return err_response( @@ -241,6 +263,8 @@ pub(crate) async fn handle_completions( uuid::Uuid::new_v4().to_string().replace('-', "") ); + let include_usage = body.stream_options.as_ref().map_or(false, |o| o.include_usage); + if body.stream { chat_completions_stream( state, @@ -248,6 +272,7 @@ pub(crate) async fn handle_completions( model_name.to_string(), cascade_id, body.timeout, + include_usage, ) .await } else { @@ -271,6 +296,7 @@ async fn chat_completions_stream( model_name: String, cascade_id: String, timeout: u64, + include_usage: bool, ) -> axum::response::Response { let stream = async_stream::stream! { let start = std::time::Instant::now(); @@ -294,6 +320,7 @@ async fn chat_completions_stream( })).unwrap_or_default())); let mut keepalive_counter: u64 = 0; + let mut last_thinking_len: usize = 0; while start.elapsed().as_secs() < timeout { // ── Check for MITM-captured function calls FIRST ── @@ -342,6 +369,27 @@ async fn chat_completions_stream( "finish_reason": "tool_calls", }], })).unwrap_or_default())); + if include_usage { + let mitm = state.mitm_store.take_usage(&cascade_id).await + .or(state.mitm_store.take_usage("_latest").await); + let (pt, ct, crt, tt) = if let Some(ref u) = mitm { + (u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens) + } else { (0, 0, 0, 0) }; + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [], + "usage": { + "prompt_tokens": pt, + "completion_tokens": ct, + "total_tokens": pt + ct, + "prompt_tokens_details": { "cached_tokens": crt }, + 
"completion_tokens_details": { "reasoning_tokens": tt }, + }, + })).unwrap_or_default())); + } yield Ok(Event::default().data("[DONE]")); return; } @@ -349,7 +397,37 @@ async fn chat_completions_stream( // ── Check for MITM-captured response text (bypass LS) ── if has_custom_tools { - if let Some(text) = state.mitm_store.peek_response_text().await { + let peek = state.mitm_store.peek_response_text().await; + let complete = state.mitm_store.is_response_complete(); + let has_fc = state.mitm_store.has_active_function_call(); + if keepalive_counter % 10 == 0 || peek.is_some() || complete || has_fc { + debug!( + "Completions bypass poll: peek={}, complete={}, has_fc={}, last_text_len={}", + peek.as_ref().map(|t| t.len()).unwrap_or(0), + complete, has_fc, last_text.len() + ); + } + // Stream thinking text as reasoning_content deltas (MITM bypass) + if let Some(tc) = state.mitm_store.peek_thinking_text().await { + if tc.len() > last_thinking_len { + let delta = &tc[last_thinking_len..]; + last_thinking_len = tc.len(); + + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [{ + "index": 0, + "delta": {"reasoning_content": delta}, + "finish_reason": serde_json::Value::Null, + }], + })).unwrap_or_default())); + } + } + + if let Some(text) = peek { if !text.is_empty() && text != last_text { let delta = if text.len() > last_text.len() && text.starts_with(&*last_text) { text[last_text.len()..].to_string() @@ -387,12 +465,33 @@ async fn chat_completions_stream( "finish_reason": "stop", }], })).unwrap_or_default())); + if include_usage { + let mitm = state.mitm_store.take_usage(&cascade_id).await + .or(state.mitm_store.take_usage("_latest").await); + let (pt, ct, crt, tt) = if let Some(ref u) = mitm { + (u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens) + } else { (0, 0, 0, 0) }; + yield 
Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [], + "usage": { + "prompt_tokens": pt, + "completion_tokens": ct, + "total_tokens": pt + ct, + "prompt_tokens_details": { "cached_tokens": crt }, + "completion_tokens_details": { "reasoning_tokens": tt }, + }, + })).unwrap_or_default())); + } yield Ok(Event::default().data("[DONE]")); return; } - } else if state.mitm_store.is_response_complete() { - // Response complete but no text — might be a tool call we already handled - // or an empty response. Give it a moment then bail. + } else if complete { + // Response complete but no text — might be a tool call arriving shortly, + // stale state from a previous request, or an empty response. tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; // Re-check function calls one more time let final_check = state.mitm_store.take_any_function_calls().await; @@ -437,10 +536,35 @@ async fn chat_completions_stream( "finish_reason": "tool_calls", }], })).unwrap_or_default())); + if include_usage { + let mitm = state.mitm_store.take_usage(&cascade_id).await + .or(state.mitm_store.take_usage("_latest").await); + let (pt, ct, crt, tt) = if let Some(ref u) = mitm { + (u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens) + } else { (0, 0, 0, 0) }; + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [], + "usage": { + "prompt_tokens": pt, + "completion_tokens": ct, + "total_tokens": pt + ct, + "prompt_tokens_details": { "cached_tokens": crt }, + "completion_tokens_details": { "reasoning_tokens": tt }, + }, + })).unwrap_or_default())); + } yield Ok(Event::default().data("[DONE]")); return; } } + // No text and no function calls but complete=true: stale state. 
+ // Clear the flag so we wait for the real response from this request. + warn!("Completions: stale response_complete detected (no text, no FC) — clearing"); + state.mitm_store.clear_response_async().await; } // When using bypass mode, skip LS step polling @@ -457,6 +581,26 @@ async fn chat_completions_stream( if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await { if status == 200 { if let Some(steps) = data["steps"].as_array() { + // Stream thinking deltas (reasoning_content) + if let Some(tc) = extract_thinking_content(steps) { + if tc.len() > last_thinking_len { + let delta = &tc[last_thinking_len..]; + last_thinking_len = tc.len(); + + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [{ + "index": 0, + "delta": {"reasoning_content": delta}, + "finish_reason": serde_json::Value::Null, + }], + })).unwrap_or_default())); + } + } + let text = extract_response_text(steps); if !text.is_empty() && text != last_text { @@ -496,6 +640,27 @@ async fn chat_completions_stream( "finish_reason": "stop", }], })).unwrap_or_default())); + if include_usage { + let mitm = state.mitm_store.take_usage(&cascade_id).await + .or(state.mitm_store.take_usage("_latest").await); + let (pt, ct, crt, tt) = if let Some(ref u) = mitm { + (u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens) + } else { (0, 0, 0, 0) }; + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [], + "usage": { + "prompt_tokens": pt, + "completion_tokens": ct, + "total_tokens": pt + ct, + "prompt_tokens_details": { "cached_tokens": crt }, + "completion_tokens_details": { "reasoning_tokens": tt }, + }, + })).unwrap_or_default())); + } yield Ok(Event::default().data("[DONE]")); return; 
} @@ -519,6 +684,27 @@ async fn chat_completions_stream( "finish_reason": "stop", }], })).unwrap_or_default())); + if include_usage { + let mitm = state.mitm_store.take_usage(&cascade_id).await + .or(state.mitm_store.take_usage("_latest").await); + let (pt, ct, crt, tt) = if let Some(ref u) = mitm { + (u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens) + } else { (0, 0, 0, 0) }; + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [], + "usage": { + "prompt_tokens": pt, + "completion_tokens": ct, + "total_tokens": pt + ct, + "prompt_tokens_details": { "cached_tokens": crt }, + "completion_tokens_details": { "reasoning_tokens": tt }, + }, + })).unwrap_or_default())); + } yield Ok(Event::default().data("[DONE]")); return; } @@ -553,6 +739,27 @@ async fn chat_completions_stream( "finish_reason": "stop", }], })).unwrap_or_default())); + if include_usage { + let mitm = state.mitm_store.take_usage(&cascade_id).await + .or(state.mitm_store.take_usage("_latest").await); + let (pt, ct, crt, tt) = if let Some(ref u) = mitm { + (u.input_tokens, u.output_tokens, u.cache_read_input_tokens, u.thinking_output_tokens) + } else { (0, 0, 0, 0) }; + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "id": completion_id, + "object": "chat.completion.chunk", + "created": now_unix(), + "model": model_name, + "choices": [], + "usage": { + "prompt_tokens": pt, + "completion_tokens": ct, + "total_tokens": pt + ct, + "prompt_tokens_details": { "cached_tokens": crt }, + "completion_tokens_details": { "reasoning_tokens": tt }, + }, + })).unwrap_or_default())); + } yield Ok(Event::default().data("[DONE]")); }; @@ -582,14 +789,23 @@ async fn chat_completions_sync( Some(u) => Some(u), None => state.mitm_store.take_usage("_latest").await, }; - let (prompt_tokens, completion_tokens, cached_tokens) 
= if let Some(mitm_usage) = mitm { - (mitm_usage.input_tokens, mitm_usage.output_tokens, mitm_usage.cache_read_input_tokens) + let (prompt_tokens, completion_tokens, cached_tokens, thinking_tokens) = if let Some(mitm_usage) = mitm { + (mitm_usage.input_tokens, mitm_usage.output_tokens, mitm_usage.cache_read_input_tokens, mitm_usage.thinking_output_tokens) } else if let Some(u) = &result.usage { - (u.input_tokens, u.output_tokens, 0) + (u.input_tokens, u.output_tokens, 0, 0) } else { - (0, 0, 0) + (0, 0, 0, 0) }; + // Build message object, including reasoning_content if thinking is present + let mut message = serde_json::json!({ + "role": "assistant", + "content": result.text, + }); + if let Some(ref thinking) = result.thinking { + message["reasoning_content"] = serde_json::json!(thinking); + } + Json(serde_json::json!({ "id": completion_id, "object": "chat.completion", @@ -597,10 +813,7 @@ async fn chat_completions_sync( "model": model_name, "choices": [{ "index": 0, - "message": { - "role": "assistant", - "content": result.text, - }, + "message": message, "finish_reason": "stop", }], "usage": { @@ -610,6 +823,9 @@ async fn chat_completions_sync( "prompt_tokens_details": { "cached_tokens": cached_tokens, }, + "completion_tokens_details": { + "reasoning_tokens": thinking_tokens, + }, }, })) .into_response() diff --git a/src/api/gemini.rs b/src/api/gemini.rs index a83a8b7..e1e7b09 100644 --- a/src/api/gemini.rs +++ b/src/api/gemini.rs @@ -3,17 +3,19 @@ //! Accepts tools in Gemini `functionDeclarations` format directly, //! returns `functionCall` in Gemini format directly. //! No OpenAI ↔ Gemini format conversion. +//! Supports both sync (`stream: false`) and streaming (`stream: true`) modes. 
use axum::{ extract::State, http::StatusCode, - response::{IntoResponse, Json}, + response::{sse::Event, IntoResponse, Json, Sse}, }; +use rand::Rng; use std::sync::Arc; -use tracing::info; +use tracing::{info, warn}; use super::models::{lookup_model, DEFAULT_MODEL, MODELS}; -use super::polling::poll_for_response; +use super::polling::{extract_response_text, extract_thinking_content, is_response_done, poll_for_response}; use super::util::err_response; use super::AppState; use crate::mitm::store::PendingToolResult; @@ -40,6 +42,21 @@ pub(crate) struct GeminiRequest { /// Tool results in Gemini format: [{"functionResponse": {"name": "...", "response": {...}}}] #[serde(default)] pub tool_results: Option>, + /// Sampling temperature. + #[serde(default)] + pub temperature: Option, + /// Nucleus sampling (topP). + #[serde(default, alias = "topP")] + pub top_p: Option, + /// Top-K sampling. + #[serde(default, alias = "topK")] + pub top_k: Option, + /// Max output tokens. + #[serde(default, alias = "maxOutputTokens")] + pub max_output_tokens: Option, + /// Stop sequences. + #[serde(default, alias = "stopSequences")] + pub stop_sequences: Option>, } fn default_timeout() -> u64 { @@ -54,6 +71,30 @@ fn extract_conversation_id(conv: &Option) -> Option { } } +/// Build Gemini-format usageMetadata from MITM store. 
+async fn build_usage_metadata( + store: &crate::mitm::store::MitmStore, + cascade_id: &str, +) -> serde_json::Value { + let usage = store.take_usage(cascade_id).await + .or(store.take_usage("_latest").await); + if let Some(usage) = usage { + serde_json::json!({ + "promptTokenCount": usage.input_tokens, + "candidatesTokenCount": usage.output_tokens, + "totalTokenCount": usage.input_tokens + usage.output_tokens, + "thoughtsTokenCount": usage.thinking_output_tokens, + "cachedContentTokenCount": usage.cache_read_input_tokens, + }) + } else { + serde_json::json!({ + "promptTokenCount": 0, + "candidatesTokenCount": 0, + "totalTokenCount": 0, + }) + } +} + pub(crate) async fn handle_gemini( State(state): State>, Json(body): Json, @@ -124,6 +165,27 @@ pub(crate) async fn handle_gemini( info!(count = results.len(), "Stored Gemini-native tool results for MITM injection"); } + // Store generation parameters for MITM injection + { + use crate::mitm::store::GenerationParams; + let gp = GenerationParams { + temperature: body.temperature, + top_p: body.top_p, + top_k: body.top_k, + max_output_tokens: body.max_output_tokens, + stop_sequences: body.stop_sequences.clone(), + frequency_penalty: None, + presence_penalty: None, + }; + if gp.temperature.is_some() || gp.top_p.is_some() || gp.top_k.is_some() + || gp.max_output_tokens.is_some() || gp.stop_sequences.is_some() + { + state.mitm_store.set_generation_params(gp).await; + } else { + state.mitm_store.clear_generation_params().await; + } + } + // Session/conversation management let session_id_str = extract_conversation_id(&body.conversation); let cascade_id = if let Some(ref sid) = session_id_str { @@ -184,6 +246,24 @@ pub(crate) async fn handle_gemini( } } + // Dispatch to sync or stream + let model_name = model_name.to_string(); + let timeout = body.timeout; + if body.stream { + gemini_stream(state, model_name, cascade_id, timeout).await + } else { + gemini_sync(state, model_name, cascade_id, timeout).await + } +} + +// ─── Sync 
──────────────────────────────────────────────────────────────────── + +async fn gemini_sync( + state: Arc, + model_name: String, + cascade_id: String, + timeout: u64, +) -> axum::response::Response { let has_custom_tools = state.mitm_store.get_tools().await.is_some(); // Clear stale response @@ -192,7 +272,7 @@ pub(crate) async fn handle_gemini( // ── MITM bypass: when tools active, poll MitmStore directly ── if has_custom_tools { let start = std::time::Instant::now(); - while start.elapsed().as_secs() < body.timeout { + while start.elapsed().as_secs() < timeout { // Check for function calls let captured = state.mitm_store.take_any_function_calls().await; if let Some(ref calls) = captured { @@ -226,15 +306,32 @@ pub(crate) async fn handle_gemini( // Check for completed text response if state.mitm_store.is_response_complete() { let text = state.mitm_store.take_response_text().await.unwrap_or_default(); + let thinking = state.mitm_store.take_thinking_text().await; + + // Guard against stale response_complete with no data + if text.is_empty() && thinking.is_none() { + warn!("Gemini sync bypass: stale response_complete — clearing"); + state.mitm_store.clear_response_async().await; + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + continue; + } + + let mut parts: Vec = Vec::new(); + if let Some(ref t) = thinking { + parts.push(serde_json::json!({"text": t, "thought": true})); + } + parts.push(serde_json::json!({"text": text})); + return Json(serde_json::json!({ "candidates": [{ "content": { - "parts": [{"text": text}], + "parts": parts, "role": "model", }, "finishReason": "STOP", }], "modelVersion": model_name, + "usageMetadata": build_usage_metadata(&state.mitm_store, &cascade_id).await, })) .into_response(); } @@ -253,8 +350,7 @@ pub(crate) async fn handle_gemini( } // ── Normal LS path (no custom tools) ── - // Poll for response - let poll_result = poll_for_response(&state, &cascade_id, body.timeout).await; + let poll_result = 
poll_for_response(&state, &cascade_id, timeout).await; // Check for captured function calls — return in Gemini format let captured_tool_calls = state.mitm_store.take_any_function_calls().await; @@ -287,20 +383,233 @@ pub(crate) async fn handle_gemini( "finishReason": "STOP", }], "modelVersion": model_name, + "usageMetadata": build_usage_metadata(&state.mitm_store, &cascade_id).await, })) .into_response(); } - // Normal text response + // Normal text response — include thinking if present + let mut parts: Vec = Vec::new(); + if let Some(ref thinking) = poll_result.thinking { + parts.push(serde_json::json!({"text": thinking, "thought": true})); + } + parts.push(serde_json::json!({"text": poll_result.text})); + Json(serde_json::json!({ "candidates": [{ "content": { - "parts": [{"text": poll_result.text}], + "parts": parts, "role": "model", }, "finishReason": "STOP", }], "modelVersion": model_name, + "usageMetadata": build_usage_metadata(&state.mitm_store, &cascade_id).await, })) .into_response() } + +// ─── Streaming ─────────────────────────────────────────────────────────────── + +async fn gemini_stream( + state: Arc, + model_name: String, + cascade_id: String, + timeout: u64, +) -> axum::response::Response { + let stream = async_stream::stream! 
{ + let start = std::time::Instant::now(); + let mut last_text = String::new(); + let mut last_thinking = String::new(); + let has_custom_tools = state.mitm_store.get_tools().await.is_some(); + + // Clear stale response + state.mitm_store.clear_response_async().await; + + while start.elapsed().as_secs() < timeout { + // ── Check for MITM-captured function calls FIRST ── + let captured = state.mitm_store.take_any_function_calls().await; + if let Some(ref calls) = captured { + if !calls.is_empty() { + let parts: Vec = calls + .iter() + .map(|fc| { + serde_json::json!({ + "functionCall": { + "name": fc.name, + "args": fc.args, + } + }) + }) + .collect(); + + yield Ok::<_, std::convert::Infallible>(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": parts, + "role": "model", + }, + "finishReason": "STOP", + }], + "modelVersion": model_name, + })).unwrap_or_default())); + yield Ok(Event::default().data("[DONE]")); + return; + } + } + + // ── MITM bypass path (custom tools active) ── + if has_custom_tools { + // Stream thinking + if let Some(tc) = state.mitm_store.peek_thinking_text().await { + if tc.len() > last_thinking.len() { + let delta = tc[last_thinking.len()..].to_string(); + last_thinking = tc; + + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": delta, "thought": true}], + "role": "model", + }, + }], + "modelVersion": model_name, + })).unwrap_or_default())); + } + } + + // Stream text + if let Some(text) = state.mitm_store.peek_response_text().await { + if !text.is_empty() && text.len() > last_text.len() { + let delta = &text[last_text.len()..]; + + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": delta}], + "role": "model", + }, + }], + "modelVersion": model_name, + })).unwrap_or_default())); + last_text = text; + } + } + + // Check completion + 
let complete = state.mitm_store.is_response_complete(); + if complete && !last_text.is_empty() { + // Final chunk with finishReason + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": ""}], + "role": "model", + }, + "finishReason": "STOP", + }], + "modelVersion": model_name, + })).unwrap_or_default())); + yield Ok(Event::default().data("[DONE]")); + return; + } else if complete && last_text.is_empty() && last_thinking.is_empty() { + // Stale state — clear and retry + warn!("Gemini stream bypass: stale response_complete — clearing"); + state.mitm_store.clear_response_async().await; + } + + let poll_ms: u64 = rand::thread_rng().gen_range(150..300); + tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; + continue; + } + + // ── Normal LS path (no custom tools) ── + if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await { + if status == 200 { + if let Some(steps) = data["steps"].as_array() { + // Stream thinking + if let Some(tc) = extract_thinking_content(steps) { + if tc.len() > last_thinking.len() { + let delta = tc[last_thinking.len()..].to_string(); + last_thinking = tc; + + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": delta, "thought": true}], + "role": "model", + }, + }], + "modelVersion": model_name, + })).unwrap_or_default())); + } + } + + // Stream text + let text = extract_response_text(steps); + if !text.is_empty() && text.len() > last_text.len() { + let delta = if text.starts_with(&*last_text) { + &text[last_text.len()..] 
+ } else { + &text + }; + + if !delta.is_empty() { + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": delta}], + "role": "model", + }, + }], + "modelVersion": model_name, + })).unwrap_or_default())); + last_text = text.to_string(); + } + } + + // Done check + if is_response_done(steps) && !last_text.is_empty() { + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": ""}], + "role": "model", + }, + "finishReason": "STOP", + }], + "modelVersion": model_name, + })).unwrap_or_default())); + yield Ok(Event::default().data("[DONE]")); + return; + } + } + } + } + + let poll_ms: u64 = rand::thread_rng().gen_range(200..350); + tokio::time::sleep(tokio::time::Duration::from_millis(poll_ms)).await; + } + + // Timeout + yield Ok(Event::default().data(serde_json::to_string(&serde_json::json!({ + "candidates": [{ + "content": { + "parts": [{"text": if last_text.is_empty() { "[Timeout]" } else { "" }}], + "role": "model", + }, + "finishReason": "STOP", + }], + "modelVersion": model_name, + })).unwrap_or_default())); + yield Ok(Event::default().data("[DONE]")); + }; + + Sse::new(stream) + .keep_alive( + axum::response::sse::KeepAlive::new() + .interval(std::time::Duration::from_secs(15)) + .text(""), + ) + .into_response() +} diff --git a/src/api/responses.rs b/src/api/responses.rs index ed229d6..a1c2ab8 100644 --- a/src/api/responses.rs +++ b/src/api/responses.rs @@ -242,6 +242,25 @@ pub(crate) async fn handle_responses( state.mitm_store.set_tool_config(gemini_config).await; } + // Store generation parameters for MITM injection + { + use crate::mitm::store::GenerationParams; + let gp = GenerationParams { + temperature: body.temperature, + top_p: body.top_p, + top_k: None, + max_output_tokens: body.max_output_tokens, + stop_sequences: None, + frequency_penalty: None, + presence_penalty: None, + }; + if gp.temperature.is_some() 
|| gp.top_p.is_some() || gp.max_output_tokens.is_some() { + state.mitm_store.set_generation_params(gp).await; + } else { + state.mitm_store.clear_generation_params().await; + } + } + let response_id = format!( "resp_{}", uuid::Uuid::new_v4().to_string().replace('-', "") @@ -1003,19 +1022,21 @@ async fn handle_responses_stream( } }; + let mut thinking_started = false; + let mut thinking_done = false; + let mut last_thinking_len: usize = 0; while start.elapsed().as_secs() < timeout { if let Ok((status, data)) = state.backend.get_steps(&cascade_id).await { if status == 200 { if let Some(steps) = data["steps"].as_array() { - // Check for thinking content (appears before response text) - if !thinking_emitted { - if let Some(tc) = extract_thinking_content(steps) { - thinking_text = Some(tc.clone()); - thinking_emitted = true; + // ── Phase 1: Stream thinking deltas progressively ── + if let Some(tc) = extract_thinking_content(steps) { + if !thinking_started { + // First time we see thinking — emit structure events + thinking_started = true; - // Emit full reasoning event sequence at output_index 0 yield Ok(responses_sse_event( "response.output_item.added", serde_json::json!({ @@ -1040,6 +1061,14 @@ async fn handle_responses_stream( "part": { "type": "summary_text", "text": "" }, }), )); + } + + // Emit delta if thinking text has grown + if tc.len() > last_thinking_len { + let delta = &tc[last_thinking_len..]; + last_thinking_len = tc.len(); + thinking_text = Some(tc.clone()); + yield Ok(responses_sse_event( "response.reasoning_summary_text.delta", serde_json::json!({ @@ -1048,9 +1077,22 @@ async fn handle_responses_stream( "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, - "delta": &tc, + "delta": delta, }), )); + } + } + + // ── Phase 2: Stream text deltas ── + let text = extract_response_text(steps); + let msg_output_index: u32 = if thinking_started { 1 } else { 0 }; + + if !text.is_empty() && text != last_text { + // Finalize thinking when response 
text first appears + if thinking_started && !thinking_done { + thinking_done = true; + let final_thinking = thinking_text.clone().unwrap_or_default(); + yield Ok(responses_sse_event( "response.reasoning_summary_text.done", serde_json::json!({ @@ -1059,7 +1101,7 @@ async fn handle_responses_stream( "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, - "text": &tc, + "text": &final_thinking, }), )); yield Ok(responses_sse_event( @@ -1070,7 +1112,7 @@ async fn handle_responses_stream( "item_id": &reasoning_id, "output_index": 0, "summary_index": 0, - "part": { "type": "summary_text", "text": &tc }, + "part": { "type": "summary_text", "text": &final_thinking }, }), )); yield Ok(responses_sse_event( @@ -1084,19 +1126,13 @@ async fn handle_responses_stream( "type": "reasoning", "summary": [{ "type": "summary_text", - "text": &tc, + "text": &final_thinking, }], }, }), )); } - } - // ── Phase 2: Stream text deltas ── - let text = extract_response_text(steps); - let msg_output_index: u32 = if thinking_emitted { 1 } else { 0 }; - - if !text.is_empty() && text != last_text { // Emit message output_item.added on first text if !message_started { message_started = true; @@ -1153,7 +1189,7 @@ async fn handle_responses_stream( if is_response_done(steps) && !last_text.is_empty() { debug!("Response done, text length={}", last_text.len()); let mu = extract_model_usage(steps); - let msg_idx: u32 = if thinking_emitted { 1 } else { 0 }; + let msg_idx: u32 = if thinking_started { 1 } else { 0 }; let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); // Use already-captured thinking, or MITM thinking, or LS thinking @@ -1179,7 +1215,7 @@ async fn handle_responses_stream( if run_status.contains("IDLE") && !last_text.is_empty() { debug!("Trajectory IDLE, text length={}", last_text.len()); let mu = extract_model_usage(steps); - let msg_idx: u32 = if thinking_emitted { 1 } 
else { 0 }; + let msg_idx: u32 = if thinking_started { 1 } else { 0 }; let (usage, mitm_thinking) = usage_from_poll(&state.mitm_store, &cascade_id, &mu, ¶ms.user_text, &last_text).await; let ts = extract_thinking_signature(steps); let tc = thinking_text.clone() diff --git a/src/api/types.rs b/src/api/types.rs index 8bdb016..b54c95b 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -40,6 +40,14 @@ pub(crate) struct ResponsesRequest { pub tool_choice: Option, } +/// Stream options for Chat Completions (controls usage emission in final chunk). +#[derive(Deserialize, Default)] +pub(crate) struct StreamOptions { + /// When true, emit a final chunk with usage statistics before [DONE]. + #[serde(default)] + pub include_usage: bool, +} + /// Chat Completions request (OpenAI-compatible). #[derive(Deserialize)] pub(crate) struct CompletionRequest { @@ -53,6 +61,30 @@ pub(crate) struct CompletionRequest { pub tools: Option>, /// Tool choice: "auto", "none", "required", or {"type":"function","function":{"name":"..."}} pub tool_choice: Option, + /// Stream options — controls whether usage is emitted in the final streaming chunk. + #[serde(default)] + pub stream_options: Option, + /// Sampling temperature — forwarded to Google via MITM generationConfig injection. + #[serde(default)] + pub temperature: Option, + /// Nucleus sampling — forwarded to Google via MITM. + #[serde(default)] + pub top_p: Option, + /// Max tokens — forwarded to Google as maxOutputTokens via MITM. + #[serde(default)] + pub max_tokens: Option, + /// Max completion tokens — forwarded to Google as maxOutputTokens via MITM. + #[serde(default)] + pub max_completion_tokens: Option, + /// User identifier — accepted, not used. + #[serde(default)] + pub user: Option, + /// Frequency penalty — forwarded to Google via MITM. + #[serde(default)] + pub frequency_penalty: Option, + /// Presence penalty — forwarded to Google via MITM. 
+ #[serde(default)] + pub presence_penalty: Option, } #[derive(Deserialize)] diff --git a/src/mitm/modify.rs b/src/mitm/modify.rs index b5a2031..27f726f 100644 --- a/src/mitm/modify.rs +++ b/src/mitm/modify.rs @@ -26,6 +26,8 @@ pub struct ToolContext { pub pending_results: Vec, /// Last captured function calls for history rewriting. pub last_calls: Vec, + /// Client-specified generation parameters (temperature, top_p, etc.). + pub generation_params: Option, } /// Modify a streamGenerateContent request body in-place. @@ -349,6 +351,62 @@ pub fn modify_request(body: &[u8], tool_ctx: Option<&ToolContext>) -> Option = Vec::new(); + + if let Some(t) = gp.temperature { + gc.insert("temperature".to_string(), serde_json::json!(t)); + injected.push(format!("temperature={t}")); + } + if let Some(p) = gp.top_p { + gc.insert("topP".to_string(), serde_json::json!(p)); + injected.push(format!("topP={p}")); + } + if let Some(k) = gp.top_k { + gc.insert("topK".to_string(), serde_json::json!(k)); + injected.push(format!("topK={k}")); + } + if let Some(m) = gp.max_output_tokens { + gc.insert("maxOutputTokens".to_string(), serde_json::json!(m)); + injected.push(format!("maxOutputTokens={m}")); + } + if let Some(ref seqs) = gp.stop_sequences { + gc.insert("stopSequences".to_string(), serde_json::json!(seqs)); + injected.push(format!("stopSequences({})", seqs.len())); + } + if let Some(fp) = gp.frequency_penalty { + gc.insert("frequencyPenalty".to_string(), serde_json::json!(fp)); + injected.push(format!("frequencyPenalty={fp}")); + } + if let Some(pp) = gp.presence_penalty { + gc.insert("presencePenalty".to_string(), serde_json::json!(pp)); + injected.push(format!("presencePenalty={pp}")); + } + + if !injected.is_empty() { + changes.push(format!("inject generationConfig: {}", injected.join(", "))); + } + } + } + } + if changes.is_empty() { return None; // Nothing modified } diff --git a/src/mitm/proxy.rs b/src/mitm/proxy.rs index 089e834..ed8347d 100644 --- a/src/mitm/proxy.rs +++ 
b/src/mitm/proxy.rs @@ -561,13 +561,15 @@ async fn handle_http_over_tls( let tool_config = store.get_tool_config().await; let pending_results = store.take_tool_results().await; let last_calls = store.get_last_function_calls().await; + let generation_params = store.get_generation_params().await; - let tool_ctx = if tools.is_some() || !pending_results.is_empty() { + let tool_ctx = if tools.is_some() || !pending_results.is_empty() || generation_params.is_some() { Some(super::modify::ToolContext { tools, tool_config, pending_results, last_calls, + generation_params, }) } else { None diff --git a/src/mitm/store.rs b/src/mitm/store.rs index 1f7faed..e6ee322 100644 --- a/src/mitm/store.rs +++ b/src/mitm/store.rs @@ -60,6 +60,21 @@ pub struct PendingToolResult { pub result: serde_json::Value, } +/// Client-specified generation parameters for MITM injection. +/// Set by API handlers, consumed by the MITM modify layer. +#[derive(Debug, Clone, Default)] +pub struct GenerationParams { + pub temperature: Option, + pub top_p: Option, + pub top_k: Option, + pub max_output_tokens: Option, + pub stop_sequences: Option>, + /// Frequency penalty (OpenAI) — mapped to frequencyPenalty in Gemini. + pub frequency_penalty: Option, + /// Presence penalty (OpenAI) — mapped to presencePenalty in Gemini. + pub presence_penalty: Option, +} + /// Thread-safe store for intercepted data. /// /// Keyed by a unique request ID that we can correlate with cascade operations. @@ -102,6 +117,10 @@ pub struct MitmStore { captured_thinking_text: Arc>>, /// Whether the captured response is complete (finishReason received). response_complete: Arc, + + // ── Generation parameters for MITM injection ───────────────────────── + /// Client-specified sampling parameters to inject into Google API requests. + generation_params: Arc>>, } /// Aggregate statistics across all intercepted traffic. 
@@ -144,6 +163,7 @@ impl MitmStore { captured_response_text: Arc::new(RwLock::new(None)), captured_thinking_text: Arc::new(RwLock::new(None)), response_complete: Arc::new(AtomicBool::new(false)), + generation_params: Arc::new(RwLock::new(None)), } } @@ -433,4 +453,21 @@ impl MitmStore { pub async fn clear_active_cascade(&self) { *self.active_cascade_id.write().await = None; } + + // ── Generation parameters ──────────────────────────────────────────── + + /// Store client-specified generation parameters for MITM injection. + pub async fn set_generation_params(&self, params: GenerationParams) { + *self.generation_params.write().await = Some(params); + } + + /// Read current generation parameters (non-consuming). + pub async fn get_generation_params(&self) -> Option { + self.generation_params.read().await.clone() + } + + /// Clear generation parameters. + pub async fn clear_generation_params(&self) { + *self.generation_params.write().await = None; + } }