From c33fefa6359ac6198dfc71f77bbe25942db026fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=81=AB=E8=8A=B1?= Date: Thu, 13 Mar 2025 15:55:36 +0800 Subject: [PATCH] Feat provider framework (#9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加了handler大概框架,具体逻辑尚未实现 --- Cargo.lock | 2 + crates/provider/Cargo.toml | 10 +- crates/provider/src/auth/mod.rs | 36 +++--- crates/provider/src/bootstrap/mod.rs | 106 +++++++++--------- crates/provider/src/config/mod.rs | 1 + crates/provider/src/consts.rs | 14 +++ crates/provider/src/handlers/deploy.rs | 27 +++++ crates/provider/src/handlers/function_list.rs | 25 +++++ crates/provider/src/handlers/mod.rs | 74 +++++++++++- .../provider/src/handlers/namespace_list.rs | 17 +++ crates/provider/src/httputils/mod.rs | 1 + crates/provider/src/lib.rs | 8 +- crates/provider/src/logs/mod.rs | 1 + crates/provider/src/metrics/mod.rs | 8 +- crates/provider/src/proxy/mod.rs | 1 + .../provider/src/types/function_deployment.rs | 64 +++++++++++ crates/provider/src/types/mod.rs | 12 +- crates/service/src/lib.rs | 79 +++++++------ 18 files changed, 366 insertions(+), 120 deletions(-) create mode 100644 crates/provider/src/consts.rs create mode 100644 crates/provider/src/handlers/deploy.rs create mode 100644 crates/provider/src/handlers/function_list.rs create mode 100644 crates/provider/src/handlers/namespace_list.rs create mode 100644 crates/provider/src/types/function_deployment.rs diff --git a/Cargo.lock b/Cargo.lock index 0c8d07f..52e60cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2018,6 +2018,7 @@ dependencies = [ "actix-service", "actix-web", "actix-web-httpauth", + "async-trait", "base64 0.13.1", "bollard", "config", @@ -2030,6 +2031,7 @@ dependencies = [ "reqwest", "serde 1.0.217", "serde_json", + "service", "tempfile", "thiserror 1.0.69", "tokio", diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index a6e2e92..149a0d2 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -6,8 +6,8 @@ authors.workspace = true [dependencies] actix-web = "4.5.1" -serde = { version = "1.0.197", features = ["derive"] } -serde_json = "1.0.114" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" tokio = { version = "1.37.0", features = ["full"] } bollard = "0.13.0" uuid = { version = "1.8.0", features = ["v4"] } @@ -15,7 +15,6 @@ actix-web-httpauth = "0.6" config = "0.11" thiserror = "1.0" reqwest = "0.11" -lazy_static = "1.4" prometheus = "0.13" tempfile = "3.2" hyper = "0.14" @@ -24,4 +23,7 @@ regex = "1" futures = "0.3" actix-service = "2" base64 = "0.13" -futures-util = "0.3" \ No newline at end of file +futures-util = "0.3" +service = { path = "../service" } +async-trait = "0.1" +lazy_static = "1.4.0" \ No newline at end of file diff --git a/crates/provider/src/auth/mod.rs b/crates/provider/src/auth/mod.rs index 0b7dd9e..521009f 100644 --- a/crates/provider/src/auth/mod.rs +++ b/crates/provider/src/auth/mod.rs @@ -1,10 +1,10 @@ -use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error, HttpMessage, HttpResponse}; -use actix_web::http::header::HeaderValue; use actix_web::dev::{Service, Transform}; -use futures_util::future::{ok, Ready, LocalBoxFuture}; +use actix_web::http::header::HeaderValue; +use actix_web::{Error, HttpMessage, HttpResponse, dev::ServiceRequest, dev::ServiceResponse}; +use futures_util::future::{LocalBoxFuture, Ready, ok}; +use std::collections::HashMap; use std::rc::Rc; use std::task::{Context, Poll}; -use std::collections::HashMap; //写到使用actix-web-httpauth作为中间件,还没有解决read_basic_auth函数的实现,返回值和之前在bootstrap的调用不一样 @@ -22,7 +22,7 @@ impl BasicAuthCredentials { } } -pub struct ReadBasicAuthFromDisk{ +pub struct ReadBasicAuthFromDisk { secret_mount_path: String, user_filename: String, password_filename: String, @@ -39,17 +39,22 @@ impl ReadBasicAuthFromDisk { //TODO:这里应该加密? pub async fn read_basic_auth(&self) -> HashMap { let mut user_map = HashMap::new(); - let user_file = std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.user_filename)).unwrap(); - let password_file = std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.password_filename)).unwrap(); + let user_file = + std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.user_filename)) + .unwrap(); + let password_file = std::fs::read_to_string(format!( + "{}/{}", + self.secret_mount_path, self.password_filename + )) + .unwrap(); let user_vec: Vec<&str> = user_file.split("\n").collect(); let password_vec: Vec<&str> = password_file.split("\n").collect(); for i in 0..user_vec.len() { user_map.insert(user_vec[i].to_string(), password_vec[i].to_string()); } user_map - } - + pub async fn basic_auth_validator(&self, req: ServiceRequest) -> Result { let auth_header = req.headers().get("Authorization"); if let Some(auth_header) = auth_header { @@ -57,14 +62,18 @@ impl ReadBasicAuthFromDisk { let auth_header = auth_header.to_str().unwrap(); let auth_header = auth_header.split(" ").collect::>(); if auth_header.len() != 2 { - return Err(actix_web::error::ErrorUnauthorized("Invalid Authorization Header")); + return Err(actix_web::error::ErrorUnauthorized( + "Invalid Authorization Header", + )); } let auth_header = auth_header[1]; let auth_header = base64::decode(auth_header).unwrap(); let auth_header = String::from_utf8(auth_header).unwrap(); let auth_header = auth_header.split(":").collect::>(); if auth_header.len() != 2 { - return Err(actix_web::error::ErrorUnauthorized("Invalid Authorization Header")); + return Err(actix_web::error::ErrorUnauthorized( + "Invalid Authorization Header", + )); } let username = auth_header[0]; let password = auth_header[1]; @@ -75,11 +84,12 @@ impl ReadBasicAuthFromDisk { } } } - Err(actix_web::error::ErrorUnauthorized("Invalid Username or Password")) + Err(actix_web::error::ErrorUnauthorized( + "Invalid Username or Password", + )) } } - async fn index() -> HttpResponse { HttpResponse::Ok().body("欢迎访问受保护的资源!") } diff --git a/crates/provider/src/bootstrap/mod.rs b/crates/provider/src/bootstrap/mod.rs index 8a8e33a..12cf09d 100644 --- a/crates/provider/src/bootstrap/mod.rs +++ b/crates/provider/src/bootstrap/mod.rs @@ -1,25 +1,24 @@ -use actix_web::{web, App, HttpServer, HttpResponse, middleware, guard, Responder}; +use actix_web::{App, HttpResponse, HttpServer, Responder, guard, middleware, web}; use prometheus::Registry; use std::collections::HashMap; use crate::{ auth, + handlers, metrics::{self, HttpMetrics}, //httputil, //proxy, types::config::FaaSConfig, - handlers, }; - //用于函数/服务名称的表达式 const NAME_EXPRESSION: &str = r"-a-zA-Z_0-9\."; //应用程序状态,存储共享的数据,如配置、指标、认证信息等,为业务函数提供支持 #[derive(Clone)] struct AppState { - config: FaaSConfig,//应用程序的配置,用于识别是否开启Basic Auth等 - metrics: HttpMetrics,//用于监视http请求的持续时间和总数 + config: FaaSConfig, //应用程序的配置,用于识别是否开启Basic Auth等 + metrics: HttpMetrics, //用于监视http请求的持续时间和总数 credentials: Option>, //当有认证信息的时候,获取认证信息 } @@ -39,7 +38,11 @@ async fn serve() -> std::io::Result<()> { // 如果启用了Basic Auth,从指定路径读取认证凭证并存储在应用程序状态中 if config.enable_basic_auth { // 读取Basic Auth凭证 - let auth = auth::ReadBasicAuthFromDisk::new(&config.secret_mount_path, "users.txt", "passwords.txt"); + let auth = auth::ReadBasicAuthFromDisk::new( + &config.secret_mount_path, + "users.txt", + "passwords.txt", + ); let credentials = auth.read_basic_auth().await; //这里的credentials是所有的账号密码 app_state.credentials = Some(credentials); //TODO:handlers decorate with basic auth,尚未清楚是不是需要给所有的函数都加上 @@ -50,55 +53,49 @@ async fn serve() -> std::io::Result<()> { .app_data(web::Data::new(app_state.clone())) // 将app_state存储在web::Data中,以便在处理程序中访问 .wrap(middleware::Logger::default()) // 记录请求日志 .service( - web::scope("/system") - .service( - web::resource("/functions") - .route(web::get().to(handlers::function_lister)) - .route(web::post().to(handlers::deploy_function)) - .route(web::delete().to(handlers::delete_function)) - .route(web::put().to(handlers::update_function)) - ) - .service( - web::resource("/function/{name}") - .route(web::get().to(handlers::function_status)) - ) - .service( - web::resource("/scale-function/{name}") - .route(web::post().to(handlers::scale_function)) - ) - .service( - web::resource("/info") - .route(web::get().to(handlers::info)) - ) - .service( - web::resource("/secrets") - .route(web::get().to(handlers::secrets)) - .route(web::post().to(handlers::secrets)) - .route(web::put().to(handlers::secrets)) - .route(web::delete().to(handlers::secrets)) - ) - .service( - web::resource("/logs") - .route(web::get().to(handlers::logs)) - ) - .service( - web::resource("/namespaces") - .route(web::get().to(handlers::list_namespaces)) - .route(web::post().to(handlers::mutate_namespace)) - ) + web::scope("/system") + .service( + web::resource("/functions") + .route(web::get().to(handlers::function_lister)) + .route(web::post().to(handlers::deploy_function)) + .route(web::delete().to(handlers::delete_function)) + .route(web::put().to(handlers::update_function)), + ) + .service( + web::resource("/function/{name}") + .route(web::get().to(handlers::function_status)), + ) + .service( + web::resource("/scale-function/{name}") + .route(web::post().to(handlers::scale_function)), + ) + .service(web::resource("/info").route(web::get().to(handlers::info))) + .service( + web::resource("/secrets") + .route(web::get().to(handlers::secrets)) + .route(web::post().to(handlers::secrets)) + .route(web::put().to(handlers::secrets)) + .route(web::delete().to(handlers::secrets)), + ) + .service(web::resource("/logs").route(web::get().to(handlers::logs))) + .service( + web::resource("/namespaces") + .route(web::get().to(handlers::list_namespaces)) + .route(web::post().to(handlers::mutate_namespace)), + ), ) .service( - web::scope("/function") - .service( - web::resource("/{name}") - .route(web::get().to(handlers::function_proxy)) - .route(web::post().to(handlers::function_proxy)) - ) - .service( - web::resource("/{name}/{params:.*}") - .route(web::get().to(handlers::function_proxy)) - .route(web::post().to(handlers::function_proxy)) - ) + web::scope("/function") + .service( + web::resource("/{name}") + .route(web::get().to(handlers::function_proxy)) + .route(web::post().to(handlers::function_proxy)), + ) + .service( + web::resource("/{name}/{params:.*}") + .route(web::get().to(handlers::function_proxy)) + .route(web::post().to(handlers::function_proxy)), + ), ) .route("/metrics", web::get().to(handlers::telemetry)) .route("/healthz", web::get().to(handlers::health)) @@ -108,6 +105,5 @@ async fn serve() -> std::io::Result<()> { .await } - //当上下文完成的时候关闭服务器 -//无法关闭时候写进log,并且返回错误 \ No newline at end of file +//无法关闭时候写进log,并且返回错误 diff --git a/crates/provider/src/config/mod.rs b/crates/provider/src/config/mod.rs index e69de29..8b13789 100644 --- a/crates/provider/src/config/mod.rs +++ b/crates/provider/src/config/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/provider/src/consts.rs b/crates/provider/src/consts.rs new file mode 100644 index 0000000..fe33525 --- /dev/null +++ b/crates/provider/src/consts.rs @@ -0,0 +1,14 @@ +#[allow(unused)] +const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn"; + +#[allow(unused)] +const NAMESPACE_LABEL: &str = "faasrs"; + +#[allow(unused)] +const FAASRS_NAMESPACE: &str = "faasrs"; + +#[allow(unused)] +const FAASRS_SERVICE_PULL_ALWAYS: bool = false; + +#[allow(unused)] +const DEFAULT_SNAPSHOTTER: &str = "overlayfs"; diff --git a/crates/provider/src/handlers/deploy.rs b/crates/provider/src/handlers/deploy.rs new file mode 100644 index 0000000..59e4a3f --- /dev/null +++ b/crates/provider/src/handlers/deploy.rs @@ -0,0 +1,27 @@ +use super::IAmHandler; +use crate::types::{self, CreateContainerInfo}; +use actix_web::HttpResponse; +use service::Service; +use std::sync::Arc; + +pub struct DeployHandler { + pub config: types::function_deployment::FunctionDeployment, + service: Arc, +} + +impl IAmHandler for DeployHandler { + type Input = CreateContainerInfo; + // type Output = String; + + async fn execute(&self, input: Self::Input) -> impl actix_web::Responder { + let cid = input.container_id.clone(); + let image = input.image.clone(); + let ns = input.ns.clone(); + let _ = self + .service + .create_container(&image, &cid, &ns) + .await + .unwrap(); + HttpResponse::Ok().json(format!("Container {} created successfully!", cid)) + } +} diff --git a/crates/provider/src/handlers/function_list.rs b/crates/provider/src/handlers/function_list.rs new file mode 100644 index 0000000..b9ccc30 --- /dev/null +++ b/crates/provider/src/handlers/function_list.rs @@ -0,0 +1,25 @@ +use actix_web::HttpResponse; + +pub struct FunctionLister { + service: std::sync::Arc, +} + +impl super::IAmHandler for FunctionLister { + type Input = String; + // type Output = Vec; + + async fn execute(&self, input: Self::Input) -> impl actix_web::Responder { + // faasd进来的第一步是验证命名空间的标签是否具有某个值,也就是验证是否为true,确保命名空间有效 + // 但是这里省略,因为好像标签为空? + + let containers = self + .service + .get_container_list(input.as_str()) + .await + .unwrap(); + + for container in containers.iter() {} + + HttpResponse::Ok().json("函数列表") + } +} diff --git a/crates/provider/src/handlers/mod.rs b/crates/provider/src/handlers/mod.rs index d324d59..0c00718 100644 --- a/crates/provider/src/handlers/mod.rs +++ b/crates/provider/src/handlers/mod.rs @@ -1,8 +1,11 @@ +pub mod deploy; +pub mod function_list; +pub mod namespace_list; +use lazy_static::lazy_static; +use std::collections::HashMap; - -use actix_web::{HttpResponse, Responder, HttpRequest}; - - +use actix_web::{HttpRequest, HttpResponse, Responder}; +use serde::{Serialize, de::DeserializeOwned}; pub async fn function_lister(_req: HttpRequest) -> impl Responder { HttpResponse::Ok().body("函数列表") @@ -58,4 +61,65 @@ pub async fn telemetry(_req: HttpRequest) -> impl Responder { pub async fn health(_req: HttpRequest) -> impl Responder { HttpResponse::Ok().body("健康检查") -} \ No newline at end of file +} + +// lazy_static! { +// pub static ref HANDLERS: HashMap> = { +// let mut map = HashMap::new(); +// map.insert( +// "function_list".to_string(), +// Box::new(function_list::FunctionLister), +// ); +// map.insert( +// "namespace_list".to_string(), +// Box::new(namespace_list::NamespaceLister), +// ); +// map +// }; +// } + +#[derive(Debug, thiserror::Error)] +pub struct FaasError { + message: String, + error_type: FaasErrorType, + #[source] + source: Option>, +} + +#[derive(Debug)] +pub enum FaasErrorType { + ContainerFailure, + Timeout, + InternalError, +} + +impl std::fmt::Display for FaasError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[{:?}] {}", self.error_type, self.message) + } +} + +// 实现从常见错误类型转换 +impl From for FaasError { + fn from(err: std::io::Error) -> Self { + FaasError { + message: format!("IO error: {}", err), + error_type: FaasErrorType::InternalError, + source: Some(Box::new(err)), + } + } +} + +pub trait IAmHandler { + type Input: DeserializeOwned + Send + 'static; + // type Output: Serialize + Send + 'static; + + /// 获取Handler元数据(函数名、超时时间等) + // fn metadata(&self) -> HandlerMeta; + + /// 执行核心逻辑 + fn execute( + &self, + input: Self::Input, + ) -> impl std::future::Future + Send; +} diff --git a/crates/provider/src/handlers/namespace_list.rs b/crates/provider/src/handlers/namespace_list.rs new file mode 100644 index 0000000..17a2780 --- /dev/null +++ b/crates/provider/src/handlers/namespace_list.rs @@ -0,0 +1,17 @@ +use super::IAmHandler; +use actix_web::{HttpResponse, Responder}; +use service::Service; +use std::sync::Arc; + +pub struct NamespaceLister { + service: Arc, +} + +impl IAmHandler for NamespaceLister { + type Input = (); + // type Output = Vec; + async fn execute(&self, _input: Self::Input) -> impl Responder { + let ns_list = self.service.list_namespaces().await.unwrap(); + HttpResponse::Ok().json(ns_list) + } +} diff --git a/crates/provider/src/httputils/mod.rs b/crates/provider/src/httputils/mod.rs index e69de29..8b13789 100644 --- a/crates/provider/src/httputils/mod.rs +++ b/crates/provider/src/httputils/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index bbf3952..fba7144 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -1,9 +1,9 @@ +pub mod auth; +pub mod bootstrap; pub mod config; pub mod handlers; -pub mod types; pub mod httputils; -pub mod proxy; -pub mod auth; pub mod logs; pub mod metrics; -pub mod bootstrap; +pub mod proxy; +pub mod types; diff --git a/crates/provider/src/logs/mod.rs b/crates/provider/src/logs/mod.rs index e69de29..8b13789 100644 --- a/crates/provider/src/logs/mod.rs +++ b/crates/provider/src/logs/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/provider/src/metrics/mod.rs b/crates/provider/src/metrics/mod.rs index 0484561..6362eb4 100644 --- a/crates/provider/src/metrics/mod.rs +++ b/crates/provider/src/metrics/mod.rs @@ -1,5 +1,5 @@ -use prometheus::{self, register_histogram_vec, register_int_counter_vec}; use lazy_static::lazy_static; +use prometheus::{self, register_histogram_vec, register_int_counter_vec}; lazy_static! { pub static ref HTTP_METRICS: HttpMetrics = HttpMetrics::new(); @@ -18,12 +18,14 @@ impl HttpMetrics { "http_request_duration_seconds", "Request duration in seconds", &["method", "path", "status"] - ).unwrap(), + ) + .unwrap(), requests_total: register_int_counter_vec!( "http_requests_total", "Total number of HTTP requests", &["method", "path", "status"] - ).unwrap(), + ) + .unwrap(), } } } diff --git a/crates/provider/src/proxy/mod.rs b/crates/provider/src/proxy/mod.rs index e69de29..8b13789 100644 --- a/crates/provider/src/proxy/mod.rs +++ b/crates/provider/src/proxy/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/provider/src/types/function_deployment.rs b/crates/provider/src/types/function_deployment.rs new file mode 100644 index 0000000..731b499 --- /dev/null +++ b/crates/provider/src/types/function_deployment.rs @@ -0,0 +1,64 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Serialize, Deserialize, Debug)] +pub struct FunctionDeployment { + /// Service is the name of the function deployment + pub service: String, + + /// Image is a fully-qualified container image + pub image: String, + + /// Namespace for the function, if supported by the faas-provider + #[serde(skip_serializing_if = "Option::is_none")] + pub namespace: Option, + + /// EnvProcess overrides the fprocess environment variable and can be used + /// with the watchdog + #[serde(rename = "envProcess", skip_serializing_if = "Option::is_none")] + pub env_process: Option, + + /// EnvVars can be provided to set environment variables for the function runtime. + #[serde(skip_serializing_if = "Option::is_none")] + pub env_vars: Option>, + + /// Constraints are specific to the faas-provider. + #[serde(skip_serializing_if = "Option::is_none")] + pub constraints: Option>, + + /// Secrets list of secrets to be made available to function + #[serde(skip_serializing_if = "Option::is_none")] + pub secrets: Option>, + + /// Labels are metadata for functions which may be used by the + /// faas-provider or the gateway + #[serde(skip_serializing_if = "Option::is_none")] + pub labels: Option>, + + /// Annotations are metadata for functions which may be used by the + /// faas-provider or the gateway + #[serde(skip_serializing_if = "Option::is_none")] + pub annotations: Option>, + + /// Limits for function + #[serde(skip_serializing_if = "Option::is_none")] + pub limits: Option, + + /// Requests of resources requested by function + #[serde(skip_serializing_if = "Option::is_none")] + pub requests: Option, + + /// ReadOnlyRootFilesystem removes write-access from the root filesystem + /// mount-point. + #[serde(rename = "readOnlyRootFilesystem", default)] + pub read_only_root_filesystem: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct FunctionResources { + #[serde(skip_serializing_if = "Option::is_none")] + pub memory: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu: Option, +} diff --git a/crates/provider/src/types/mod.rs b/crates/provider/src/types/mod.rs index a105933..2057759 100644 --- a/crates/provider/src/types/mod.rs +++ b/crates/provider/src/types/mod.rs @@ -1 +1,11 @@ -pub mod config; \ No newline at end of file +use serde::{Deserialize, Serialize}; + +pub mod config; +pub mod function_deployment; + +#[derive(Serialize, Deserialize)] +pub struct CreateContainerInfo { + pub container_id: String, + pub image: String, + pub ns: String, +} diff --git a/crates/service/src/lib.rs b/crates/service/src/lib.rs index 4ec5ea6..b456835 100644 --- a/crates/service/src/lib.rs +++ b/crates/service/src/lib.rs @@ -5,8 +5,9 @@ use containerd_client::{ Client, services::v1::{ Container, CreateContainerRequest, CreateTaskRequest, DeleteContainerRequest, - DeleteTaskRequest, GetImageRequest, KillRequest, ListContainersRequest, ListTasksRequest, - ReadContentRequest, StartRequest, TransferOptions, TransferRequest, WaitRequest, + DeleteTaskRequest, GetImageRequest, KillRequest, ListContainersRequest, + ListNamespacesRequest, ListTasksRequest, ReadContentRequest, StartRequest, TransferOptions, + TransferRequest, WaitRequest, container::Runtime, snapshots::{MountsRequest, PrepareSnapshotRequest}, }, @@ -22,12 +23,7 @@ use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest, Media use prost_types::Any; use sha2::{Digest, Sha256}; use spec::{DEFAULT_NAMESPACE, generate_spec}; -use std::{ - fs, - sync::{Arc, Mutex}, - time::Duration, - vec, -}; +use std::{fs, sync::Arc, time::Duration, vec}; use tokio::time::timeout; // config.json,dockerhub密钥 @@ -36,14 +32,14 @@ use tokio::time::timeout; type Err = Box; pub struct Service { - client: Arc>, + client: Arc, } impl Service { pub async fn new(endpoint: String) -> Result { let client = Client::from_path(endpoint).await.unwrap(); Ok(Service { - client: Arc::new(Mutex::new(client)), + client: Arc::new(client), }) } @@ -57,8 +53,6 @@ impl Service { }; let resp = self .client - .lock() - .unwrap() .snapshots() .prepare(with_namespace!(req, ns)) .await? @@ -84,7 +78,7 @@ impl Service { value: spec.into_bytes(), }; - let mut containers_client = self.client.lock().unwrap().containers(); + let mut containers_client = self.client.containers(); let container = Container { id: cid.to_string(), image: image_name.to_string(), @@ -117,11 +111,10 @@ impl Service { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); - let c = self.client.lock().unwrap(); let request = ListContainersRequest { ..Default::default() }; - let mut cc = c.containers(); + let mut cc = self.client.containers(); let responce = cc .list(with_namespace!(request, namespace)) @@ -133,7 +126,7 @@ impl Service { .find(|container| container.id == cid); if let Some(container) = container { - let mut tc = c.tasks(); + let mut tc = self.client.tasks(); let request = ListTasksRequest { filter: format!("container=={}", cid), @@ -199,8 +192,7 @@ impl Service { } async fn create_task(&self, cid: &str, ns: &str) -> Result<(), Err> { - let c = self.client.lock().unwrap(); - let mut sc = c.snapshots(); + let mut sc = self.client.snapshots(); let req = MountsRequest { snapshotter: "overlayfs".to_string(), key: cid.to_string(), @@ -211,7 +203,7 @@ impl Service { .into_inner() .mounts; drop(sc); - let mut tc = c.tasks(); + let mut tc = self.client.tasks(); let req = CreateTaskRequest { container_id: cid.to_string(), rootfs: mounts, @@ -227,13 +219,7 @@ impl Service { container_id: cid.to_string(), ..Default::default() }; - let _resp = self - .client - .lock() - .unwrap() - .tasks() - .start(with_namespace!(req, ns)) - .await?; + let _resp = self.client.tasks().start(with_namespace!(req, ns)).await?; Ok(()) } @@ -242,7 +228,7 @@ impl Service { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); - let mut c = self.client.lock().unwrap().tasks(); + let mut c = self.client.tasks(); let kill_request = KillRequest { container_id: cid.to_string(), signal: 15, @@ -265,7 +251,7 @@ impl Service { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); - let mut c = self.client.lock().unwrap().tasks(); + let mut c = self.client.tasks(); let time_out = Duration::from_secs(30); let wait_result = timeout(time_out, async { let wait_request = WaitRequest { @@ -318,7 +304,7 @@ impl Service { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); - let mut c = self.client.lock().unwrap().containers(); + let mut c = self.client.containers(); let request = ListContainersRequest { ..Default::default() @@ -351,7 +337,7 @@ impl Service { } else { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); - let mut c = self.client.lock().unwrap().images(); + let mut c = self.client.images(); let req = GetImageRequest { name: image_name.to_string(), }; @@ -379,7 +365,7 @@ impl Service { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); - let mut c = self.client.lock().unwrap().transfer(); + let mut c = self.client.transfer(); let source = OciRegistry { reference: image_name.to_string(), resolver: Default::default(), @@ -451,7 +437,7 @@ impl Service { size: 0, }; - let mut c = self.client.lock().unwrap().content(); + let mut c = self.client.content(); let resp = c .read(with_namespace!(req, ns)) .await @@ -475,7 +461,7 @@ impl Service { offset: 0, size: 0, }; - let mut c = self.client.lock().unwrap().content(); + let mut c = self.client.content(); let resp = c .read(with_namespace!(req, ns)) @@ -492,7 +478,7 @@ impl Service { } pub async fn get_img_config(&self, name: &str, ns: &str) -> Option { - let mut c = self.client.lock().unwrap().images(); + let mut c = self.client.images(); let req = GetImageRequest { name: name.to_string(), @@ -518,7 +504,7 @@ impl Service { ..Default::default() }; - let mut c = self.client.lock().unwrap().content(); + let mut c = self.client.content(); let resp = c .read(with_namespace!(req, ns)) @@ -598,6 +584,29 @@ impl Service { _ => ns.to_string(), } } + + pub async fn list_namespaces(&self) -> Result, Err> { + let mut c = self.client.namespaces(); + let req = ListNamespacesRequest { + ..Default::default() + }; + let resp = c.list(req).await?; + Ok(resp + .into_inner() + .namespaces + .into_iter() + .map(|ns| ns.name) + .collect()) + } + + // pub async fn get_task_list(&self, ns: &str) -> Result, Err> { + // let mut c = self.client.tasks(); + // let req = ListTasksRequest { + // ..Default::default() + // }; + // let req = c.list(with_namespace!(req, ns)).await?.into_inner().tasks; + // Ok(()) + // } } //容器是容器,要先启动,然后才能运行任务 //要想删除一个正在运行的Task,必须先kill掉这个task,然后才能删除。