feat(gateway):enable to deploy and delete function with request (#54)

* fix:move trait IAMHandler to types/

* feat:enable to resolve request and to set proxy for request

* save implementation for trait IAMHandler

* remove trait IAMHandler and add deploy and delete

* add feishu_bot.yml

* 修复一点ctr逻辑 (#29)

* 修复一点逻辑

* fmt

* feat:remove trait IAMHandler and bind fucntion to gateway

* resolve conflict in cni_network

* fix(delete): fix delete task.

* fix(clippy): fmt

* fix:remove darwin and launch.json

* fix:hakari generate

* fix: remove files and build targets

* fix: some format problems

---------

Co-authored-by: vitus213 <2811215248@qq.com>
Co-authored-by: 火花 <sparkhhhhhhhhhh@outlook.com>
Co-authored-by: scutKKsix <1129332011@qq.com>
Co-authored-by: Samuka007 <samuka007@dragonos.org>
This commit is contained in:
DoL 2025-04-12 00:02:49 +08:00 committed by GitHub
parent eeac4b0204
commit 1acbab92fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 439 additions and 208 deletions

View File

@ -18,9 +18,7 @@ resolver = "2"
# https://doc.rust-lang.org/rustc/platform-support.html # https://doc.rust-lang.org/rustc/platform-support.html
platforms = [ platforms = [
"x86_64-unknown-linux-gnu", "x86_64-unknown-linux-gnu",
# "x86_64-apple-darwin", "aarch64-unknown-linux-gnu",
# "aarch64-apple-darwin",
# "x86_64-pc-windows-msvc",
] ]
# Write out exact versions rather than a semver range. (Defaults to false.) # Write out exact versions rather than a semver range. (Defaults to false.)

4
.env
View File

@ -1,5 +1,5 @@
# cni插件的路径 # cni插件的路径
CNI_BIN_DIR= "/opt/cni/bin" CNI_BIN_DIR= "/nix/store/vrnv8mvvbfj04zma6hr035chj0x5f5i3-cni-plugins-1.6.1/bin"
CNI_CONF_DIR= "/etc/cni/net.d" CNI_CONF_DIR= "/etc/cni/net.d"
# 你的cnitool的路径 # 你的cnitool的路径
CNI_TOOL = "/home/dragonos/MY/gopath/bin/cnitool" CNI_TOOL = "/nix/store/c1ig375fgbv0ykv3amy94ps5sn0cyi7c-cni-1.2.3/bin/cnitool"

59
Cargo.lock generated
View File

@ -35,7 +35,7 @@ dependencies = [
"brotli", "brotli",
"bytes", "bytes",
"bytestring", "bytestring",
"derive_more", "derive_more 0.99.18",
"encoding_rs", "encoding_rs",
"flate2", "flate2",
"futures-core", "futures-core",
@ -151,7 +151,7 @@ dependencies = [
"bytestring", "bytestring",
"cfg-if", "cfg-if",
"cookie", "cookie",
"derive_more", "derive_more 0.99.18",
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
"futures-util", "futures-util",
@ -517,7 +517,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
name = "cni" name = "cni"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"dotenv",
"lazy_static", "lazy_static",
"log",
"my-workspace-hack", "my-workspace-hack",
"serde_json", "serde_json",
] ]
@ -560,6 +562,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "convert_case"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7"
dependencies = [
"unicode-segmentation",
]
[[package]] [[package]]
name = "cookie" name = "cookie"
version = "0.16.2" version = "0.16.2"
@ -731,13 +742,35 @@ version = "0.99.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce"
dependencies = [ dependencies = [
"convert_case", "convert_case 0.4.0",
"proc-macro2", "proc-macro2",
"quote", "quote",
"rustc_version", "rustc_version",
"syn 2.0.96", "syn 2.0.96",
] ]
[[package]]
name = "derive_more"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
dependencies = [
"convert_case 0.7.1",
"proc-macro2",
"quote",
"syn 2.0.96",
"unicode-xid",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.7" version = "0.10.7"
@ -816,6 +849,7 @@ dependencies = [
"actix-web", "actix-web",
"dotenv", "dotenv",
"my-workspace-hack", "my-workspace-hack",
"provider",
"serde 1.0.217", "serde 1.0.217",
"serde_json", "serde_json",
"service", "service",
@ -1573,9 +1607,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.25" version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]] [[package]]
name = "matchit" name = "matchit"
@ -1627,7 +1661,6 @@ name = "my-workspace-hack"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"actix-router", "actix-router",
"bitflags 2.8.0",
"bytes", "bytes",
"futures-channel", "futures-channel",
"futures-task", "futures-task",
@ -2046,6 +2079,7 @@ dependencies = [
"bollard", "bollard",
"cni", "cni",
"config", "config",
"derive_more 2.0.1",
"futures", "futures",
"futures-util", "futures-util",
"hyper 0.14.32", "hyper 0.14.32",
@ -2061,6 +2095,7 @@ dependencies = [
"tempfile", "tempfile",
"thiserror 1.0.69", "thiserror 1.0.69",
"tokio", "tokio",
"tonic",
"tower 0.4.13", "tower 0.4.13",
"url", "url",
"uuid", "uuid",
@ -2886,6 +2921,18 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243" checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243"
[[package]]
name = "unicode-segmentation"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]] [[package]]
name = "url" name = "url"
version = "2.5.4" version = "2.5.4"

View File

@ -4,10 +4,11 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
actix-web = "4.0" actix-web = "4.5.1"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
service = { path = "../service" } service = { path = "../service" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" }
provider = { path = "../provider" }
dotenv = "0.15" dotenv = "0.15"

View File

@ -7,6 +7,10 @@ pub mod handlers;
pub mod types; pub mod types;
use handlers::*; use handlers::*;
use provider::{
handlers::{delete::delete_handler, deploy::deploy_handler},
proxy::proxy_handler::proxy_handler,
};
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
@ -19,17 +23,22 @@ async fn main() -> std::io::Result<()> {
println!("I'm running!"); println!("I'm running!");
HttpServer::new(move || { let server = HttpServer::new(move || {
App::new() App::new()
.app_data(web::Data::new(service.clone())) .app_data(web::Data::new(service.clone()))
.route("/create-container", web::post().to(create_container)) .route("/create-container", web::post().to(create_container))
.route("/remove-container", web::post().to(remove_container)) .route("/remove-container", web::post().to(remove_container))
.route("/containers", web::get().to(get_container_list)) .route("/containers", web::get().to(get_container_list))
.route("/system/functions", web::post().to(deploy_handler))
.route("/system/functions", web::delete().to(delete_handler))
.route("/function/{name}{path:/?.*}", web::to(proxy_handler))
// 更多路由配置... // 更多路由配置...
}) })
.bind("0.0.0.0:18080")? .bind("0.0.0.0:8090")?;
.run()
.await println!("0.0.0.0:8090");
server.run().await
} }
// 测试env能够正常获取 // 测试env能够正常获取

View File

@ -8,4 +8,7 @@ edition = "2024"
[dependencies] [dependencies]
serde_json = "1.0" serde_json = "1.0"
my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" }
log = "0.4.27"
dotenv = "0.15.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"

View File

@ -50,12 +50,18 @@ tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1", default-features = false, features = ["std"] } tracing-core = { version = "0.1", default-features = false, features = ["std"] }
[target.x86_64-unknown-linux-gnu.dependencies] [target.x86_64-unknown-linux-gnu.dependencies]
bitflags = { version = "2", default-features = false, features = ["std"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false, features = ["std"] }
libc = { version = "0.2" } libc = { version = "0.2" }
[target.x86_64-unknown-linux-gnu.build-dependencies] [target.x86_64-unknown-linux-gnu.build-dependencies]
bitflags = { version = "2", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false, features = ["std"] }
libc = { version = "0.2" }
[target.aarch64-unknown-linux-gnu.dependencies]
getrandom = { version = "0.2", default-features = false, features = ["std"] }
libc = { version = "0.2" }
[target.aarch64-unknown-linux-gnu.build-dependencies]
getrandom = { version = "0.2", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false, features = ["std"] }
libc = { version = "0.2" } libc = { version = "0.2" }

View File

@ -31,3 +31,5 @@ lazy_static = "1.4.0"
log = "0.4" log = "0.4"
my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" } my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" }
url = "2.4" url = "2.4"
derive_more = { version = "2", features = ["full"] }
tonic = "0.12"

View File

@ -1,5 +1,5 @@
#[allow(unused)] #[allow(unused)]
pub const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn"; pub const DEFAULT_FUNCTION_NAMESPACE: &str = "default";
#[allow(unused)] #[allow(unused)]
pub const NAMESPACE_LABEL: &str = "faasrs"; pub const NAMESPACE_LABEL: &str = "faasrs";

View File

@ -0,0 +1,64 @@
use crate::{
consts,
handlers::{
function_get::get_function,
utils::{CustomError, map_service_error},
},
};
use actix_web::{HttpResponse, Responder, error, web};
use serde::{Deserialize, Serialize};
use service::Service;
use std::sync::Arc;
pub async fn delete_handler(
service: web::Data<Arc<Service>>,
info: web::Json<DeleteContainerInfo>,
) -> impl Responder {
let function_name = info.function_name.clone();
let namespace = info
.namespace
.clone()
.unwrap_or_else(|| consts::DEFAULT_FUNCTION_NAMESPACE.to_string());
match delete(&function_name, &namespace, &service).await {
Ok(()) => {
HttpResponse::Ok().body(format!("function {} deleted successfully", function_name))
}
Err(e) => HttpResponse::InternalServerError().body(format!(
"failed to delete function {} in namespace {} because {}",
function_name, namespace, e
)),
}
}
async fn delete(
function_name: &str,
namespace: &str,
service: &Arc<Service>,
) -> Result<(), CustomError> {
let namespaces = service.list_namespaces().await.map_err(map_service_error)?;
if !namespaces.contains(&namespace.to_string()) {
return Err(CustomError::ActixError(error::ErrorBadRequest(format!(
"Namespace '{}' not valid or does not exist",
namespace
))));
}
let function = get_function(service, function_name, namespace).await?;
if function.replicas != 0 {
println!(" delete_cni_network ing {:?}", function.replicas);
cni::cni_network::delete_cni_network(namespace, function_name);
} else {
println!(" function.replicas {:?}", function.replicas);
}
service
.remove_container(function_name, namespace)
.await
.map_err(map_service_error)?;
Ok(())
}
#[derive(Serialize, Deserialize)]
pub struct DeleteContainerInfo {
pub function_name: String,
pub namespace: Option<String>,
}

View File

@ -1,26 +1,100 @@
use crate::types::config::IAmHandler; use crate::{
use crate::types::{self, CreateContainerInfo}; consts,
use actix_web::HttpResponse; handlers::utils::{CustomError, map_service_error},
types::function_deployment::{DeployFunctionInfo, FunctionDeployment},
};
use actix_web::{HttpResponse, Responder, web};
use service::Service; use service::Service;
use std::sync::Arc; use std::sync::Arc;
pub struct DeployHandler { pub async fn deploy_handler(
pub config: types::function_deployment::FunctionDeployment, service: web::Data<Arc<Service>>,
service: Arc<Service>, info: web::Json<DeployFunctionInfo>,
) -> impl Responder {
let image = info.image.clone();
let function_name = info.function_name.clone();
let namespace = info
.namespace
.clone()
.unwrap_or(consts::DEFAULT_FUNCTION_NAMESPACE.to_string());
let config = FunctionDeployment {
service: function_name,
image,
namespace: Some(namespace),
};
match deploy(&service, &config).await {
Ok(()) => HttpResponse::Accepted().body(format!(
"Function {} deployment initiated successfully .",
config.service
)),
Err(e) => HttpResponse::InternalServerError().body(format!(
"failed to deploy function {}, because {}",
config.service, e
)),
}
} }
impl IAmHandler for DeployHandler { async fn deploy(service: &Arc<Service>, config: &FunctionDeployment) -> Result<(), CustomError> {
type Input = CreateContainerInfo; // let namespaces = service
// type Output = String; // .list_namespaces()
// .await
// .map_err(|e| map_service_error(e))?;
let namespace = config.namespace.clone().unwrap();
async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder { // if !namespaces.contains(&namespace) {
let cid = input.container_id.clone(); // return Err(CustomError::ActixError(error::ErrorBadRequest(format!(
let image = input.image.clone(); // "Namespace '{}' not valid or does not exist",
let ns = input.ns.clone(); // namespace
self.service // ))));
.create_container(&image, &cid, &ns) // }
println!(
"Namespace '{}' validated.",
config.namespace.clone().unwrap()
);
let container_list = service
.get_container_list(&namespace)
.await .await
.unwrap(); .map_err(CustomError::from)?;
HttpResponse::Ok().json(format!("Container {} created successfully!", cid))
if container_list.contains(&config.service) {
return Err(CustomError::OtherError(
"container has been existed".to_string(),
));
} }
service
.prepare_image(&config.image, &namespace, true)
.await
.map_err(map_service_error)?;
println!("Image '{}' validated", &config.image);
service
.create_container(&config.image, &config.service, &namespace)
.await
.map_err(|e| CustomError::OtherError(format!("failed to create container:{}", e)))?;
println!(
"Container {} created using image {} in namespace {}",
&config.service, &config.image, namespace
);
service
.create_and_start_task(&config.service, &namespace)
.await
.map_err(|e| {
CustomError::OtherError(format!(
"failed to start task for container {},{}",
&config.service, e
))
})?;
println!(
"Task for container {} was created successfully",
&config.service
);
Ok(())
} }

View File

@ -56,12 +56,13 @@ pub async fn get_function(
match task { match task {
Ok(task) => { Ok(task) => {
let status = task.status; let status = task.status;
if status == 2 { if status == 2 || status == 3 {
pid = task.pid; pid = task.pid;
replicas = 1; replicas = 1;
} }
} }
Err(_) => { Err(e) => {
eprintln!("Failed to get task: {}", e);
replicas = 0; replicas = 0;
} }
} }

View File

@ -1,33 +1,5 @@
use crate::types::config::IAmHandler;
use actix_web::HttpResponse;
use std::{collections::HashMap, time::SystemTime}; use std::{collections::HashMap, time::SystemTime};
pub struct FunctionLister {
service: std::sync::Arc<service::Service>,
}
impl IAmHandler for FunctionLister {
type Input = String;
// type Output = Vec<String>;
async fn execute(&mut self, input: Self::Input) -> impl actix_web::Responder {
// faasd进来的第一步是验证命名空间的标签是否具有某个值也就是验证是否为true确保命名空间有效
// 但是这里省略,因为好像标签为空?
let containers = self
.service
.get_container_list(input.as_str())
.await
.unwrap();
for container in containers.iter() {
log::debug!("container: {:?}", container);
}
HttpResponse::Ok().json("函数列表")
}
}
pub struct Function { pub struct Function {
pub name: String, pub name: String,
pub namespace: String, pub namespace: String,

View File

@ -1,8 +1,9 @@
pub mod delete;
pub mod deploy; pub mod deploy;
pub mod function_get; pub mod function_get;
pub mod function_list; pub mod function_list;
pub mod invoke_resolver; pub mod invoke_resolver;
pub mod namespace_list; pub mod utils;
use actix_web::{HttpRequest, HttpResponse, Responder}; use actix_web::{HttpRequest, HttpResponse, Responder};

View File

@ -1,17 +0,0 @@
use crate::types::config::IAmHandler;
use actix_web::{HttpResponse, Responder};
use service::Service;
use std::sync::Arc;
pub struct NamespaceLister {
service: Arc<Service>,
}
impl IAmHandler for NamespaceLister {
type Input = ();
// type Output = Vec<String>;
async fn execute(&mut self, _input: Self::Input) -> impl Responder {
let ns_list = self.service.list_namespaces().await.unwrap();
HttpResponse::Ok().json(ns_list)
}
}

View File

@ -0,0 +1,65 @@
use crate::handlers::function_get::FunctionError;
use actix_web::{Error, HttpResponse, ResponseError};
use derive_more::Display;
pub fn map_service_error(e: Box<dyn std::error::Error>) -> Error {
eprintln!("Service error: {}", e);
actix_web::error::ErrorInternalServerError(format!("Operationfailed: {}", e))
}
//枚举错误类型,并非所有被调用的函数的错误类型都能实现调用函数的错误特征
#[derive(Debug, Display)]
pub enum CustomError {
#[display("GrpcError: {}", _0)]
GrpcError(tonic::Status),
#[display("OtherError: {}", _0)]
OtherError(String),
#[display("ActixError: {}", _0)]
ActixError(actix_web::Error),
#[display("FunctionError: {}", _0)]
FunctionError(FunctionError),
}
impl ResponseError for CustomError {
fn error_response(&self) -> HttpResponse {
match self {
CustomError::GrpcError(status) => {
// Customize the HTTP response based on the gRPC status
match status.code() {
tonic::Code::NotFound => {
HttpResponse::NotFound().body(status.message().to_string())
}
tonic::Code::PermissionDenied => {
HttpResponse::Forbidden().body(status.message().to_string())
}
_ => HttpResponse::InternalServerError().body(status.message().to_string()),
}
}
CustomError::OtherError(message) => {
HttpResponse::InternalServerError().body(message.clone())
}
CustomError::ActixError(err) => err.error_response(),
CustomError::FunctionError(err) => {
HttpResponse::InternalServerError().body(err.to_string())
}
}
}
}
impl From<actix_web::Error> for CustomError {
fn from(err: actix_web::Error) -> Self {
CustomError::ActixError(err)
}
}
impl From<tonic::Status> for CustomError {
fn from(err: tonic::Status) -> Self {
CustomError::GrpcError(err)
}
}
impl From<FunctionError> for CustomError {
fn from(err: FunctionError) -> Self {
CustomError::FunctionError(err)
}
}

View File

@ -1,10 +1 @@
pub mod proxy_handler; pub mod proxy_handler;
use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig};
use actix_web::{HttpRequest, web};
pub struct ProxyHandlerInfo {
req: HttpRequest,
payload: web::Payload,
config: FaaSConfig,
resolver: Option<InvokeResolver>,
}

View File

@ -5,27 +5,20 @@ use actix_web::{Error, HttpRequest, HttpResponse, Responder, http::Method, web};
use reqwest::{Client, RequestBuilder, redirect}; use reqwest::{Client, RequestBuilder, redirect};
use url::Url; use url::Url;
use crate::{ use crate::{handlers::invoke_resolver::InvokeResolver, types::config::FaaSConfig};
handlers::invoke_resolver::InvokeResolver,
types::config::{FaaSConfig, IAmHandler},
};
use super::ProxyHandlerInfo; pub async fn proxy_handler(
config: web::Data<FaaSConfig>,
pub struct Proxy {} resolver: web::Data<Option<InvokeResolver>>,
req: HttpRequest,
impl IAmHandler for Proxy { payload: web::Payload,
type Input = ProxyHandlerInfo; ) -> impl Responder {
let resolver_option = resolver.as_ref();
async fn execute(&mut self, input: Self::Input) -> impl Responder { let resolver = resolver_option
let resolver = input .as_ref()
.resolver
.expect("empty proxy handler resolver, cannot be nil"); .expect("empty proxy handler resolver, cannot be nil");
let req = input.req;
let config = input.config;
let mut payload = input.payload;
let proxy_client = new_proxy_client_from_config(config).await; let proxy_client = new_proxy_client_from_config(config.as_ref()).await;
match *req.method() { match *req.method() {
Method::POST Method::POST
@ -34,19 +27,15 @@ impl IAmHandler for Proxy {
| Method::GET | Method::GET
| Method::PATCH | Method::PATCH
| Method::HEAD | Method::HEAD
| Method::OPTIONS => { | Method::OPTIONS => match proxy_request(&req, payload, &proxy_client, resolver).await {
match proxy_request(&req, &mut payload, &proxy_client, &resolver).await {
Ok(resp) => resp, Ok(resp) => resp,
Err(e) => HttpResponse::InternalServerError().body(e.to_string()), Err(e) => HttpResponse::InternalServerError().body(e.to_string()),
} },
}
_ => HttpResponse::MethodNotAllowed().body("method not allowed"), _ => HttpResponse::MethodNotAllowed().body("method not allowed"),
} }
} }
}
//构建client //构建client
async fn new_proxy_client_from_config(config: FaaSConfig) -> Client { async fn new_proxy_client_from_config(config: &FaaSConfig) -> Client {
new_proxy_client( new_proxy_client(
config.get_read_timeout(), config.get_read_timeout(),
/*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(), /*config.get_max_idle_conns(),*/ config.get_max_idle_conns_per_host(),
@ -75,7 +64,7 @@ async fn new_proxy_client(
//根据原始请求解析url构建转发请求并转发获取响应 //根据原始请求解析url构建转发请求并转发获取响应
async fn proxy_request( async fn proxy_request(
req: &HttpRequest, req: &HttpRequest,
payload: &mut web::Payload, payload: web::Payload,
proxy_client: &Client, proxy_client: &Client,
resolver: &InvokeResolver, resolver: &InvokeResolver,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
@ -113,7 +102,7 @@ async fn build_proxy_request(
req: &HttpRequest, req: &HttpRequest,
base_url: &Url, base_url: &Url,
proxy_client: &Client, proxy_client: &Client,
payload: &mut web::Payload, mut payload: web::Payload,
) -> Result<RequestBuilder, Error> { ) -> Result<RequestBuilder, Error> {
let origin_url = base_url.join(req.uri().path()).unwrap(); let origin_url = base_url.join(req.uri().path()).unwrap();
let remaining_segments = origin_url.path_segments().unwrap().skip(2); let remaining_segments = origin_url.path_segments().unwrap().skip(2);

View File

@ -1,20 +1,8 @@
use actix_web::Responder;
use std::time::Duration; use std::time::Duration;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_MAX_IDLE_CONNS: usize = 1024; const DEFAULT_MAX_IDLE_CONNS: usize = 1024;
pub trait IAmHandler {
type Input;
// type Output: Serialize + Send + 'static;
// /// 获取Handler元数据函数名、超时时间等
// fn metadata(&self) -> HandlerMeta;
/// 执行核心逻辑
fn execute(&mut self, input: Self::Input) -> impl std::future::Future<Output = impl Responder> /*+ Send*/;
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct FaaSConfig { pub struct FaaSConfig {
pub tcp_port: Option<u16>, pub tcp_port: Option<u16>,

View File

@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; //use std::collections::HashMap;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct FunctionDeployment { pub struct FunctionDeployment {
@ -12,53 +12,60 @@ pub struct FunctionDeployment {
/// Namespace for the function, if supported by the faas-provider /// Namespace for the function, if supported by the faas-provider
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub namespace: Option<String>, pub namespace: Option<String>,
// /// EnvProcess overrides the fprocess environment variable and can be used
// /// with the watchdog
// #[serde(rename = "envProcess", skip_serializing_if = "Option::is_none")]
// pub env_process: Option<String>,
/// EnvProcess overrides the fprocess environment variable and can be used // /// EnvVars can be provided to set environment variables for the function runtime.
/// with the watchdog // #[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "envProcess", skip_serializing_if = "Option::is_none")] // pub env_vars: Option<HashMap<String, String>>,
pub env_process: Option<String>,
/// EnvVars can be provided to set environment variables for the function runtime. // /// Constraints are specific to the faas-provider.
#[serde(skip_serializing_if = "Option::is_none")] // #[serde(skip_serializing_if = "Option::is_none")]
pub env_vars: Option<HashMap<String, String>>, // pub constraints: Option<Vec<String>>,
/// Constraints are specific to the faas-provider. // /// Secrets list of secrets to be made available to function
#[serde(skip_serializing_if = "Option::is_none")] // #[serde(skip_serializing_if = "Option::is_none")]
pub constraints: Option<Vec<String>>, // pub secrets: Option<Vec<String>>,
/// Secrets list of secrets to be made available to function // /// Labels are metadata for functions which may be used by the
#[serde(skip_serializing_if = "Option::is_none")] // /// faas-provider or the gateway
pub secrets: Option<Vec<String>>, // #[serde(skip_serializing_if = "Option::is_none")]
// pub labels: Option<HashMap<String, String>>,
/// Labels are metadata for functions which may be used by the // /// Annotations are metadata for functions which may be used by the
/// faas-provider or the gateway // /// faas-provider or the gateway
#[serde(skip_serializing_if = "Option::is_none")] // #[serde(skip_serializing_if = "Option::is_none")]
pub labels: Option<HashMap<String, String>>, // pub annotations: Option<HashMap<String, String>>,
/// Annotations are metadata for functions which may be used by the // /// Limits for function
/// faas-provider or the gateway // #[serde(skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")] // pub limits: Option<FunctionResources>,
pub annotations: Option<HashMap<String, String>>,
/// Limits for function // /// Requests of resources requested by function
#[serde(skip_serializing_if = "Option::is_none")] // #[serde(skip_serializing_if = "Option::is_none")]
pub limits: Option<FunctionResources>, // pub requests: Option<FunctionResources>,
/// Requests of resources requested by function // /// ReadOnlyRootFilesystem removes write-access from the root filesystem
#[serde(skip_serializing_if = "Option::is_none")] // /// mount-point.
pub requests: Option<FunctionResources>, // #[serde(rename = "readOnlyRootFilesystem", default)]
// pub read_only_root_filesystem: bool,
/// ReadOnlyRootFilesystem removes write-access from the root filesystem
/// mount-point.
#[serde(rename = "readOnlyRootFilesystem", default)]
pub read_only_root_filesystem: bool,
} }
#[derive(Debug, Serialize, Deserialize)] // #[derive(Debug, Serialize, Deserialize)]
pub struct FunctionResources { // pub struct FunctionResources {
#[serde(skip_serializing_if = "Option::is_none")] // #[serde(skip_serializing_if = "Option::is_none")]
pub memory: Option<String>, // pub memory: Option<String>,
// #[serde(skip_serializing_if = "Option::is_none")]
// pub cpu: Option<String>,
// }
#[derive(serde::Deserialize)]
pub struct DeployFunctionInfo {
pub function_name: String,
pub image: String,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub cpu: Option<String>, pub namespace: Option<String>,
} }

View File

@ -1,6 +1,7 @@
pub mod spec; pub mod spec;
pub mod systemd; pub mod systemd;
use cni::cni_network::init_net_work;
use containerd_client::{ use containerd_client::{
Client, Client,
services::v1::{ services::v1::{
@ -170,12 +171,10 @@ impl Service {
.list(with_namespace!(request, namespace)) .list(with_namespace!(request, namespace))
.await? .await?
.into_inner(); .into_inner();
println!("Tasks: {:?}", responce.tasks);
drop(tc); drop(tc);
if let Some(task) = responce
.tasks if let Some(task) = responce.tasks.iter().find(|task| task.id == container.id) {
.iter()
.find(|task| task.container_id == container.id)
{
println!("Task found: {}, Status: {}", task.id, task.status); println!("Task found: {}, Status: {}", task.id, task.status);
// TASK_UNKNOWN (0) — 未知状态 // TASK_UNKNOWN (0) — 未知状态
// TASK_CREATED (1) — 任务已创建 // TASK_CREATED (1) — 任务已创建
@ -184,7 +183,7 @@ impl Service {
// TASK_EXITED (4) — 任务已退出 // TASK_EXITED (4) — 任务已退出
// TASK_PAUSED (5) — 任务已暂停 // TASK_PAUSED (5) — 任务已暂停
// TASK_FAILED (6) — 任务失败 // TASK_FAILED (6) — 任务失败
self.delete_task(&task.container_id, ns).await; let _ = self.delete_task(&task.id, ns).await;
} }
let delete_request = DeleteContainerRequest { let delete_request = DeleteContainerRequest {
@ -197,7 +196,7 @@ impl Service {
.expect("Failed to delete container"); .expect("Failed to delete container");
self.remove_netns_ip(cid).await; self.remove_netns_ip(cid).await;
// println!("Container: {:?} deleted", cc); println!("Container: {:?} deleted", cc);
} else { } else {
todo!("Container not found"); todo!("Container not found");
} }
@ -237,9 +236,15 @@ impl Service {
.await? .await?
.into_inner() .into_inner()
.mounts; .mounts;
println!("mounts ok");
drop(sc); drop(sc);
println!("drop sc ok");
let _ = init_net_work();
println!("init_net_work ok");
let (ip, path) = cni::cni_network::create_cni_network(cid.to_string(), ns.to_string())?; let (ip, path) = cni::cni_network::create_cni_network(cid.to_string(), ns.to_string())?;
println!("create_cni_network ok");
self.save_netns_ip(cid, &path, &ip).await; self.save_netns_ip(cid, &path, &ip).await;
println!("save_netns_ip ok");
let mut tc = self.client.tasks(); let mut tc = self.client.tasks();
let req = CreateTaskRequest { let req = CreateTaskRequest {
container_id: cid.to_string(), container_id: cid.to_string(),
@ -257,6 +262,7 @@ impl Service {
..Default::default() ..Default::default()
}; };
let _resp = self.client.tasks().start(with_namespace!(req, ns)).await?; let _resp = self.client.tasks().start(with_namespace!(req, ns)).await?;
println!("Task: {:?} started", cid);
Ok(()) Ok(())
} }
@ -284,7 +290,7 @@ impl Service {
pub async fn resume_task() { pub async fn resume_task() {
todo!() todo!()
} }
pub async fn delete_task(&self, cid: &str, ns: &str) { pub async fn delete_task(&self, cid: &str, ns: &str) -> Result<(), Err> {
let namespace = self.check_namespace(ns); let namespace = self.check_namespace(ns);
let namespace = namespace.as_str(); let namespace = namespace.as_str();
@ -300,6 +306,7 @@ impl Service {
Ok::<(), Err>(()) Ok::<(), Err>(())
}) })
.await; .await;
println!(" after wait");
let kill_request = KillRequest { let kill_request = KillRequest {
container_id: cid.to_string(), container_id: cid.to_string(),
@ -317,22 +324,43 @@ impl Service {
container_id: cid.to_string(), container_id: cid.to_string(),
}; };
let _resp = c // let _resp = c
.delete(with_namespace!(req, namespace)) // .delete(with_namespace!(req, namespace))
.await // .await
.expect("Failed to delete task"); // .expect("Failed to delete task");
// println!("Task: {:?} deleted", cid);
match c.delete(with_namespace!(req, namespace)).await {
Ok(_) => {
println!("Task: {:?} deleted", cid); println!("Task: {:?} deleted", cid);
Ok(())
} }
_ => { Err(e) => {
eprintln!("Failed to delete task: {}", e);
Err(e.into())
}
}
}
Ok(Err(e)) => {
eprintln!("Wait task failed: {}", e);
Err(e)
}
Err(_) => {
let kill_request = KillRequest { let kill_request = KillRequest {
container_id: cid.to_string(), container_id: cid.to_string(),
signal: 9, signal: 9,
all: true, all: true,
..Default::default() ..Default::default()
}; };
c.kill(with_namespace!(kill_request, namespace)) match c.kill(with_namespace!(kill_request, namespace)).await {
.await Ok(_) => {
.expect("Failed to FORCE kill task"); println!("Task: {:?} force killed", cid);
Ok(())
}
Err(e) => {
eprintln!("Failed to force kill task: {}", e);
Err(e.into())
}
}
} }
} }
} }
@ -390,7 +418,7 @@ impl Service {
let task = tasks let task = tasks
.into_iter() .into_iter()
.find(|task| task.container_id == cid) .find(|task| task.id == cid)
.ok_or_else(|| -> Err { format!("Task for container {} not found", cid).into() })?; .ok_or_else(|| -> Err { format!("Task for container {} not found", cid).into() })?;
Ok(task) Ok(task)

View File

@ -329,7 +329,9 @@ pub fn generate_spec(
let mut spec = populate_default_unix_spec(id, ns); let mut spec = populate_default_unix_spec(id, ns);
spec.process.args = args; spec.process.args = args;
spec.process.env = env; spec.process.env = env;
let path = format!("{}/{}/{}.json", PATH_TO_SPEC_PREFIX, namespace, id); let dir_path = format!("{}/{}", PATH_TO_SPEC_PREFIX, namespace);
let path = format!("{}/{}.json", dir_path, id);
std::fs::create_dir_all(&dir_path)?;
save_spec_to_file(&spec, &path)?; save_spec_to_file(&spec, &path)?;
Ok(path) Ok(path)
} }

View File

@ -13,7 +13,7 @@
}; };
outputs = { self, nixpkgs, crane, flake-utils, rust-overlay, nix-github-actions, ... }: outputs = { self, nixpkgs, crane, flake-utils, rust-overlay, nix-github-actions, ... }:
flake-utils.lib.eachDefaultSystem (system: flake-utils.lib.eachSystem [ "x86_64-linux" "aarch64-linux" ] (system:
let let
# reference: https://crane.dev/examples/quick-start-workspace.html # reference: https://crane.dev/examples/quick-start-workspace.html
overlays = [ (import rust-overlay) ]; overlays = [ (import rust-overlay) ];
@ -33,6 +33,8 @@
strictDeps = true; strictDeps = true;
# Add additional build inputs here # Add additional build inputs here
buildInputs = with pkgs; [ buildInputs = with pkgs; [
cni
cni-plugins
openssl openssl
protobuf protobuf
pkg-config pkg-config
@ -153,9 +155,7 @@
// { // {
githubActions = nix-github-actions.lib.mkGithubMatrix { githubActions = nix-github-actions.lib.mkGithubMatrix {
checks.x86_64-linux = self.checks.x86_64-linux; checks.x86_64-linux = self.checks.x86_64-linux;
checks.x86_64-darwin.faas-rs-crate = self.checks.x86_64-darwin.faas-rs-crate;
checks.aarch64-linux.faas-rs-crate = self.checks.aarch64-linux.faas-rs-crate; checks.aarch64-linux.faas-rs-crate = self.checks.aarch64-linux.faas-rs-crate;
checks.aarch64-darwin.faas-rs-crate = self.checks.aarch64-darwin.faas-rs-crate;
}; };
}; };
} }