diff --git a/Cargo.lock b/Cargo.lock index 6529716..98cfffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2054,6 +2054,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tower 0.4.13", + "url", "uuid", ] diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 21dde36..b2da58a 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -29,4 +29,5 @@ cni = { path = "../cni" } async-trait = "0.1" lazy_static = "1.4.0" log = "0.4" -my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } \ No newline at end of file +my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } +url = "2.4" \ No newline at end of file diff --git a/crates/provider/src/consts.rs b/crates/provider/src/consts.rs index fe33525..86ec2d8 100644 --- a/crates/provider/src/consts.rs +++ b/crates/provider/src/consts.rs @@ -1,14 +1,14 @@ #[allow(unused)] -const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn"; +pub const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn"; #[allow(unused)] -const NAMESPACE_LABEL: &str = "faasrs"; +pub const NAMESPACE_LABEL: &str = "faasrs"; #[allow(unused)] -const FAASRS_NAMESPACE: &str = "faasrs"; +pub const FAASRS_NAMESPACE: &str = "faasrs"; #[allow(unused)] -const FAASRS_SERVICE_PULL_ALWAYS: bool = false; +pub const FAASRS_SERVICE_PULL_ALWAYS: bool = false; #[allow(unused)] -const DEFAULT_SNAPSHOTTER: &str = "overlayfs"; +pub const DEFAULT_SNAPSHOTTER: &str = "overlayfs"; diff --git a/crates/provider/src/handlers/deploy.rs b/crates/provider/src/handlers/deploy.rs index bff9d91..2a0c233 100644 --- a/crates/provider/src/handlers/deploy.rs +++ b/crates/provider/src/handlers/deploy.rs @@ -1,4 +1,4 @@ -use super::IAmHandler; +use crate::types::config::IAmHandler; use crate::types::{self, CreateContainerInfo}; use actix_web::HttpResponse; use service::Service; @@ -13,7 +13,7 @@ impl IAmHandler for DeployHandler { type Input = CreateContainerInfo; // type Output = String; - async fn execute(&self, input: Self::Input) -> impl actix_web::Responder { + async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder { let cid = input.container_id.clone(); let image = input.image.clone(); let ns = input.ns.clone(); diff --git a/crates/provider/src/handlers/function_list.rs b/crates/provider/src/handlers/function_list.rs index 56b6c3d..6781abc 100644 --- a/crates/provider/src/handlers/function_list.rs +++ b/crates/provider/src/handlers/function_list.rs @@ -1,16 +1,16 @@ -use std::{collections::HashMap, time::SystemTime}; - +use crate::types::config::IAmHandler; use actix_web::HttpResponse; +use std::{collections::HashMap, time::SystemTime}; pub struct FunctionLister { service: std::sync::Arc, } -impl super::IAmHandler for FunctionLister { +impl IAmHandler for FunctionLister { type Input = String; // type Output = Vec; - async fn execute(&self, input: Self::Input) -> impl actix_web::Responder { + async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder { // faasd进来的第一步是验证命名空间的标签是否具有某个值,也就是验证是否为true,确保命名空间有效 // 但是这里省略,因为好像标签为空? diff --git a/crates/provider/src/handlers/invoke_resolver.rs b/crates/provider/src/handlers/invoke_resolver.rs new file mode 100644 index 0000000..1f1a87a --- /dev/null +++ b/crates/provider/src/handlers/invoke_resolver.rs @@ -0,0 +1,56 @@ +use crate::consts::DEFAULT_FUNCTION_NAMESPACE; +use crate::handlers::function_get::get_function; +use actix_web::{Error, error::ErrorInternalServerError}; +use log; +use url::Url; + +pub struct InvokeResolver { + client: service::Service, +} + +impl InvokeResolver { + pub async fn new(client: service::Service) -> Self { + Self { client } + } + + pub async fn resolve(&self, function_name: &str) -> Result { + //根据函数名和containerd获取函数ip, + //从函数名称中提取命名空间。如果函数名称中包含 .,则将其后的部分作为命名空间;否则使用默认命名空间 + + let mut actual_function_name = function_name; + let namespace = get_namespace_or_default(function_name, DEFAULT_FUNCTION_NAMESPACE); + if function_name.contains('.') { + actual_function_name = function_name.trim_end_matches(&format!(".{}", namespace)); + } + + let function = match get_function(&self.client, actual_function_name, &namespace).await { + Ok(function) => function, + Err(e) => { + log::error!("Failed to get function:{}", e); + return Err(ErrorInternalServerError("Failed to get function")); + } + }; + + let ip = function.ip; + let port = 8080; + + let urlstr = format!("http://{}:{}", ip, port); + match Url::parse(&urlstr) { + Ok(url) => Ok(url), + Err(e) => { + log::error!("Failed to resolve url:{}", e); + Err(ErrorInternalServerError("Failed to resolve URL")) + } + } + } +} + +fn get_namespace_or_default(function_name: &str, default_namespace: &str) -> String { + let mut namespace = default_namespace.to_string(); + if function_name.contains('.') { + if let Some(index) = function_name.rfind('.') { + namespace = function_name[index + 1..].to_string(); + } + } + namespace +} diff --git a/crates/provider/src/handlers/mod.rs b/crates/provider/src/handlers/mod.rs index 9cc6ee0..39e5431 100644 --- a/crates/provider/src/handlers/mod.rs +++ b/crates/provider/src/handlers/mod.rs @@ -1,10 +1,10 @@ pub mod deploy; pub mod function_get; pub mod function_list; +pub mod invoke_resolver; pub mod namespace_list; use actix_web::{HttpRequest, HttpResponse, Responder}; -use serde::de::DeserializeOwned; pub async fn function_lister(_req: HttpRequest) -> impl Responder { HttpResponse::Ok().body("函数列表") @@ -108,17 +108,3 @@ impl From for FaasError { } } } - -pub trait IAmHandler { - type Input: DeserializeOwned + Send + 'static; - // type Output: Serialize + Send + 'static; - - // /// 获取Handler元数据(函数名、超时时间等) - // fn metadata(&self) -> HandlerMeta; - - /// 执行核心逻辑 - fn execute( - &self, - input: Self::Input, - ) -> impl std::future::Future + Send; -} diff --git a/crates/provider/src/handlers/namespace_list.rs b/crates/provider/src/handlers/namespace_list.rs index 17a2780..59b0ff4 100644 --- a/crates/provider/src/handlers/namespace_list.rs +++ b/crates/provider/src/handlers/namespace_list.rs @@ -1,4 +1,4 @@ -use super::IAmHandler; +use crate::types::config::IAmHandler; use actix_web::{HttpResponse, Responder}; use service::Service; use std::sync::Arc; @@ -10,7 +10,7 @@ pub struct NamespaceLister { impl IAmHandler for NamespaceLister { type Input = (); // type Output = Vec; - async fn execute(&self, _input: Self::Input) -> impl Responder { + async fn execute(&mut self, _input: Self::Input) -> impl Responder { let ns_list = self.service.list_namespaces().await.unwrap(); HttpResponse::Ok().json(ns_list) } diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 71fb673..437d4a0 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -1,5 +1,6 @@ pub mod bootstrap; pub mod config; +pub mod consts; pub mod handlers; pub mod httputils; pub mod logs; diff --git a/crates/provider/src/proxy/mod.rs b/crates/provider/src/proxy/mod.rs index 8b13789..398d48c 100644 --- a/crates/provider/src/proxy/mod.rs +++ b/crates/provider/src/proxy/mod.rs @@ -1 +1,10 @@ +pub mod proxy_handler; +use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig}; +use actix_web::{HttpRequest, web}; +pub struct ProxyHandlerInfo { + req: HttpRequest, + payload: web::Payload, + config: FaaSConfig, + resolver: Option, +} diff --git a/crates/provider/src/proxy/proxy_handler.rs b/crates/provider/src/proxy/proxy_handler.rs new file mode 100644 index 0000000..a562583 --- /dev/null +++ b/crates/provider/src/proxy/proxy_handler.rs @@ -0,0 +1,147 @@ +use futures::StreamExt; +use std::time::Duration; + +use actix_web::{Error, HttpRequest, HttpResponse, Responder, http::Method, web}; +use reqwest::{Client, RequestBuilder, redirect}; +use url::Url; + +use crate::{ + handlers::invoke_resolver::InvokeResolver, + types::config::{FaaSConfig, IAmHandler}, +}; + +use super::ProxyHandlerInfo; + +pub struct Proxy {} + +impl IAmHandler for Proxy { + type Input = ProxyHandlerInfo; + + async fn execute(&mut self, input: Self::Input) -> impl Responder { + let resolver = input + .resolver + .expect("empty proxy handler resolver, cannot be nil"); + let req = input.req; + let config = input.config; + let mut payload = input.payload; + + let proxy_client = new_proxy_client_from_config(config).await; + + match *req.method() { + Method::POST + | Method::PUT + | Method::DELETE + | Method::GET + | Method::PATCH + | Method::HEAD + | Method::OPTIONS => { + match proxy_request(&req, &mut payload, &proxy_client, &resolver).await { + Ok(resp) => resp, + Err(e) => HttpResponse::InternalServerError().body(e.to_string()), + } + } + _ => HttpResponse::MethodNotAllowed().body("method not allowed"), + } + } +} + +//构建client +async fn new_proxy_client_from_config(config: FaaSConfig) -> Client { + new_proxy_client( + config.get_read_timeout(), + /*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(), + ) + .await +} + +//根据FaasConfig参数来设置Client +async fn new_proxy_client( + timeout: Duration, + //max_idle_conns: usize, + max_idle_conns_per_host: usize, +) -> Client { + Client::builder() + .connect_timeout(timeout) + .timeout(timeout) + .pool_max_idle_per_host(max_idle_conns_per_host) + .pool_idle_timeout(Duration::from_millis(120)) + .tcp_keepalive(120 * Duration::from_secs(1)) + .redirect(redirect::Policy::none()) + .tcp_nodelay(true) + .build() + .expect("Failed to create client") +} + +//根据原始请求,解析url,构建转发请求并转发,获取响应 +async fn proxy_request( + req: &HttpRequest, + payload: &mut web::Payload, + proxy_client: &Client, + resolver: &InvokeResolver, +) -> Result { + let function_name = req.match_info().get("name").unwrap_or(""); + if function_name.is_empty() { + return Ok(HttpResponse::BadRequest().body("provide function name in path")); + } + + let function_addr = match resolver.resolve(function_name).await { + Ok(function_addr) => function_addr, + Err(e) => return Ok(HttpResponse::BadRequest().body(e.to_string())), + }; + + let proxy_req = build_proxy_request(req, &function_addr, proxy_client, payload).await?; + + match proxy_req.send().await { + Ok(resp) => { + let status = resp.status(); + let mut client_resp = HttpResponse::build(status); + + for (name, value) in resp.headers().iter() { + client_resp.insert_header((name.clone(), value.clone())); + } + + let body = resp.bytes().await.unwrap(); + + Ok(client_resp.body(body)) + } + Err(e) => Ok(HttpResponse::BadGateway().body(e.to_string())), + } +} + +//根据URL和原始请求来构建转发请求,并对请求头进行处理 +async fn build_proxy_request( + req: &HttpRequest, + base_url: &Url, + proxy_client: &Client, + payload: &mut web::Payload, +) -> Result { + let origin_url = base_url.join(req.uri().path()).unwrap(); + let remaining_segments = origin_url.path_segments().unwrap().skip(2); + let rest_path = remaining_segments.collect::>().join("/"); + let url = base_url.join(&rest_path).unwrap(); + let mut proxy_req = proxy_client + .request(req.method().clone(), url) + .headers(req.headers().clone().into()); + + if req.headers().get("X-Forwarded-Host").is_none() { + if let Some(host) = req.headers().get("Host") { + proxy_req = proxy_req.header("X-Forwarded-Host", host); + } + } + + if req.headers().get("X-Forwarded-For").is_none() { + if let Some(remote_addr) = req.peer_addr() { + proxy_req = proxy_req.header("X-Forwarded-For", remote_addr.to_string()); + } + } + + let mut body = web::BytesMut::new(); + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + body.extend_from_slice(&chunk); + } + let body_bytes = body.freeze(); + let proxy_req = proxy_req.body(body_bytes); + + Ok(proxy_req) +} diff --git a/crates/provider/src/types/config.rs b/crates/provider/src/types/config.rs index 1567bec..22d0a2a 100644 --- a/crates/provider/src/types/config.rs +++ b/crates/provider/src/types/config.rs @@ -1,23 +1,18 @@ +use actix_web::Responder; use std::time::Duration; const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_MAX_IDLE_CONNS: usize = 1024; -pub struct FaasHandler { - pub list_namespaces: S, - pub mutate_namespace: S, - pub function_proxy: S, - pub function_lister: S, - pub deploy_function: S, - pub update_function: S, - pub delete_function: S, - pub function_status: S, - pub scale_function: S, - pub secrets: S, - pub logs: S, - pub health: Option, - pub info: S, - pub telemetry: S, +pub trait IAmHandler { + type Input; + // type Output: Serialize + Send + 'static; + + // /// 获取Handler元数据(函数名、超时时间等) + // fn metadata(&self) -> HandlerMeta; + + /// 执行核心逻辑 + fn execute(&mut self, input: Self::Input) -> impl std::future::Future /*+ Send*/; } #[derive(Debug, Clone)]