From 1acbab92fc8a69d13de9b286204090cecb49a5d9 Mon Sep 17 00:00:00 2001 From: DoL <145818730+dolzhuying@users.noreply.github.com> Date: Sat, 12 Apr 2025 00:02:49 +0800 Subject: [PATCH] =?UTF-8?q?feat(gateway)=EF=BC=9Aenable=20to=20deploy=20an?= =?UTF-8?q?d=20delete=20function=20with=20request=20(#54)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix:move trait IAMHandler to types/ * feat:enable to resolve request and to set proxy for request * save implementation for trait IAMHandler * remove trait IAMHandler and add deploy and delete * add feishu_bot.yml * 修复一点ctr逻辑 (#29) * 修复一点逻辑 * fmt * feat:remove trait IAMHandler and bind fucntion to gateway * resolve conflict in cni_network * fix(delete): fix delete task. * fix(clippy): fmt * fix:remove darwin and launch.json * fix:hakari generate * fix: remove files and build targets * fix: some format problems --------- Co-authored-by: vitus213 <2811215248@qq.com> Co-authored-by: 火花 Co-authored-by: scutKKsix <1129332011@qq.com> Co-authored-by: Samuka007 --- .config/hakari.toml | 4 +- .env | 4 +- Cargo.lock | 59 ++++++++- crates/app/Cargo.toml | 7 +- crates/app/src/main.rs | 17 ++- crates/cni/Cargo.toml | 3 + crates/my-workspace-hack/Cargo.toml | 10 +- crates/provider/Cargo.toml | 4 +- crates/provider/src/consts.rs | 2 +- crates/provider/src/handlers/delete.rs | 64 ++++++++++ crates/provider/src/handlers/deploy.rs | 112 +++++++++++++++--- crates/provider/src/handlers/function_get.rs | 5 +- crates/provider/src/handlers/function_list.rs | 28 ----- crates/provider/src/handlers/mod.rs | 3 +- .../provider/src/handlers/namespace_list.rs | 17 --- crates/provider/src/handlers/utils.rs | 65 ++++++++++ crates/provider/src/proxy/mod.rs | 9 -- crates/provider/src/proxy/proxy_handler.rs | 65 +++++----- crates/provider/src/types/config.rs | 12 -- .../provider/src/types/function_deployment.rs | 83 +++++++------ crates/service/src/lib.rs | 64 +++++++--- crates/service/src/spec.rs | 4 +- flake.nix | 6 +- 23 files changed, 439 insertions(+), 208 deletions(-) create mode 100644 crates/provider/src/handlers/delete.rs delete mode 100644 crates/provider/src/handlers/namespace_list.rs create mode 100644 crates/provider/src/handlers/utils.rs diff --git a/.config/hakari.toml b/.config/hakari.toml index f43ae8f..97685dd 100644 --- a/.config/hakari.toml +++ b/.config/hakari.toml @@ -18,9 +18,7 @@ resolver = "2" # https://doc.rust-lang.org/rustc/platform-support.html platforms = [ "x86_64-unknown-linux-gnu", - # "x86_64-apple-darwin", - # "aarch64-apple-darwin", - # "x86_64-pc-windows-msvc", + "aarch64-unknown-linux-gnu", ] # Write out exact versions rather than a semver range. (Defaults to false.) diff --git a/.env b/.env index 54e084f..6fa4683 100644 --- a/.env +++ b/.env @@ -1,5 +1,5 @@ # cni插件的路径 -CNI_BIN_DIR= "/opt/cni/bin" +CNI_BIN_DIR= "/nix/store/vrnv8mvvbfj04zma6hr035chj0x5f5i3-cni-plugins-1.6.1/bin" CNI_CONF_DIR= "/etc/cni/net.d" # 你的cnitool的路径 -CNI_TOOL = "/home/dragonos/MY/gopath/bin/cnitool" \ No newline at end of file +CNI_TOOL = "/nix/store/c1ig375fgbv0ykv3amy94ps5sn0cyi7c-cni-1.2.3/bin/cnitool" \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 7972f74..539a748 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,7 +35,7 @@ dependencies = [ "brotli", "bytes", "bytestring", - "derive_more", + "derive_more 0.99.18", "encoding_rs", "flate2", "futures-core", @@ -151,7 +151,7 @@ dependencies = [ "bytestring", "cfg-if", "cookie", - "derive_more", + "derive_more 0.99.18", "encoding_rs", "futures-core", "futures-util", @@ -517,7 +517,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "cni" version = "0.1.0" dependencies = [ + "dotenv", "lazy_static", + "log", "my-workspace-hack", "serde_json", ] @@ -560,6 +562,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "convert_case" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "cookie" version = "0.16.2" @@ -731,13 +742,35 @@ version = "0.99.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" dependencies = [ - "convert_case", + "convert_case 0.4.0", "proc-macro2", "quote", "rustc_version", "syn 2.0.96", ] +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "convert_case 0.7.1", + "proc-macro2", + "quote", + "syn 2.0.96", + "unicode-xid", +] + [[package]] name = "digest" version = "0.10.7" @@ -816,6 +849,7 @@ dependencies = [ "actix-web", "dotenv", "my-workspace-hack", + "provider", "serde 1.0.217", "serde_json", "service", @@ -1573,9 +1607,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.25" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "matchit" @@ -1627,7 +1661,6 @@ name = "my-workspace-hack" version = "0.1.0" dependencies = [ "actix-router", - "bitflags 2.8.0", "bytes", "futures-channel", "futures-task", @@ -2046,6 +2079,7 @@ dependencies = [ "bollard", "cni", "config", + "derive_more 2.0.1", "futures", "futures-util", "hyper 0.14.32", @@ -2061,6 +2095,7 @@ dependencies = [ "tempfile", "thiserror 1.0.69", "tokio", + "tonic", "tower 0.4.13", "url", "uuid", @@ -2886,6 +2921,18 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "url" version = "2.5.4" diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index d58274b..03b3d32 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -4,10 +4,11 @@ version = "0.1.0" edition = "2024" [dependencies] -actix-web = "4.0" +actix-web = "4.5.1" tokio = { version = "1", features = ["full"] } -service = { path = "../service" } +service = { path = "../service" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } -dotenv = "0.15" \ No newline at end of file +provider = { path = "../provider" } +dotenv = "0.15" diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs index 7dd33ab..ec4cc64 100644 --- a/crates/app/src/main.rs +++ b/crates/app/src/main.rs @@ -7,6 +7,10 @@ pub mod handlers; pub mod types; use handlers::*; +use provider::{ + handlers::{delete::delete_handler, deploy::deploy_handler}, + proxy::proxy_handler::proxy_handler, +}; #[actix_web::main] async fn main() -> std::io::Result<()> { @@ -19,17 +23,22 @@ async fn main() -> std::io::Result<()> { println!("I'm running!"); - HttpServer::new(move || { + let server = HttpServer::new(move || { App::new() .app_data(web::Data::new(service.clone())) .route("/create-container", web::post().to(create_container)) .route("/remove-container", web::post().to(remove_container)) .route("/containers", web::get().to(get_container_list)) + .route("/system/functions", web::post().to(deploy_handler)) + .route("/system/functions", web::delete().to(delete_handler)) + .route("/function/{name}{path:/?.*}", web::to(proxy_handler)) // 更多路由配置... }) - .bind("0.0.0.0:18080")? - .run() - .await + .bind("0.0.0.0:8090")?; + + println!("0.0.0.0:8090"); + + server.run().await } // 测试env能够正常获取 diff --git a/crates/cni/Cargo.toml b/crates/cni/Cargo.toml index 8012e33..6d156bd 100644 --- a/crates/cni/Cargo.toml +++ b/crates/cni/Cargo.toml @@ -8,4 +8,7 @@ edition = "2024" [dependencies] serde_json = "1.0" my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } +log = "0.4.27" +dotenv = "0.15.0" + lazy_static = "1.4.0" diff --git a/crates/my-workspace-hack/Cargo.toml b/crates/my-workspace-hack/Cargo.toml index 02f5775..c30d89f 100644 --- a/crates/my-workspace-hack/Cargo.toml +++ b/crates/my-workspace-hack/Cargo.toml @@ -50,12 +50,18 @@ tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1", default-features = false, features = ["std"] } [target.x86_64-unknown-linux-gnu.dependencies] -bitflags = { version = "2", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false, features = ["std"] } libc = { version = "0.2" } [target.x86_64-unknown-linux-gnu.build-dependencies] -bitflags = { version = "2", default-features = false, features = ["std"] } +getrandom = { version = "0.2", default-features = false, features = ["std"] } +libc = { version = "0.2" } + +[target.aarch64-unknown-linux-gnu.dependencies] +getrandom = { version = "0.2", default-features = false, features = ["std"] } +libc = { version = "0.2" } + +[target.aarch64-unknown-linux-gnu.build-dependencies] getrandom = { version = "0.2", default-features = false, features = ["std"] } libc = { version = "0.2" } diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index b2da58a..f93b75e 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -30,4 +30,6 @@ async-trait = "0.1" lazy_static = "1.4.0" log = "0.4" my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } -url = "2.4" \ No newline at end of file +url = "2.4" +derive_more = { version = "2", features = ["full"] } +tonic = "0.12" diff --git a/crates/provider/src/consts.rs b/crates/provider/src/consts.rs index 86ec2d8..6158c0a 100644 --- a/crates/provider/src/consts.rs +++ b/crates/provider/src/consts.rs @@ -1,5 +1,5 @@ #[allow(unused)] -pub const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn"; +pub const DEFAULT_FUNCTION_NAMESPACE: &str = "default"; #[allow(unused)] pub const NAMESPACE_LABEL: &str = "faasrs"; diff --git a/crates/provider/src/handlers/delete.rs b/crates/provider/src/handlers/delete.rs new file mode 100644 index 0000000..2dee021 --- /dev/null +++ b/crates/provider/src/handlers/delete.rs @@ -0,0 +1,64 @@ +use crate::{ + consts, + handlers::{ + function_get::get_function, + utils::{CustomError, map_service_error}, + }, +}; +use actix_web::{HttpResponse, Responder, error, web}; +use serde::{Deserialize, Serialize}; +use service::Service; +use std::sync::Arc; + +pub async fn delete_handler( + service: web::Data>, + info: web::Json, +) -> impl Responder { + let function_name = info.function_name.clone(); + let namespace = info + .namespace + .clone() + .unwrap_or_else(|| consts::DEFAULT_FUNCTION_NAMESPACE.to_string()); + + match delete(&function_name, &namespace, &service).await { + Ok(()) => { + HttpResponse::Ok().body(format!("function {} deleted successfully", function_name)) + } + Err(e) => HttpResponse::InternalServerError().body(format!( + "failed to delete function {} in namespace {} because {}", + function_name, namespace, e + )), + } +} + +async fn delete( + function_name: &str, + namespace: &str, + service: &Arc, +) -> Result<(), CustomError> { + let namespaces = service.list_namespaces().await.map_err(map_service_error)?; + if !namespaces.contains(&namespace.to_string()) { + return Err(CustomError::ActixError(error::ErrorBadRequest(format!( + "Namespace '{}' not valid or does not exist", + namespace + )))); + } + let function = get_function(service, function_name, namespace).await?; + if function.replicas != 0 { + println!(" delete_cni_network ing {:?}", function.replicas); + cni::cni_network::delete_cni_network(namespace, function_name); + } else { + println!(" function.replicas {:?}", function.replicas); + } + service + .remove_container(function_name, namespace) + .await + .map_err(map_service_error)?; + Ok(()) +} + +#[derive(Serialize, Deserialize)] +pub struct DeleteContainerInfo { + pub function_name: String, + pub namespace: Option, +} diff --git a/crates/provider/src/handlers/deploy.rs b/crates/provider/src/handlers/deploy.rs index 2a0c233..b5a3ee8 100644 --- a/crates/provider/src/handlers/deploy.rs +++ b/crates/provider/src/handlers/deploy.rs @@ -1,26 +1,100 @@ -use crate::types::config::IAmHandler; -use crate::types::{self, CreateContainerInfo}; -use actix_web::HttpResponse; +use crate::{ + consts, + handlers::utils::{CustomError, map_service_error}, + types::function_deployment::{DeployFunctionInfo, FunctionDeployment}, +}; +use actix_web::{HttpResponse, Responder, web}; + use service::Service; use std::sync::Arc; -pub struct DeployHandler { - pub config: types::function_deployment::FunctionDeployment, - service: Arc, -} +pub async fn deploy_handler( + service: web::Data>, + info: web::Json, +) -> impl Responder { + let image = info.image.clone(); + let function_name = info.function_name.clone(); + let namespace = info + .namespace + .clone() + .unwrap_or(consts::DEFAULT_FUNCTION_NAMESPACE.to_string()); -impl IAmHandler for DeployHandler { - type Input = CreateContainerInfo; - // type Output = String; + let config = FunctionDeployment { + service: function_name, + image, + namespace: Some(namespace), + }; - async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder { - let cid = input.container_id.clone(); - let image = input.image.clone(); - let ns = input.ns.clone(); - self.service - .create_container(&image, &cid, &ns) - .await - .unwrap(); - HttpResponse::Ok().json(format!("Container {} created successfully!", cid)) + match deploy(&service, &config).await { + Ok(()) => HttpResponse::Accepted().body(format!( + "Function {} deployment initiated successfully .", + config.service + )), + Err(e) => HttpResponse::InternalServerError().body(format!( + "failed to deploy function {}, because {}", + config.service, e + )), } } + +async fn deploy(service: &Arc, config: &FunctionDeployment) -> Result<(), CustomError> { + // let namespaces = service + // .list_namespaces() + // .await + // .map_err(|e| map_service_error(e))?; + let namespace = config.namespace.clone().unwrap(); + + // if !namespaces.contains(&namespace) { + // return Err(CustomError::ActixError(error::ErrorBadRequest(format!( + // "Namespace '{}' not valid or does not exist", + // namespace + // )))); + // } + println!( + "Namespace '{}' validated.", + config.namespace.clone().unwrap() + ); + + let container_list = service + .get_container_list(&namespace) + .await + .map_err(CustomError::from)?; + + if container_list.contains(&config.service) { + return Err(CustomError::OtherError( + "container has been existed".to_string(), + )); + } + + service + .prepare_image(&config.image, &namespace, true) + .await + .map_err(map_service_error)?; + println!("Image '{}' validated", &config.image); + + service + .create_container(&config.image, &config.service, &namespace) + .await + .map_err(|e| CustomError::OtherError(format!("failed to create container:{}", e)))?; + + println!( + "Container {} created using image {} in namespace {}", + &config.service, &config.image, namespace + ); + + service + .create_and_start_task(&config.service, &namespace) + .await + .map_err(|e| { + CustomError::OtherError(format!( + "failed to start task for container {},{}", + &config.service, e + )) + })?; + println!( + "Task for container {} was created successfully", + &config.service + ); + + Ok(()) +} diff --git a/crates/provider/src/handlers/function_get.rs b/crates/provider/src/handlers/function_get.rs index 8600a55..d0a2e74 100644 --- a/crates/provider/src/handlers/function_get.rs +++ b/crates/provider/src/handlers/function_get.rs @@ -56,12 +56,13 @@ pub async fn get_function( match task { Ok(task) => { let status = task.status; - if status == 2 { + if status == 2 || status == 3 { pid = task.pid; replicas = 1; } } - Err(_) => { + Err(e) => { + eprintln!("Failed to get task: {}", e); replicas = 0; } } diff --git a/crates/provider/src/handlers/function_list.rs b/crates/provider/src/handlers/function_list.rs index 6781abc..b183cd9 100644 --- a/crates/provider/src/handlers/function_list.rs +++ b/crates/provider/src/handlers/function_list.rs @@ -1,33 +1,5 @@ -use crate::types::config::IAmHandler; -use actix_web::HttpResponse; use std::{collections::HashMap, time::SystemTime}; -pub struct FunctionLister { - service: std::sync::Arc, -} - -impl IAmHandler for FunctionLister { - type Input = String; - // type Output = Vec; - - async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder { - // faasd进来的第一步是验证命名空间的标签是否具有某个值,也就是验证是否为true,确保命名空间有效 - // 但是这里省略,因为好像标签为空? - - let containers = self - .service - .get_container_list(input.as_str()) - .await - .unwrap(); - - for container in containers.iter() { - log::debug!("container: {:?}", container); - } - - HttpResponse::Ok().json("函数列表") - } -} - pub struct Function { pub name: String, pub namespace: String, diff --git a/crates/provider/src/handlers/mod.rs b/crates/provider/src/handlers/mod.rs index 39e5431..54f986e 100644 --- a/crates/provider/src/handlers/mod.rs +++ b/crates/provider/src/handlers/mod.rs @@ -1,8 +1,9 @@ +pub mod delete; pub mod deploy; pub mod function_get; pub mod function_list; pub mod invoke_resolver; -pub mod namespace_list; +pub mod utils; use actix_web::{HttpRequest, HttpResponse, Responder}; diff --git a/crates/provider/src/handlers/namespace_list.rs b/crates/provider/src/handlers/namespace_list.rs deleted file mode 100644 index 59b0ff4..0000000 --- a/crates/provider/src/handlers/namespace_list.rs +++ /dev/null @@ -1,17 +0,0 @@ -use crate::types::config::IAmHandler; -use actix_web::{HttpResponse, Responder}; -use service::Service; -use std::sync::Arc; - -pub struct NamespaceLister { - service: Arc, -} - -impl IAmHandler for NamespaceLister { - type Input = (); - // type Output = Vec; - async fn execute(&mut self, _input: Self::Input) -> impl Responder { - let ns_list = self.service.list_namespaces().await.unwrap(); - HttpResponse::Ok().json(ns_list) - } -} diff --git a/crates/provider/src/handlers/utils.rs b/crates/provider/src/handlers/utils.rs new file mode 100644 index 0000000..d96f97e --- /dev/null +++ b/crates/provider/src/handlers/utils.rs @@ -0,0 +1,65 @@ +use crate::handlers::function_get::FunctionError; +use actix_web::{Error, HttpResponse, ResponseError}; +use derive_more::Display; + +pub fn map_service_error(e: Box) -> Error { + eprintln!("Service error: {}", e); + actix_web::error::ErrorInternalServerError(format!("Operationfailed: {}", e)) +} + +//枚举错误类型,并非所有被调用的函数的错误类型都能实现调用函数的错误特征 +#[derive(Debug, Display)] +pub enum CustomError { + #[display("GrpcError: {}", _0)] + GrpcError(tonic::Status), + #[display("OtherError: {}", _0)] + OtherError(String), + #[display("ActixError: {}", _0)] + ActixError(actix_web::Error), + #[display("FunctionError: {}", _0)] + FunctionError(FunctionError), +} + +impl ResponseError for CustomError { + fn error_response(&self) -> HttpResponse { + match self { + CustomError::GrpcError(status) => { + // Customize the HTTP response based on the gRPC status + match status.code() { + tonic::Code::NotFound => { + HttpResponse::NotFound().body(status.message().to_string()) + } + tonic::Code::PermissionDenied => { + HttpResponse::Forbidden().body(status.message().to_string()) + } + _ => HttpResponse::InternalServerError().body(status.message().to_string()), + } + } + CustomError::OtherError(message) => { + HttpResponse::InternalServerError().body(message.clone()) + } + CustomError::ActixError(err) => err.error_response(), + CustomError::FunctionError(err) => { + HttpResponse::InternalServerError().body(err.to_string()) + } + } + } +} + +impl From for CustomError { + fn from(err: actix_web::Error) -> Self { + CustomError::ActixError(err) + } +} + +impl From for CustomError { + fn from(err: tonic::Status) -> Self { + CustomError::GrpcError(err) + } +} + +impl From for CustomError { + fn from(err: FunctionError) -> Self { + CustomError::FunctionError(err) + } +} diff --git a/crates/provider/src/proxy/mod.rs b/crates/provider/src/proxy/mod.rs index 398d48c..af3226f 100644 --- a/crates/provider/src/proxy/mod.rs +++ b/crates/provider/src/proxy/mod.rs @@ -1,10 +1 @@ pub mod proxy_handler; - -use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig}; -use actix_web::{HttpRequest, web}; -pub struct ProxyHandlerInfo { - req: HttpRequest, - payload: web::Payload, - config: FaaSConfig, - resolver: Option, -} diff --git a/crates/provider/src/proxy/proxy_handler.rs b/crates/provider/src/proxy/proxy_handler.rs index a562583..7df5a1f 100644 --- a/crates/provider/src/proxy/proxy_handler.rs +++ b/crates/provider/src/proxy/proxy_handler.rs @@ -5,48 +5,37 @@ use actix_web::{Error, HttpRequest, HttpResponse, Responder, http::Method, web}; use reqwest::{Client, RequestBuilder, redirect}; use url::Url; -use crate::{ - handlers::invoke_resolver::InvokeResolver, - types::config::{FaaSConfig, IAmHandler}, -}; +use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig}; -use super::ProxyHandlerInfo; +pub async fn proxy_handler( + config: web::Data, + resolver: web::Data>, + req: HttpRequest, + payload: web::Payload, +) -> impl Responder { + let resolver_option = resolver.as_ref(); + let resolver = resolver_option + .as_ref() + .expect("empty proxy handler resolver, cannot be nil"); -pub struct Proxy {} + let proxy_client = new_proxy_client_from_config(config.as_ref()).await; -impl IAmHandler for Proxy { - type Input = ProxyHandlerInfo; - - async fn execute(&mut self, input: Self::Input) -> impl Responder { - let resolver = input - .resolver - .expect("empty proxy handler resolver, cannot be nil"); - let req = input.req; - let config = input.config; - let mut payload = input.payload; - - let proxy_client = new_proxy_client_from_config(config).await; - - match *req.method() { - Method::POST - | Method::PUT - | Method::DELETE - | Method::GET - | Method::PATCH - | Method::HEAD - | Method::OPTIONS => { - match proxy_request(&req, &mut payload, &proxy_client, &resolver).await { - Ok(resp) => resp, - Err(e) => HttpResponse::InternalServerError().body(e.to_string()), - } - } - _ => HttpResponse::MethodNotAllowed().body("method not allowed"), - } + match *req.method() { + Method::POST + | Method::PUT + | Method::DELETE + | Method::GET + | Method::PATCH + | Method::HEAD + | Method::OPTIONS => match proxy_request(&req, payload, &proxy_client, resolver).await { + Ok(resp) => resp, + Err(e) => HttpResponse::InternalServerError().body(e.to_string()), + }, + _ => HttpResponse::MethodNotAllowed().body("method not allowed"), } } - //构建client -async fn new_proxy_client_from_config(config: FaaSConfig) -> Client { +async fn new_proxy_client_from_config(config: &FaaSConfig) -> Client { new_proxy_client( config.get_read_timeout(), /*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(), @@ -75,7 +64,7 @@ async fn new_proxy_client( //根据原始请求,解析url,构建转发请求并转发,获取响应 async fn proxy_request( req: &HttpRequest, - payload: &mut web::Payload, + payload: web::Payload, proxy_client: &Client, resolver: &InvokeResolver, ) -> Result { @@ -113,7 +102,7 @@ async fn build_proxy_request( req: &HttpRequest, base_url: &Url, proxy_client: &Client, - payload: &mut web::Payload, + mut payload: web::Payload, ) -> Result { let origin_url = base_url.join(req.uri().path()).unwrap(); let remaining_segments = origin_url.path_segments().unwrap().skip(2); diff --git a/crates/provider/src/types/config.rs b/crates/provider/src/types/config.rs index 22d0a2a..e58e31b 100644 --- a/crates/provider/src/types/config.rs +++ b/crates/provider/src/types/config.rs @@ -1,20 +1,8 @@ -use actix_web::Responder; use std::time::Duration; const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_MAX_IDLE_CONNS: usize = 1024; -pub trait IAmHandler { - type Input; - // type Output: Serialize + Send + 'static; - - // /// 获取Handler元数据(函数名、超时时间等) - // fn metadata(&self) -> HandlerMeta; - - /// 执行核心逻辑 - fn execute(&mut self, input: Self::Input) -> impl std::future::Future /*+ Send*/; -} - #[derive(Debug, Clone)] pub struct FaaSConfig { pub tcp_port: Option, diff --git a/crates/provider/src/types/function_deployment.rs b/crates/provider/src/types/function_deployment.rs index 731b499..4c33633 100644 --- a/crates/provider/src/types/function_deployment.rs +++ b/crates/provider/src/types/function_deployment.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +//use std::collections::HashMap; #[derive(Serialize, Deserialize, Debug)] pub struct FunctionDeployment { @@ -12,53 +12,60 @@ pub struct FunctionDeployment { /// Namespace for the function, if supported by the faas-provider #[serde(skip_serializing_if = "Option::is_none")] pub namespace: Option, + // /// EnvProcess overrides the fprocess environment variable and can be used + // /// with the watchdog + // #[serde(rename = "envProcess", skip_serializing_if = "Option::is_none")] + // pub env_process: Option, - /// EnvProcess overrides the fprocess environment variable and can be used - /// with the watchdog - #[serde(rename = "envProcess", skip_serializing_if = "Option::is_none")] - pub env_process: Option, + // /// EnvVars can be provided to set environment variables for the function runtime. + // #[serde(skip_serializing_if = "Option::is_none")] + // pub env_vars: Option>, - /// EnvVars can be provided to set environment variables for the function runtime. - #[serde(skip_serializing_if = "Option::is_none")] - pub env_vars: Option>, + // /// Constraints are specific to the faas-provider. + // #[serde(skip_serializing_if = "Option::is_none")] + // pub constraints: Option>, - /// Constraints are specific to the faas-provider. - #[serde(skip_serializing_if = "Option::is_none")] - pub constraints: Option>, + // /// Secrets list of secrets to be made available to function + // #[serde(skip_serializing_if = "Option::is_none")] + // pub secrets: Option>, - /// Secrets list of secrets to be made available to function - #[serde(skip_serializing_if = "Option::is_none")] - pub secrets: Option>, + // /// Labels are metadata for functions which may be used by the + // /// faas-provider or the gateway + // #[serde(skip_serializing_if = "Option::is_none")] + // pub labels: Option>, - /// Labels are metadata for functions which may be used by the - /// faas-provider or the gateway - #[serde(skip_serializing_if = "Option::is_none")] - pub labels: Option>, + // /// Annotations are metadata for functions which may be used by the + // /// faas-provider or the gateway + // #[serde(skip_serializing_if = "Option::is_none")] + // pub annotations: Option>, - /// Annotations are metadata for functions which may be used by the - /// faas-provider or the gateway - #[serde(skip_serializing_if = "Option::is_none")] - pub annotations: Option>, + // /// Limits for function + // #[serde(skip_serializing_if = "Option::is_none")] + // pub limits: Option, - /// Limits for function - #[serde(skip_serializing_if = "Option::is_none")] - pub limits: Option, + // /// Requests of resources requested by function + // #[serde(skip_serializing_if = "Option::is_none")] + // pub requests: Option, - /// Requests of resources requested by function - #[serde(skip_serializing_if = "Option::is_none")] - pub requests: Option, - - /// ReadOnlyRootFilesystem removes write-access from the root filesystem - /// mount-point. - #[serde(rename = "readOnlyRootFilesystem", default)] - pub read_only_root_filesystem: bool, + // /// ReadOnlyRootFilesystem removes write-access from the root filesystem + // /// mount-point. + // #[serde(rename = "readOnlyRootFilesystem", default)] + // pub read_only_root_filesystem: bool, } -#[derive(Debug, Serialize, Deserialize)] -pub struct FunctionResources { - #[serde(skip_serializing_if = "Option::is_none")] - pub memory: Option, +// #[derive(Debug, Serialize, Deserialize)] +// pub struct FunctionResources { +// #[serde(skip_serializing_if = "Option::is_none")] +// pub memory: Option, + +// #[serde(skip_serializing_if = "Option::is_none")] +// pub cpu: Option, +// } +#[derive(serde::Deserialize)] +pub struct DeployFunctionInfo { + pub function_name: String, + pub image: String, #[serde(skip_serializing_if = "Option::is_none")] - pub cpu: Option, + pub namespace: Option, } diff --git a/crates/service/src/lib.rs b/crates/service/src/lib.rs index 89cea62..c7b71f1 100644 --- a/crates/service/src/lib.rs +++ b/crates/service/src/lib.rs @@ -1,6 +1,7 @@ pub mod spec; pub mod systemd; +use cni::cni_network::init_net_work; use containerd_client::{ Client, services::v1::{ @@ -170,12 +171,10 @@ impl Service { .list(with_namespace!(request, namespace)) .await? .into_inner(); + println!("Tasks: {:?}", responce.tasks); drop(tc); - if let Some(task) = responce - .tasks - .iter() - .find(|task| task.container_id == container.id) - { + + if let Some(task) = responce.tasks.iter().find(|task| task.id == container.id) { println!("Task found: {}, Status: {}", task.id, task.status); // TASK_UNKNOWN (0) — 未知状态 // TASK_CREATED (1) — 任务已创建 @@ -184,7 +183,7 @@ impl Service { // TASK_EXITED (4) — 任务已退出 // TASK_PAUSED (5) — 任务已暂停 // TASK_FAILED (6) — 任务失败 - self.delete_task(&task.container_id, ns).await; + let _ = self.delete_task(&task.id, ns).await; } let delete_request = DeleteContainerRequest { @@ -197,7 +196,7 @@ impl Service { .expect("Failed to delete container"); self.remove_netns_ip(cid).await; - // println!("Container: {:?} deleted", cc); + println!("Container: {:?} deleted", cc); } else { todo!("Container not found"); } @@ -237,9 +236,15 @@ impl Service { .await? .into_inner() .mounts; + println!("mounts ok"); drop(sc); + println!("drop sc ok"); + let _ = init_net_work(); + println!("init_net_work ok"); let (ip, path) = cni::cni_network::create_cni_network(cid.to_string(), ns.to_string())?; + println!("create_cni_network ok"); self.save_netns_ip(cid, &path, &ip).await; + println!("save_netns_ip ok"); let mut tc = self.client.tasks(); let req = CreateTaskRequest { container_id: cid.to_string(), @@ -257,6 +262,7 @@ impl Service { ..Default::default() }; let _resp = self.client.tasks().start(with_namespace!(req, ns)).await?; + println!("Task: {:?} started", cid); Ok(()) } @@ -284,7 +290,7 @@ impl Service { pub async fn resume_task() { todo!() } - pub async fn delete_task(&self, cid: &str, ns: &str) { + pub async fn delete_task(&self, cid: &str, ns: &str) -> Result<(), Err> { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); @@ -300,6 +306,7 @@ impl Service { Ok::<(), Err>(()) }) .await; + println!(" after wait"); let kill_request = KillRequest { container_id: cid.to_string(), @@ -317,22 +324,43 @@ impl Service { container_id: cid.to_string(), }; - let _resp = c - .delete(with_namespace!(req, namespace)) - .await - .expect("Failed to delete task"); - println!("Task: {:?} deleted", cid); + // let _resp = c + // .delete(with_namespace!(req, namespace)) + // .await + // .expect("Failed to delete task"); + // println!("Task: {:?} deleted", cid); + match c.delete(with_namespace!(req, namespace)).await { + Ok(_) => { + println!("Task: {:?} deleted", cid); + Ok(()) + } + Err(e) => { + eprintln!("Failed to delete task: {}", e); + Err(e.into()) + } + } } - _ => { + Ok(Err(e)) => { + eprintln!("Wait task failed: {}", e); + Err(e) + } + Err(_) => { let kill_request = KillRequest { container_id: cid.to_string(), signal: 9, all: true, ..Default::default() }; - c.kill(with_namespace!(kill_request, namespace)) - .await - .expect("Failed to FORCE kill task"); + match c.kill(with_namespace!(kill_request, namespace)).await { + Ok(_) => { + println!("Task: {:?} force killed", cid); + Ok(()) + } + Err(e) => { + eprintln!("Failed to force kill task: {}", e); + Err(e.into()) + } + } } } } @@ -390,7 +418,7 @@ impl Service { let task = tasks .into_iter() - .find(|task| task.container_id == cid) + .find(|task| task.id == cid) .ok_or_else(|| -> Err { format!("Task for container {} not found", cid).into() })?; Ok(task) diff --git a/crates/service/src/spec.rs b/crates/service/src/spec.rs index 0594a07..691ab85 100644 --- a/crates/service/src/spec.rs +++ b/crates/service/src/spec.rs @@ -329,7 +329,9 @@ pub fn generate_spec( let mut spec = populate_default_unix_spec(id, ns); spec.process.args = args; spec.process.env = env; - let path = format!("{}/{}/{}.json", PATH_TO_SPEC_PREFIX, namespace, id); + let dir_path = format!("{}/{}", PATH_TO_SPEC_PREFIX, namespace); + let path = format!("{}/{}.json", dir_path, id); + std::fs::create_dir_all(&dir_path)?; save_spec_to_file(&spec, &path)?; Ok(path) } diff --git a/flake.nix b/flake.nix index aeb6ee4..73b3efb 100644 --- a/flake.nix +++ b/flake.nix @@ -13,7 +13,7 @@ }; outputs = { self, nixpkgs, crane, flake-utils, rust-overlay, nix-github-actions, ... }: - flake-utils.lib.eachDefaultSystem (system: + flake-utils.lib.eachSystem [ "x86_64-linux" "aarch64-linux" ] (system: let # reference: https://crane.dev/examples/quick-start-workspace.html overlays = [ (import rust-overlay) ]; @@ -33,6 +33,8 @@ strictDeps = true; # Add additional build inputs here buildInputs = with pkgs; [ + cni + cni-plugins openssl protobuf pkg-config @@ -153,9 +155,7 @@ // { githubActions = nix-github-actions.lib.mkGithubMatrix { checks.x86_64-linux = self.checks.x86_64-linux; - checks.x86_64-darwin.faas-rs-crate = self.checks.x86_64-darwin.faas-rs-crate; checks.aarch64-linux.faas-rs-crate = self.checks.aarch64-linux.faas-rs-crate; - checks.aarch64-darwin.faas-rs-crate = self.checks.aarch64-darwin.faas-rs-crate; }; }; }