feat(proxy): invoke functions logic(#22)

* fix:move trait IAMHandler to types/

* feat:enable to resolve request and to set proxy for request
This commit is contained in:
DoL 2025-04-03 14:22:32 +08:00 committed by GitHub
parent 1830e40326
commit 2f9f6c4ca9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 240 additions and 44 deletions

1
Cargo.lock generated
View File

@ -2054,6 +2054,7 @@ dependencies = [
"thiserror 1.0.69",
"tokio",
"tower 0.4.13",
"url",
"uuid",
]

View File

@ -30,3 +30,4 @@ async-trait = "0.1"
lazy_static = "1.4.0"
log = "0.4"
my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" }
url = "2.4"

View File

@ -1,14 +1,14 @@
#[allow(unused)]
const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn";
pub const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn";
#[allow(unused)]
const NAMESPACE_LABEL: &str = "faasrs";
pub const NAMESPACE_LABEL: &str = "faasrs";
#[allow(unused)]
const FAASRS_NAMESPACE: &str = "faasrs";
pub const FAASRS_NAMESPACE: &str = "faasrs";
#[allow(unused)]
const FAASRS_SERVICE_PULL_ALWAYS: bool = false;
pub const FAASRS_SERVICE_PULL_ALWAYS: bool = false;
#[allow(unused)]
const DEFAULT_SNAPSHOTTER: &str = "overlayfs";
pub const DEFAULT_SNAPSHOTTER: &str = "overlayfs";

View File

@ -1,4 +1,4 @@
use super::IAmHandler;
use crate::types::config::IAmHandler;
use crate::types::{self, CreateContainerInfo};
use actix_web::HttpResponse;
use service::Service;
@ -13,7 +13,7 @@ impl IAmHandler for DeployHandler {
type Input = CreateContainerInfo;
// type Output = String;
async fn execute(&self, input: Self::Input) -> impl actix_web::Responder {
async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder {
let cid = input.container_id.clone();
let image = input.image.clone();
let ns = input.ns.clone();

View File

@ -1,16 +1,16 @@
use std::{collections::HashMap, time::SystemTime};
use crate::types::config::IAmHandler;
use actix_web::HttpResponse;
use std::{collections::HashMap, time::SystemTime};
pub struct FunctionLister {
service: std::sync::Arc<service::Service>,
}
impl super::IAmHandler for FunctionLister {
impl IAmHandler for FunctionLister {
type Input = String;
// type Output = Vec<String>;
async fn execute(&self, input: Self::Input) -> impl actix_web::Responder {
async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder {
// faasd进来的第一步是验证命名空间的标签是否具有某个值也就是验证是否为true确保命名空间有效
// 但是这里省略,因为好像标签为空?

View File

@ -0,0 +1,56 @@
use crate::consts::DEFAULT_FUNCTION_NAMESPACE;
use crate::handlers::function_get::get_function;
use actix_web::{Error, error::ErrorInternalServerError};
use log;
use url::Url;
pub struct InvokeResolver {
client: service::Service,
}
impl InvokeResolver {
pub async fn new(client: service::Service) -> Self {
Self { client }
}
pub async fn resolve(&self, function_name: &str) -> Result<Url, Error> {
//根据函数名和containerd获取函数ip
//从函数名称中提取命名空间。如果函数名称中包含 .,则将其后的部分作为命名空间;否则使用默认命名空间
let mut actual_function_name = function_name;
let namespace = get_namespace_or_default(function_name, DEFAULT_FUNCTION_NAMESPACE);
if function_name.contains('.') {
actual_function_name = function_name.trim_end_matches(&format!(".{}", namespace));
}
let function = match get_function(&self.client, actual_function_name, &namespace).await {
Ok(function) => function,
Err(e) => {
log::error!("Failed to get function:{}", e);
return Err(ErrorInternalServerError("Failed to get function"));
}
};
let ip = function.ip;
let port = 8080;
let urlstr = format!("http://{}:{}", ip, port);
match Url::parse(&urlstr) {
Ok(url) => Ok(url),
Err(e) => {
log::error!("Failed to resolve url:{}", e);
Err(ErrorInternalServerError("Failed to resolve URL"))
}
}
}
}
fn get_namespace_or_default(function_name: &str, default_namespace: &str) -> String {
let mut namespace = default_namespace.to_string();
if function_name.contains('.') {
if let Some(index) = function_name.rfind('.') {
namespace = function_name[index + 1..].to_string();
}
}
namespace
}

View File

@ -1,10 +1,10 @@
pub mod deploy;
pub mod function_get;
pub mod function_list;
pub mod invoke_resolver;
pub mod namespace_list;
use actix_web::{HttpRequest, HttpResponse, Responder};
use serde::de::DeserializeOwned;
pub async fn function_lister(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("函数列表")
@ -108,17 +108,3 @@ impl From<std::io::Error> for FaasError {
}
}
}
pub trait IAmHandler {
type Input: DeserializeOwned + Send + 'static;
// type Output: Serialize + Send + 'static;
// /// 获取Handler元数据函数名、超时时间等
// fn metadata(&self) -> HandlerMeta;
/// 执行核心逻辑
fn execute(
&self,
input: Self::Input,
) -> impl std::future::Future<Output = impl Responder> + Send;
}

View File

@ -1,4 +1,4 @@
use super::IAmHandler;
use crate::types::config::IAmHandler;
use actix_web::{HttpResponse, Responder};
use service::Service;
use std::sync::Arc;
@ -10,7 +10,7 @@ pub struct NamespaceLister {
impl IAmHandler for NamespaceLister {
type Input = ();
// type Output = Vec<String>;
async fn execute(&self, _input: Self::Input) -> impl Responder {
async fn execute(&mut self, _input: Self::Input) -> impl Responder {
let ns_list = self.service.list_namespaces().await.unwrap();
HttpResponse::Ok().json(ns_list)
}

View File

@ -1,5 +1,6 @@
pub mod bootstrap;
pub mod config;
pub mod consts;
pub mod handlers;
pub mod httputils;
pub mod logs;

View File

@ -1 +1,10 @@
pub mod proxy_handler;
use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig};
use actix_web::{HttpRequest, web};
pub struct ProxyHandlerInfo {
req: HttpRequest,
payload: web::Payload,
config: FaaSConfig,
resolver: Option<InvokeResolver>,
}

View File

@ -0,0 +1,147 @@
use futures::StreamExt;
use std::time::Duration;
use actix_web::{Error, HttpRequest, HttpResponse, Responder, http::Method, web};
use reqwest::{Client, RequestBuilder, redirect};
use url::Url;
use crate::{
handlers::invoke_resolver::InvokeResolver,
types::config::{FaaSConfig, IAmHandler},
};
use super::ProxyHandlerInfo;
pub struct Proxy {}
impl IAmHandler for Proxy {
type Input = ProxyHandlerInfo;
async fn execute(&mut self, input: Self::Input) -> impl Responder {
let resolver = input
.resolver
.expect("empty proxy handler resolver, cannot be nil");
let req = input.req;
let config = input.config;
let mut payload = input.payload;
let proxy_client = new_proxy_client_from_config(config).await;
match *req.method() {
Method::POST
| Method::PUT
| Method::DELETE
| Method::GET
| Method::PATCH
| Method::HEAD
| Method::OPTIONS => {
match proxy_request(&req, &mut payload, &proxy_client, &resolver).await {
Ok(resp) => resp,
Err(e) => HttpResponse::InternalServerError().body(e.to_string()),
}
}
_ => HttpResponse::MethodNotAllowed().body("method not allowed"),
}
}
}
//构建client
async fn new_proxy_client_from_config(config: FaaSConfig) -> Client {
new_proxy_client(
config.get_read_timeout(),
/*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(),
)
.await
}
//根据FaasConfig参数来设置Client
async fn new_proxy_client(
timeout: Duration,
//max_idle_conns: usize,
max_idle_conns_per_host: usize,
) -> Client {
Client::builder()
.connect_timeout(timeout)
.timeout(timeout)
.pool_max_idle_per_host(max_idle_conns_per_host)
.pool_idle_timeout(Duration::from_millis(120))
.tcp_keepalive(120 * Duration::from_secs(1))
.redirect(redirect::Policy::none())
.tcp_nodelay(true)
.build()
.expect("Failed to create client")
}
//根据原始请求解析url构建转发请求并转发获取响应
async fn proxy_request(
req: &HttpRequest,
payload: &mut web::Payload,
proxy_client: &Client,
resolver: &InvokeResolver,
) -> Result<HttpResponse, Error> {
let function_name = req.match_info().get("name").unwrap_or("");
if function_name.is_empty() {
return Ok(HttpResponse::BadRequest().body("provide function name in path"));
}
let function_addr = match resolver.resolve(function_name).await {
Ok(function_addr) => function_addr,
Err(e) => return Ok(HttpResponse::BadRequest().body(e.to_string())),
};
let proxy_req = build_proxy_request(req, &function_addr, proxy_client, payload).await?;
match proxy_req.send().await {
Ok(resp) => {
let status = resp.status();
let mut client_resp = HttpResponse::build(status);
for (name, value) in resp.headers().iter() {
client_resp.insert_header((name.clone(), value.clone()));
}
let body = resp.bytes().await.unwrap();
Ok(client_resp.body(body))
}
Err(e) => Ok(HttpResponse::BadGateway().body(e.to_string())),
}
}
//根据URL和原始请求来构建转发请求并对请求头进行处理
async fn build_proxy_request(
req: &HttpRequest,
base_url: &Url,
proxy_client: &Client,
payload: &mut web::Payload,
) -> Result<RequestBuilder, Error> {
let origin_url = base_url.join(req.uri().path()).unwrap();
let remaining_segments = origin_url.path_segments().unwrap().skip(2);
let rest_path = remaining_segments.collect::<Vec<_>>().join("/");
let url = base_url.join(&rest_path).unwrap();
let mut proxy_req = proxy_client
.request(req.method().clone(), url)
.headers(req.headers().clone().into());
if req.headers().get("X-Forwarded-Host").is_none() {
if let Some(host) = req.headers().get("Host") {
proxy_req = proxy_req.header("X-Forwarded-Host", host);
}
}
if req.headers().get("X-Forwarded-For").is_none() {
if let Some(remote_addr) = req.peer_addr() {
proxy_req = proxy_req.header("X-Forwarded-For", remote_addr.to_string());
}
}
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
body.extend_from_slice(&chunk);
}
let body_bytes = body.freeze();
let proxy_req = proxy_req.body(body_bytes);
Ok(proxy_req)
}

View File

@ -1,23 +1,18 @@
use actix_web::Responder;
use std::time::Duration;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_MAX_IDLE_CONNS: usize = 1024;
pub struct FaasHandler<S> {
pub list_namespaces: S,
pub mutate_namespace: S,
pub function_proxy: S,
pub function_lister: S,
pub deploy_function: S,
pub update_function: S,
pub delete_function: S,
pub function_status: S,
pub scale_function: S,
pub secrets: S,
pub logs: S,
pub health: Option<S>,
pub info: S,
pub telemetry: S,
pub trait IAmHandler {
type Input;
// type Output: Serialize + Send + 'static;
// /// 获取Handler元数据函数名、超时时间等
// fn metadata(&self) -> HandlerMeta;
/// 执行核心逻辑
fn execute(&mut self, input: Self::Input) -> impl std::future::Future<Output = impl Responder> /*+ Send*/;
}
#[derive(Debug, Clone)]