feat(awc): introduce awc crate to match actix-web streaming requests (#94)

* feat: introduce awc crate to match actix-web streaming requests

* fix(ci): fix the integration_tests

mod

---------

Co-authored-by: Samuka007 <dailvchen@dragonos.org>
This commit is contained in:
aLinChe
2025-05-08 15:16:12 +08:00
committed by GitHub
parent 702e6a8596
commit ed6741cd8a
11 changed files with 299 additions and 536 deletions

View File

@ -11,6 +11,8 @@ mod integration_tests {
use actix_web::http::StatusCode;
use actix_web::test;
use serde_json::json;
use std::thread::sleep;
use std::time::Duration;
#[actix_web::test]
#[ignore]
@ -37,7 +39,7 @@ mod integration_tests {
.uri("/function/test-no-found-function")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
assert!(response_str.contains("Failed to get function"));
@ -76,6 +78,7 @@ mod integration_tests {
log::info!("{}", response_str);
assert!(response_str.contains("Function test-function deployment initiated successfully."));
sleep(Duration::from_secs(2));
// test proxy in namespace 'default'
let req = test::TestRequest::get()
.uri("/function/test-function")

View File

@ -14,10 +14,9 @@ uuid = { version = "1.8.0", features = ["v4"] }
actix-web-httpauth = "0.6"
config = "0.11"
thiserror = "1.0"
reqwest = "0.11"
awc = "3.6.0"
prometheus = "0.13"
tempfile = "3.2"
hyper = "0.14"
tower = "0.4"
regex = "1"
futures = "0.3"
@ -33,3 +32,4 @@ my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" }
url = "2.4"
derive_more = { version = "2", features = ["full"] }
tonic = "0.12"
http = "1.3.1"

View File

@ -26,8 +26,10 @@ pub async fn delete_handler(info: web::Json<DeleteContainerInfo>) -> impl Respon
Ok(function) => function,
Err(e) => {
log::error!("Failed to get function: {}", e);
return HttpResponse::NotFound()
.body(format!("Function '{}' not found ", function_name));
return HttpResponse::NotFound().body(format!(
"Function '{}' not found in namespace '{}'",
function_name, namespace
));
}
};

View File

@ -33,7 +33,7 @@ pub async fn deploy_handler(info: web::Json<DeployFunctionInfo>) -> impl Respond
match deploy(&function_name, &image, &namespace).await {
Ok(()) => HttpResponse::Accepted().body(format!(
"Function {} deployment initiated successfully .",
"Function {} deployment initiated successfully.",
function_name
)),
Err(e) => HttpResponse::BadRequest().body(format!(

View File

@ -1,41 +1,40 @@
use actix_web::{Error, HttpRequest, web};
use futures::StreamExt;
use reqwest::{Client, RequestBuilder};
use actix_web::{HttpRequest, web};
use awc::http::Uri;
use url::Url;
//根据URL和原始请求来构建转发请求并对请求头进行处理
pub async fn build_proxy_request(
pub fn create_proxy_request(
req: &HttpRequest,
base_url: &Url,
proxy_client: &Client,
mut payload: web::Payload,
) -> Result<RequestBuilder, Error> {
payload: web::Payload,
) -> awc::SendClientRequest {
let proxy_client = awc::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.finish();
let origin_url = base_url.join(req.uri().path()).unwrap();
let remaining_segments = origin_url.path_segments().unwrap().skip(2);
let rest_path = remaining_segments.collect::<Vec<_>>().join("/");
let url = base_url.join(&rest_path).unwrap();
let mut proxy_req = proxy_client
.request(req.method().clone(), url)
.headers(req.headers().clone().into());
let uri = url.as_str().parse::<Uri>().unwrap();
let mut proxy_req = proxy_client.request(req.method().clone(), uri);
for header in req.headers() {
proxy_req = proxy_req.insert_header(header);
}
if req.headers().get("X-Forwarded-Host").is_none() {
if let Some(host) = req.headers().get("Host") {
proxy_req = proxy_req.header("X-Forwarded-Host", host);
proxy_req = proxy_req.insert_header(("X-Forwarded-Host", host));
}
}
if req.headers().get("X-Forwarded-For").is_none() {
if let Some(remote_addr) = req.peer_addr() {
proxy_req = proxy_req.header("X-Forwarded-For", remote_addr.to_string());
proxy_req = proxy_req.insert_header(("X-Forwarded-For", remote_addr.to_string()));
}
}
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
body.extend_from_slice(&chunk);
}
let body_bytes = body.freeze();
let proxy_req = proxy_req.body(body_bytes);
Ok(proxy_req)
proxy_req.send_stream(payload)
}

View File

@ -1,32 +0,0 @@
use std::time::Duration;
use reqwest::{Client, redirect};
use crate::types::config::FaaSConfig;
//构建client
pub async fn new_proxy_client_from_config(config: &FaaSConfig) -> Client {
new_proxy_client(
config.get_read_timeout(),
/*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(),
)
.await
}
//根据FaasConfig参数来设置Client
pub async fn new_proxy_client(
timeout: Duration,
//max_idle_conns: usize,
max_idle_conns_per_host: usize,
) -> Client {
Client::builder()
.connect_timeout(timeout)
.timeout(timeout)
.pool_max_idle_per_host(max_idle_conns_per_host)
.pool_idle_timeout(Duration::from_millis(120))
.tcp_keepalive(120 * Duration::from_secs(1))
.redirect(redirect::Policy::none())
.tcp_nodelay(true)
.build()
.expect("Failed to create client")
}

View File

@ -1,3 +1,3 @@
pub mod builder;
pub mod client;
pub mod proxy_handler;
mod proxy_handler_test;

View File

@ -1,9 +1,8 @@
use crate::handlers::invoke_resolver::InvokeResolver;
use crate::proxy::builder::build_proxy_request;
use crate::proxy::client::new_proxy_client_from_config;
use crate::types::config::FaaSConfig;
use crate::proxy::builder::create_proxy_request;
use actix_web::{
Error, HttpRequest, HttpResponse,
HttpRequest, HttpResponse,
error::{ErrorBadRequest, ErrorInternalServerError, ErrorMethodNotAllowed},
http::Method,
web,
@ -11,13 +10,9 @@ use actix_web::{
// 主要参考源码的响应设置
pub async fn proxy_handler(
config: web::Data<FaaSConfig>,
req: HttpRequest,
payload: web::Payload,
) -> Result<HttpResponse, Error> {
let proxy_client = new_proxy_client_from_config(config.as_ref()).await;
log::info!("proxy_client : {:?}", proxy_client);
) -> actix_web::Result<HttpResponse> {
match *req.method() {
Method::POST
| Method::PUT
@ -25,8 +20,8 @@ pub async fn proxy_handler(
| Method::GET
| Method::PATCH
| Method::HEAD
| Method::OPTIONS => proxy_request(&req, payload, &proxy_client).await,
_ => Err(ErrorMethodNotAllowed("method not allowed")),
| Method::OPTIONS => proxy_request(&req, payload).await,
_ => Err(ErrorMethodNotAllowed("Method not allowed")),
}
}
@ -34,30 +29,31 @@ pub async fn proxy_handler(
async fn proxy_request(
req: &HttpRequest,
payload: web::Payload,
proxy_client: &reqwest::Client,
) -> Result<HttpResponse, Error> {
) -> actix_web::Result<HttpResponse> {
let function_name = req.match_info().get("name").unwrap_or("");
if function_name.is_empty() {
return Err(ErrorBadRequest("function name is required"));
return Err(ErrorBadRequest("Function name is required"));
}
let function_addr = InvokeResolver::resolve_function_url(function_name).await?;
let proxy_req = build_proxy_request(req, &function_addr, proxy_client, payload).await?;
let proxy_req = create_proxy_request(req, &function_addr, payload);
match proxy_req.send().await {
Ok(resp) => {
let status = resp.status();
let mut client_resp = HttpResponse::build(status);
for (name, value) in resp.headers().iter() {
client_resp.insert_header((name.clone(), value.clone()));
}
let body = resp.bytes().await.unwrap();
Ok(client_resp.body(body))
// Handle the error conversion explicitly
let proxy_resp = match proxy_req.await {
Ok(resp) => resp,
Err(e) => {
log::error!("Proxy request failed: {}", e);
return Err(ErrorInternalServerError(format!(
"Proxy request failed: {}",
e
)));
}
Err(e) => Err(ErrorInternalServerError(e)),
}
};
// Now create an HttpResponse from the proxy response
let mut client_resp = HttpResponse::build(proxy_resp.status());
// Stream the response body
Ok(client_resp.streaming(proxy_resp))
}

View File

@ -0,0 +1,127 @@
#[cfg(test)]
mod test {
use crate::proxy::proxy_handler::proxy_handler;
use actix_web::{
App, HttpRequest, HttpResponse, Responder, http,
test::{self},
web::{self, Bytes},
};
#[actix_web::test]
#[ignore]
async fn test_proxy_handler_success() {
todo!()
}
#[actix_web::test]
async fn test_path_parsing() {
let test_cases = vec![
("simple_name_match", "/function/echo", "echo", "", 200),
(
"simple_name_match",
"/function/echo.faasd-in-rs-fn",
"echo.faasd-in-rs-fn",
"",
200,
),
(
"simple_name_match_with_trailing_slash",
"/function/echo/",
"echo",
"",
200,
),
(
"name_match_with_additional_path_values",
"/function/echo/subPath/extras",
"echo",
"subPath/extras",
200,
),
(
"name_match_with_additional_path_values_and_querystring",
"/function/echo/subPath/extras?query=true",
"echo",
"subPath/extras",
200,
),
("not_found_if_no_name", "/function/", "", "", 404),
];
let app = test::init_service(
App::new()
.route("/function/{name}", web::get().to(var_handler))
.route("/function/{name}/", web::get().to(var_handler))
.route("/function/{name}/{params:.*}", web::get().to(var_handler)),
)
.await;
for (name, path, function_name, extra_path, status_code) in test_cases {
let req = test::TestRequest::get().uri(path).to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status().as_u16(), status_code, "Test case: {}", name);
if status_code == 200 {
let body = test::read_body(resp).await;
let expected_body = format!("name: {} params: {}", function_name, extra_path);
assert_eq!(body, expected_body.as_bytes(), "Test case: {}", name);
}
}
}
#[actix_web::test]
async fn test_handler_func_invalid_method() {
let app = test::init_service(
App::new().route("/function/{name}{path:/?.*}", web::to(proxy_handler)),
)
.await;
let req = test::TestRequest::with_uri("/function/test-service/path")
.method(http::Method::from_bytes(b"INVALID").unwrap())
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), http::StatusCode::METHOD_NOT_ALLOWED);
}
#[actix_web::test]
async fn test_handler_func_empty_function_nam() {
let app = test::init_service(
App::new().route("/function{name:/?}{path:/?.*}", web::to(proxy_handler)),
)
.await;
let req = test::TestRequest::post()
.uri("/function")
.insert_header((http::header::CONTENT_TYPE, "application/json"))
.set_payload(Bytes::from_static(b"{\"key\":\"value\"}"))
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), http::StatusCode::BAD_REQUEST);
}
#[actix_web::test]
async fn test_handler_func_empty_function_name() {
let app = test::init_service(
App::new().route("/function{name:/?}{path:/?.*}", web::to(proxy_handler)),
)
.await;
let req = test::TestRequest::post()
.uri("/function")
.insert_header((http::header::CONTENT_TYPE, "application/json"))
.set_payload(Bytes::from_static(b"{\"key\":\"value\"}"))
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), http::StatusCode::BAD_REQUEST);
}
async fn var_handler(req: HttpRequest) -> impl Responder {
let vars = req.match_info();
HttpResponse::Ok().body(format!(
"name: {} params: {}",
vars.get("name").unwrap_or(""),
vars.get("params").unwrap_or("")
))
}
}