fix(provider): adjust structure of proxy_handler (#83)

This commit is contained in:
Vitus 2025-04-26 12:23:56 +08:00 committed by GitHub
parent e7fb8105b4
commit df96fc1688
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 102 additions and 99 deletions

View File

@ -8,12 +8,13 @@ use url::Url;
pub struct InvokeResolver; pub struct InvokeResolver;
impl InvokeResolver { impl InvokeResolver {
pub async fn resolve(function_name: &str) -> Result<Url, Error> { pub async fn resolve_function_url(function_name: &str) -> Result<Url, Error> {
//根据函数名和containerd获取函数ip //根据函数名和containerd获取函数ip
//从函数名称中提取命名空间。如果函数名称中包含 .,则将其后的部分作为命名空间;否则使用默认命名空间 //从函数名称中提取命名空间。如果函数名称中包含 .,则将其后的部分作为命名空间;否则使用默认命名空间
let mut actual_function_name = function_name; let mut actual_function_name = function_name;
let namespace = get_namespace_or_default(function_name, DEFAULT_FUNCTION_NAMESPACE); let namespace =
extract_namespace_from_function_or_default(function_name, DEFAULT_FUNCTION_NAMESPACE);
if function_name.contains('.') { if function_name.contains('.') {
actual_function_name = function_name.trim_end_matches(&format!(".{}", namespace)); actual_function_name = function_name.trim_end_matches(&format!(".{}", namespace));
} }
@ -39,7 +40,10 @@ impl InvokeResolver {
} }
} }
fn get_namespace_or_default(function_name: &str, default_namespace: &str) -> String { fn extract_namespace_from_function_or_default(
function_name: &str,
default_namespace: &str,
) -> String {
let mut namespace = default_namespace.to_string(); let mut namespace = default_namespace.to_string();
if function_name.contains('.') { if function_name.contains('.') {
if let Some(index) = function_name.rfind('.') { if let Some(index) = function_name.rfind('.') {

View File

@ -0,0 +1,41 @@
use actix_web::{Error, HttpRequest, web};
use futures::StreamExt;
use reqwest::{Client, RequestBuilder};
use url::Url;
//根据URL和原始请求来构建转发请求并对请求头进行处理
pub async fn build_proxy_request(
req: &HttpRequest,
base_url: &Url,
proxy_client: &Client,
mut payload: web::Payload,
) -> Result<RequestBuilder, Error> {
let origin_url = base_url.join(req.uri().path()).unwrap();
let remaining_segments = origin_url.path_segments().unwrap().skip(2);
let rest_path = remaining_segments.collect::<Vec<_>>().join("/");
let url = base_url.join(&rest_path).unwrap();
let mut proxy_req = proxy_client
.request(req.method().clone(), url)
.headers(req.headers().clone().into());
if req.headers().get("X-Forwarded-Host").is_none() {
if let Some(host) = req.headers().get("Host") {
proxy_req = proxy_req.header("X-Forwarded-Host", host);
}
}
if req.headers().get("X-Forwarded-For").is_none() {
if let Some(remote_addr) = req.peer_addr() {
proxy_req = proxy_req.header("X-Forwarded-For", remote_addr.to_string());
}
}
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
body.extend_from_slice(&chunk);
}
let body_bytes = body.freeze();
let proxy_req = proxy_req.body(body_bytes);
Ok(proxy_req)
}

View File

@ -0,0 +1,32 @@
use std::time::Duration;
use reqwest::{Client, redirect};
use crate::types::config::FaaSConfig;
//构建client
pub async fn new_proxy_client_from_config(config: &FaaSConfig) -> Client {
new_proxy_client(
config.get_read_timeout(),
/*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(),
)
.await
}
//根据FaasConfig参数来设置Client
pub async fn new_proxy_client(
timeout: Duration,
//max_idle_conns: usize,
max_idle_conns_per_host: usize,
) -> Client {
Client::builder()
.connect_timeout(timeout)
.timeout(timeout)
.pool_max_idle_per_host(max_idle_conns_per_host)
.pool_idle_timeout(Duration::from_millis(120))
.tcp_keepalive(120 * Duration::from_secs(1))
.redirect(redirect::Policy::none())
.tcp_nodelay(true)
.build()
.expect("Failed to create client")
}

View File

@ -1 +1,3 @@
pub mod builder;
pub mod client;
pub mod proxy_handler; pub mod proxy_handler;

View File

@ -1,11 +1,8 @@
use futures::StreamExt; use crate::handlers::invoke_resolver::InvokeResolver;
use std::time::Duration; use crate::proxy::builder::build_proxy_request;
use crate::proxy::client::new_proxy_client_from_config;
use crate::types::config::FaaSConfig;
use actix_web::{Error, HttpRequest, HttpResponse, Responder, http::Method, web}; use actix_web::{Error, HttpRequest, HttpResponse, Responder, http::Method, web};
use reqwest::{Client, RequestBuilder, redirect};
use url::Url;
use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig};
pub async fn proxy_handler( pub async fn proxy_handler(
config: web::Data<FaaSConfig>, config: web::Data<FaaSConfig>,
@ -29,45 +26,19 @@ pub async fn proxy_handler(
_ => HttpResponse::MethodNotAllowed().body("method not allowed"), _ => HttpResponse::MethodNotAllowed().body("method not allowed"),
} }
} }
//构建client
async fn new_proxy_client_from_config(config: &FaaSConfig) -> Client {
new_proxy_client(
config.get_read_timeout(),
/*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(),
)
.await
}
//根据FaasConfig参数来设置Client
async fn new_proxy_client(
timeout: Duration,
//max_idle_conns: usize,
max_idle_conns_per_host: usize,
) -> Client {
Client::builder()
.connect_timeout(timeout)
.timeout(timeout)
.pool_max_idle_per_host(max_idle_conns_per_host)
.pool_idle_timeout(Duration::from_millis(120))
.tcp_keepalive(120 * Duration::from_secs(1))
.redirect(redirect::Policy::none())
.tcp_nodelay(true)
.build()
.expect("Failed to create client")
}
//根据原始请求解析url构建转发请求并转发获取响应 //根据原始请求解析url构建转发请求并转发获取响应
async fn proxy_request( async fn proxy_request(
req: &HttpRequest, req: &HttpRequest,
payload: web::Payload, payload: web::Payload,
proxy_client: &Client, proxy_client: &reqwest::Client,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let function_name = req.match_info().get("name").unwrap_or(""); let function_name = req.match_info().get("name").unwrap_or("");
if function_name.is_empty() { if function_name.is_empty() {
return Ok(HttpResponse::BadRequest().body("provide function name in path")); return Ok(HttpResponse::BadRequest().body("provide function name in path"));
} }
let function_addr = match InvokeResolver::resolve(function_name).await { let function_addr = match InvokeResolver::resolve_function_url(function_name).await {
Ok(function_addr) => function_addr, Ok(function_addr) => function_addr,
Err(e) => return Ok(HttpResponse::BadRequest().body(e.to_string())), Err(e) => return Ok(HttpResponse::BadRequest().body(e.to_string())),
}; };
@ -90,41 +61,3 @@ async fn proxy_request(
Err(e) => Ok(HttpResponse::BadGateway().body(e.to_string())), Err(e) => Ok(HttpResponse::BadGateway().body(e.to_string())),
} }
} }
//根据URL和原始请求来构建转发请求并对请求头进行处理
async fn build_proxy_request(
req: &HttpRequest,
base_url: &Url,
proxy_client: &Client,
mut payload: web::Payload,
) -> Result<RequestBuilder, Error> {
let origin_url = base_url.join(req.uri().path()).unwrap();
let remaining_segments = origin_url.path_segments().unwrap().skip(2);
let rest_path = remaining_segments.collect::<Vec<_>>().join("/");
let url = base_url.join(&rest_path).unwrap();
let mut proxy_req = proxy_client
.request(req.method().clone(), url)
.headers(req.headers().clone().into());
if req.headers().get("X-Forwarded-Host").is_none() {
if let Some(host) = req.headers().get("Host") {
proxy_req = proxy_req.header("X-Forwarded-Host", host);
}
}
if req.headers().get("X-Forwarded-For").is_none() {
if let Some(remote_addr) = req.peer_addr() {
proxy_req = proxy_req.header("X-Forwarded-For", remote_addr.to_string());
}
}
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
body.extend_from_slice(&chunk);
}
let body_bytes = body.freeze();
let proxy_req = proxy_req.body(body_bytes);
Ok(proxy_req)
}

View File

@ -11,14 +11,11 @@ impl Systemd {
.arg(&unit) .arg(&unit)
.output()?; .output()?;
if !output.status.success() { if !output.status.success() {
return Err(Box::new(std::io::Error::new( return Err(Box::new(std::io::Error::other(format!(
std::io::ErrorKind::Other, "Failed to enable unit {}: {}",
format!( unit,
"Failed to enable unit {}: {}", String::from_utf8_lossy(&output.stderr)
unit, ))));
String::from_utf8_lossy(&output.stderr)
),
)));
} }
Ok(()) Ok(())
} }
@ -29,14 +26,11 @@ impl Systemd {
.arg(&unit) .arg(&unit)
.output()?; .output()?;
if !output.status.success() { if !output.status.success() {
return Err(Box::new(std::io::Error::new( return Err(Box::new(std::io::Error::other(format!(
std::io::ErrorKind::Other, "Failed to start unit {}: {}",
format!( unit,
"Failed to start unit {}: {}", String::from_utf8_lossy(&output.stderr)
unit, ))));
String::from_utf8_lossy(&output.stderr)
),
)));
} }
Ok(()) Ok(())
} }
@ -46,13 +40,10 @@ impl Systemd {
.arg("daemon-reload") .arg("daemon-reload")
.output()?; .output()?;
if !output.status.success() { if !output.status.success() {
return Err(Box::new(std::io::Error::new( return Err(Box::new(std::io::Error::other(format!(
std::io::ErrorKind::Other, "Failed to reload systemd daemon: {}",
format!( String::from_utf8_lossy(&output.stderr)
"Failed to reload systemd daemon: {}", ))));
String::from_utf8_lossy(&output.stderr)
),
)));
} }
Ok(()) Ok(())
} }