refactor: extract GrpcUsage::into_api_usage to DRY up h2_handler
This commit is contained in:
@@ -20,7 +20,7 @@
|
|||||||
//! of a streaming response, so we tee the data and parse after stream ends.
|
//! of a streaming response, so we tee the data and parse after stream ends.
|
||||||
|
|
||||||
use crate::mitm::proto::parse_grpc_response_for_usage;
|
use crate::mitm::proto::parse_grpc_response_for_usage;
|
||||||
use crate::mitm::store::{ApiUsage, MitmStore};
|
use crate::mitm::store::MitmStore;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use http_body_util::{BodyExt, Full, StreamBody};
|
use http_body_util::{BodyExt, Full, StreamBody};
|
||||||
@@ -352,23 +352,7 @@ async fn handle_h2_request(
|
|||||||
if let Some(tee_buffer) = tee_buffer {
|
if let Some(tee_buffer) = tee_buffer {
|
||||||
if !tee_buffer.is_empty() {
|
if !tee_buffer.is_empty() {
|
||||||
if let Some(grpc_usage) = parse_grpc_response_for_usage(&tee_buffer) {
|
if let Some(grpc_usage) = parse_grpc_response_for_usage(&tee_buffer) {
|
||||||
let usage = ApiUsage {
|
let usage = grpc_usage.into_api_usage(path_clone.clone());
|
||||||
input_tokens: grpc_usage.input_tokens,
|
|
||||||
output_tokens: grpc_usage.output_tokens,
|
|
||||||
thinking_output_tokens: grpc_usage.thinking_output_tokens,
|
|
||||||
response_output_tokens: grpc_usage.response_output_tokens,
|
|
||||||
cache_creation_input_tokens: grpc_usage.cache_write_tokens,
|
|
||||||
cache_read_input_tokens: grpc_usage.cache_read_tokens,
|
|
||||||
model: grpc_usage.model,
|
|
||||||
api_provider: grpc_usage.api_provider,
|
|
||||||
grpc_method: Some(path_clone.clone()),
|
|
||||||
stop_reason: None,
|
|
||||||
total_cost_usd: None,
|
|
||||||
captured_at: std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.as_secs(),
|
|
||||||
};
|
|
||||||
let cascade_hint = extract_cascade_from_grpc_request(&request_body_clone);
|
let cascade_hint = extract_cascade_from_grpc_request(&request_body_clone);
|
||||||
store_clone.record_usage(cascade_hint.as_deref(), usage).await;
|
store_clone.record_usage(cascade_hint.as_deref(), usage).await;
|
||||||
}
|
}
|
||||||
@@ -420,24 +404,7 @@ async fn handle_h2_request(
|
|||||||
// Extract usage data from usage-carrying gRPC methods
|
// Extract usage data from usage-carrying gRPC methods
|
||||||
if is_usage_method && !response_body.is_empty() && status.is_success() {
|
if is_usage_method && !response_body.is_empty() && status.is_success() {
|
||||||
if let Some(grpc_usage) = parse_grpc_response_for_usage(&response_body) {
|
if let Some(grpc_usage) = parse_grpc_response_for_usage(&response_body) {
|
||||||
let usage = ApiUsage {
|
let usage = grpc_usage.into_api_usage(path.clone());
|
||||||
input_tokens: grpc_usage.input_tokens,
|
|
||||||
output_tokens: grpc_usage.output_tokens,
|
|
||||||
thinking_output_tokens: grpc_usage.thinking_output_tokens,
|
|
||||||
response_output_tokens: grpc_usage.response_output_tokens,
|
|
||||||
cache_creation_input_tokens: grpc_usage.cache_write_tokens,
|
|
||||||
cache_read_input_tokens: grpc_usage.cache_read_tokens,
|
|
||||||
model: grpc_usage.model,
|
|
||||||
api_provider: grpc_usage.api_provider,
|
|
||||||
grpc_method: Some(path.clone()),
|
|
||||||
stop_reason: None,
|
|
||||||
total_cost_usd: None,
|
|
||||||
captured_at: std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.as_secs(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let cascade_hint = extract_cascade_from_grpc_request(&request_body);
|
let cascade_hint = extract_cascade_from_grpc_request(&request_body);
|
||||||
store.record_usage(cascade_hint.as_deref(), usage).await;
|
store.record_usage(cascade_hint.as_deref(), usage).await;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -72,6 +72,29 @@ pub struct GrpcUsage {
|
|||||||
pub response_id: Option<String>,
|
pub response_id: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl GrpcUsage {
|
||||||
|
/// Convert to a full `ApiUsage` record, attaching the gRPC method path.
|
||||||
|
pub fn into_api_usage(self, grpc_method: String) -> super::store::ApiUsage {
|
||||||
|
super::store::ApiUsage {
|
||||||
|
input_tokens: self.input_tokens,
|
||||||
|
output_tokens: self.output_tokens,
|
||||||
|
thinking_output_tokens: self.thinking_output_tokens,
|
||||||
|
response_output_tokens: self.response_output_tokens,
|
||||||
|
cache_creation_input_tokens: self.cache_write_tokens,
|
||||||
|
cache_read_input_tokens: self.cache_read_tokens,
|
||||||
|
model: self.model,
|
||||||
|
api_provider: self.api_provider,
|
||||||
|
grpc_method: Some(grpc_method),
|
||||||
|
stop_reason: None,
|
||||||
|
total_cost_usd: None,
|
||||||
|
captured_at: std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_secs(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Extract gRPC message frames from a buffer.
|
/// Extract gRPC message frames from a buffer.
|
||||||
///
|
///
|
||||||
/// A gRPC message is:
|
/// A gRPC message is:
|
||||||
|
|||||||
Reference in New Issue
Block a user