From df96fc1688dc0dd9cdd1e0b6c21feb5a03335f9c Mon Sep 17 00:00:00 2001 From: Vitus <144411726+Vitus213@users.noreply.github.com> Date: Sat, 26 Apr 2025 12:23:56 +0800 Subject: [PATCH] fix(provider): adjust structure of proxy_handler (#83) --- .../provider/src/handlers/invoke_resolver.rs | 10 ++- crates/provider/src/proxy/builder.rs | 41 ++++++++++ crates/provider/src/proxy/client.rs | 32 ++++++++ crates/provider/src/proxy/mod.rs | 2 + crates/provider/src/proxy/proxy_handler.rs | 79 ++----------------- crates/service/src/systemd.rs | 37 ++++----- 6 files changed, 102 insertions(+), 99 deletions(-) create mode 100644 crates/provider/src/proxy/builder.rs create mode 100644 crates/provider/src/proxy/client.rs diff --git a/crates/provider/src/handlers/invoke_resolver.rs b/crates/provider/src/handlers/invoke_resolver.rs index 957d7e4..487a875 100644 --- a/crates/provider/src/handlers/invoke_resolver.rs +++ b/crates/provider/src/handlers/invoke_resolver.rs @@ -8,12 +8,13 @@ use url::Url; pub struct InvokeResolver; impl InvokeResolver { - pub async fn resolve(function_name: &str) -> Result { + pub async fn resolve_function_url(function_name: &str) -> Result { //根据函数名和containerd获取函数ip, //从函数名称中提取命名空间。如果函数名称中包含 .,则将其后的部分作为命名空间;否则使用默认命名空间 let mut actual_function_name = function_name; - let namespace = get_namespace_or_default(function_name, DEFAULT_FUNCTION_NAMESPACE); + let namespace = + extract_namespace_from_function_or_default(function_name, DEFAULT_FUNCTION_NAMESPACE); if function_name.contains('.') { actual_function_name = function_name.trim_end_matches(&format!(".{}", namespace)); } @@ -39,7 +40,10 @@ impl InvokeResolver { } } -fn get_namespace_or_default(function_name: &str, default_namespace: &str) -> String { +fn extract_namespace_from_function_or_default( + function_name: &str, + default_namespace: &str, +) -> String { let mut namespace = default_namespace.to_string(); if function_name.contains('.') { if let Some(index) = function_name.rfind('.') { diff --git a/crates/provider/src/proxy/builder.rs b/crates/provider/src/proxy/builder.rs new file mode 100644 index 0000000..c50d602 --- /dev/null +++ b/crates/provider/src/proxy/builder.rs @@ -0,0 +1,41 @@ +use actix_web::{Error, HttpRequest, web}; +use futures::StreamExt; +use reqwest::{Client, RequestBuilder}; +use url::Url; +//根据URL和原始请求来构建转发请求,并对请求头进行处理 +pub async fn build_proxy_request( + req: &HttpRequest, + base_url: &Url, + proxy_client: &Client, + mut payload: web::Payload, +) -> Result { + let origin_url = base_url.join(req.uri().path()).unwrap(); + let remaining_segments = origin_url.path_segments().unwrap().skip(2); + let rest_path = remaining_segments.collect::>().join("/"); + let url = base_url.join(&rest_path).unwrap(); + let mut proxy_req = proxy_client + .request(req.method().clone(), url) + .headers(req.headers().clone().into()); + + if req.headers().get("X-Forwarded-Host").is_none() { + if let Some(host) = req.headers().get("Host") { + proxy_req = proxy_req.header("X-Forwarded-Host", host); + } + } + + if req.headers().get("X-Forwarded-For").is_none() { + if let Some(remote_addr) = req.peer_addr() { + proxy_req = proxy_req.header("X-Forwarded-For", remote_addr.to_string()); + } + } + + let mut body = web::BytesMut::new(); + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + body.extend_from_slice(&chunk); + } + let body_bytes = body.freeze(); + let proxy_req = proxy_req.body(body_bytes); + + Ok(proxy_req) +} diff --git a/crates/provider/src/proxy/client.rs b/crates/provider/src/proxy/client.rs new file mode 100644 index 0000000..bc5ab5d --- /dev/null +++ b/crates/provider/src/proxy/client.rs @@ -0,0 +1,32 @@ +use std::time::Duration; + +use reqwest::{Client, redirect}; + +use crate::types::config::FaaSConfig; + +//构建client +pub async fn new_proxy_client_from_config(config: &FaaSConfig) -> Client { + new_proxy_client( + config.get_read_timeout(), + /*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(), + ) + .await +} + +//根据FaasConfig参数来设置Client +pub async fn new_proxy_client( + timeout: Duration, + //max_idle_conns: usize, + max_idle_conns_per_host: usize, +) -> Client { + Client::builder() + .connect_timeout(timeout) + .timeout(timeout) + .pool_max_idle_per_host(max_idle_conns_per_host) + .pool_idle_timeout(Duration::from_millis(120)) + .tcp_keepalive(120 * Duration::from_secs(1)) + .redirect(redirect::Policy::none()) + .tcp_nodelay(true) + .build() + .expect("Failed to create client") +} diff --git a/crates/provider/src/proxy/mod.rs b/crates/provider/src/proxy/mod.rs index af3226f..79b8e70 100644 --- a/crates/provider/src/proxy/mod.rs +++ b/crates/provider/src/proxy/mod.rs @@ -1 +1,3 @@ +pub mod builder; +pub mod client; pub mod proxy_handler; diff --git a/crates/provider/src/proxy/proxy_handler.rs b/crates/provider/src/proxy/proxy_handler.rs index abcb7cd..4fcce1b 100644 --- a/crates/provider/src/proxy/proxy_handler.rs +++ b/crates/provider/src/proxy/proxy_handler.rs @@ -1,11 +1,8 @@ -use futures::StreamExt; -use std::time::Duration; - +use crate::handlers::invoke_resolver::InvokeResolver; +use crate::proxy::builder::build_proxy_request; +use crate::proxy::client::new_proxy_client_from_config; +use crate::types::config::FaaSConfig; use actix_web::{Error, HttpRequest, HttpResponse, Responder, http::Method, web}; -use reqwest::{Client, RequestBuilder, redirect}; -use url::Url; - -use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig}; pub async fn proxy_handler( config: web::Data, @@ -29,45 +26,19 @@ pub async fn proxy_handler( _ => HttpResponse::MethodNotAllowed().body("method not allowed"), } } -//构建client -async fn new_proxy_client_from_config(config: &FaaSConfig) -> Client { - new_proxy_client( - config.get_read_timeout(), - /*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(), - ) - .await -} - -//根据FaasConfig参数来设置Client -async fn new_proxy_client( - timeout: Duration, - //max_idle_conns: usize, - max_idle_conns_per_host: usize, -) -> Client { - Client::builder() - .connect_timeout(timeout) - .timeout(timeout) - .pool_max_idle_per_host(max_idle_conns_per_host) - .pool_idle_timeout(Duration::from_millis(120)) - .tcp_keepalive(120 * Duration::from_secs(1)) - .redirect(redirect::Policy::none()) - .tcp_nodelay(true) - .build() - .expect("Failed to create client") -} //根据原始请求,解析url,构建转发请求并转发,获取响应 async fn proxy_request( req: &HttpRequest, payload: web::Payload, - proxy_client: &Client, + proxy_client: &reqwest::Client, ) -> Result { let function_name = req.match_info().get("name").unwrap_or(""); if function_name.is_empty() { return Ok(HttpResponse::BadRequest().body("provide function name in path")); } - let function_addr = match InvokeResolver::resolve(function_name).await { + let function_addr = match InvokeResolver::resolve_function_url(function_name).await { Ok(function_addr) => function_addr, Err(e) => return Ok(HttpResponse::BadRequest().body(e.to_string())), }; @@ -90,41 +61,3 @@ async fn proxy_request( Err(e) => Ok(HttpResponse::BadGateway().body(e.to_string())), } } - -//根据URL和原始请求来构建转发请求,并对请求头进行处理 -async fn build_proxy_request( - req: &HttpRequest, - base_url: &Url, - proxy_client: &Client, - mut payload: web::Payload, -) -> Result { - let origin_url = base_url.join(req.uri().path()).unwrap(); - let remaining_segments = origin_url.path_segments().unwrap().skip(2); - let rest_path = remaining_segments.collect::>().join("/"); - let url = base_url.join(&rest_path).unwrap(); - let mut proxy_req = proxy_client - .request(req.method().clone(), url) - .headers(req.headers().clone().into()); - - if req.headers().get("X-Forwarded-Host").is_none() { - if let Some(host) = req.headers().get("Host") { - proxy_req = proxy_req.header("X-Forwarded-Host", host); - } - } - - if req.headers().get("X-Forwarded-For").is_none() { - if let Some(remote_addr) = req.peer_addr() { - proxy_req = proxy_req.header("X-Forwarded-For", remote_addr.to_string()); - } - } - - let mut body = web::BytesMut::new(); - while let Some(chunk) = payload.next().await { - let chunk = chunk?; - body.extend_from_slice(&chunk); - } - let body_bytes = body.freeze(); - let proxy_req = proxy_req.body(body_bytes); - - Ok(proxy_req) -} diff --git a/crates/service/src/systemd.rs b/crates/service/src/systemd.rs index c43ad9a..9fc5e0a 100644 --- a/crates/service/src/systemd.rs +++ b/crates/service/src/systemd.rs @@ -11,14 +11,11 @@ impl Systemd { .arg(&unit) .output()?; if !output.status.success() { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - format!( - "Failed to enable unit {}: {}", - unit, - String::from_utf8_lossy(&output.stderr) - ), - ))); + return Err(Box::new(std::io::Error::other(format!( + "Failed to enable unit {}: {}", + unit, + String::from_utf8_lossy(&output.stderr) + )))); } Ok(()) } @@ -29,14 +26,11 @@ impl Systemd { .arg(&unit) .output()?; if !output.status.success() { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - format!( - "Failed to start unit {}: {}", - unit, - String::from_utf8_lossy(&output.stderr) - ), - ))); + return Err(Box::new(std::io::Error::other(format!( + "Failed to start unit {}: {}", + unit, + String::from_utf8_lossy(&output.stderr) + )))); } Ok(()) } @@ -46,13 +40,10 @@ impl Systemd { .arg("daemon-reload") .output()?; if !output.status.success() { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - format!( - "Failed to reload systemd daemon: {}", - String::from_utf8_lossy(&output.stderr) - ), - ))); + return Err(Box::new(std::io::Error::other(format!( + "Failed to reload systemd daemon: {}", + String::from_utf8_lossy(&output.stderr) + )))); } Ok(()) }