Feat provider framework (#9)

添加了handler大概框架,具体逻辑尚未实现
This commit is contained in:
火花 2025-03-13 15:55:36 +08:00 committed by GitHub
parent 1759d28dd0
commit c33fefa635
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 366 additions and 120 deletions

2
Cargo.lock generated
View File

@ -2018,6 +2018,7 @@ dependencies = [
"actix-service",
"actix-web",
"actix-web-httpauth",
"async-trait",
"base64 0.13.1",
"bollard",
"config",
@ -2030,6 +2031,7 @@ dependencies = [
"reqwest",
"serde 1.0.217",
"serde_json",
"service",
"tempfile",
"thiserror 1.0.69",
"tokio",

View File

@ -6,8 +6,8 @@ authors.workspace = true
[dependencies]
actix-web = "4.5.1"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.37.0", features = ["full"] }
bollard = "0.13.0"
uuid = { version = "1.8.0", features = ["v4"] }
@ -15,7 +15,6 @@ actix-web-httpauth = "0.6"
config = "0.11"
thiserror = "1.0"
reqwest = "0.11"
lazy_static = "1.4"
prometheus = "0.13"
tempfile = "3.2"
hyper = "0.14"
@ -25,3 +24,6 @@ futures = "0.3"
actix-service = "2"
base64 = "0.13"
futures-util = "0.3"
service = { path = "../service" }
async-trait = "0.1"
lazy_static = "1.4.0"

View File

@ -1,10 +1,10 @@
use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error, HttpMessage, HttpResponse};
use actix_web::http::header::HeaderValue;
use actix_web::dev::{Service, Transform};
use futures_util::future::{ok, Ready, LocalBoxFuture};
use actix_web::http::header::HeaderValue;
use actix_web::{Error, HttpMessage, HttpResponse, dev::ServiceRequest, dev::ServiceResponse};
use futures_util::future::{LocalBoxFuture, Ready, ok};
use std::collections::HashMap;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::collections::HashMap;
//写到使用actix-web-httpauth作为中间件还没有解决read_basic_auth函数的实现返回值和之前在bootstrap的调用不一样
@ -22,7 +22,7 @@ impl BasicAuthCredentials {
}
}
pub struct ReadBasicAuthFromDisk{
pub struct ReadBasicAuthFromDisk {
secret_mount_path: String,
user_filename: String,
password_filename: String,
@ -39,15 +39,20 @@ impl ReadBasicAuthFromDisk {
//TODO:这里应该加密?
pub async fn read_basic_auth(&self) -> HashMap<String, String> {
let mut user_map = HashMap::new();
let user_file = std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.user_filename)).unwrap();
let password_file = std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.password_filename)).unwrap();
let user_file =
std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.user_filename))
.unwrap();
let password_file = std::fs::read_to_string(format!(
"{}/{}",
self.secret_mount_path, self.password_filename
))
.unwrap();
let user_vec: Vec<&str> = user_file.split("\n").collect();
let password_vec: Vec<&str> = password_file.split("\n").collect();
for i in 0..user_vec.len() {
user_map.insert(user_vec[i].to_string(), password_vec[i].to_string());
}
user_map
}
pub async fn basic_auth_validator(&self, req: ServiceRequest) -> Result<ServiceRequest, Error> {
@ -57,14 +62,18 @@ impl ReadBasicAuthFromDisk {
let auth_header = auth_header.to_str().unwrap();
let auth_header = auth_header.split(" ").collect::<Vec<&str>>();
if auth_header.len() != 2 {
return Err(actix_web::error::ErrorUnauthorized("Invalid Authorization Header"));
return Err(actix_web::error::ErrorUnauthorized(
"Invalid Authorization Header",
));
}
let auth_header = auth_header[1];
let auth_header = base64::decode(auth_header).unwrap();
let auth_header = String::from_utf8(auth_header).unwrap();
let auth_header = auth_header.split(":").collect::<Vec<&str>>();
if auth_header.len() != 2 {
return Err(actix_web::error::ErrorUnauthorized("Invalid Authorization Header"));
return Err(actix_web::error::ErrorUnauthorized(
"Invalid Authorization Header",
));
}
let username = auth_header[0];
let password = auth_header[1];
@ -75,11 +84,12 @@ impl ReadBasicAuthFromDisk {
}
}
}
Err(actix_web::error::ErrorUnauthorized("Invalid Username or Password"))
Err(actix_web::error::ErrorUnauthorized(
"Invalid Username or Password",
))
}
}
async fn index() -> HttpResponse {
HttpResponse::Ok().body("欢迎访问受保护的资源!")
}

View File

@ -1,25 +1,24 @@
use actix_web::{web, App, HttpServer, HttpResponse, middleware, guard, Responder};
use actix_web::{App, HttpResponse, HttpServer, Responder, guard, middleware, web};
use prometheus::Registry;
use std::collections::HashMap;
use crate::{
auth,
handlers,
metrics::{self, HttpMetrics},
//httputil,
//proxy,
types::config::FaaSConfig,
handlers,
};
//用于函数/服务名称的表达式
const NAME_EXPRESSION: &str = r"-a-zA-Z_0-9\.";
//应用程序状态,存储共享的数据,如配置、指标、认证信息等,为业务函数提供支持
#[derive(Clone)]
struct AppState {
config: FaaSConfig,//应用程序的配置用于识别是否开启Basic Auth等
metrics: HttpMetrics,//用于监视http请求的持续时间和总数
config: FaaSConfig, //应用程序的配置用于识别是否开启Basic Auth等
metrics: HttpMetrics, //用于监视http请求的持续时间和总数
credentials: Option<HashMap<String, String>>, //当有认证信息的时候,获取认证信息
}
@ -39,7 +38,11 @@ async fn serve() -> std::io::Result<()> {
// 如果启用了Basic Auth从指定路径读取认证凭证并存储在应用程序状态中
if config.enable_basic_auth {
// 读取Basic Auth凭证
let auth = auth::ReadBasicAuthFromDisk::new(&config.secret_mount_path, "users.txt", "passwords.txt");
let auth = auth::ReadBasicAuthFromDisk::new(
&config.secret_mount_path,
"users.txt",
"passwords.txt",
);
let credentials = auth.read_basic_auth().await; //这里的credentials是所有的账号密码
app_state.credentials = Some(credentials);
//TODO:handlers decorate with basic auth,尚未清楚是不是需要给所有的函数都加上
@ -56,49 +59,43 @@ async fn serve() -> std::io::Result<()> {
.route(web::get().to(handlers::function_lister))
.route(web::post().to(handlers::deploy_function))
.route(web::delete().to(handlers::delete_function))
.route(web::put().to(handlers::update_function))
.route(web::put().to(handlers::update_function)),
)
.service(
web::resource("/function/{name}")
.route(web::get().to(handlers::function_status))
.route(web::get().to(handlers::function_status)),
)
.service(
web::resource("/scale-function/{name}")
.route(web::post().to(handlers::scale_function))
)
.service(
web::resource("/info")
.route(web::get().to(handlers::info))
.route(web::post().to(handlers::scale_function)),
)
.service(web::resource("/info").route(web::get().to(handlers::info)))
.service(
web::resource("/secrets")
.route(web::get().to(handlers::secrets))
.route(web::post().to(handlers::secrets))
.route(web::put().to(handlers::secrets))
.route(web::delete().to(handlers::secrets))
)
.service(
web::resource("/logs")
.route(web::get().to(handlers::logs))
.route(web::delete().to(handlers::secrets)),
)
.service(web::resource("/logs").route(web::get().to(handlers::logs)))
.service(
web::resource("/namespaces")
.route(web::get().to(handlers::list_namespaces))
.route(web::post().to(handlers::mutate_namespace))
)
.route(web::post().to(handlers::mutate_namespace)),
),
)
.service(
web::scope("/function")
.service(
web::resource("/{name}")
.route(web::get().to(handlers::function_proxy))
.route(web::post().to(handlers::function_proxy))
.route(web::post().to(handlers::function_proxy)),
)
.service(
web::resource("/{name}/{params:.*}")
.route(web::get().to(handlers::function_proxy))
.route(web::post().to(handlers::function_proxy))
)
.route(web::post().to(handlers::function_proxy)),
),
)
.route("/metrics", web::get().to(handlers::telemetry))
.route("/healthz", web::get().to(handlers::health))
@ -108,6 +105,5 @@ async fn serve() -> std::io::Result<()> {
.await
}
//当上下文完成的时候关闭服务器
//无法关闭时候写进log,并且返回错误

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,14 @@
#[allow(unused)]
const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-fn";
#[allow(unused)]
const NAMESPACE_LABEL: &str = "faasrs";
#[allow(unused)]
const FAASRS_NAMESPACE: &str = "faasrs";
#[allow(unused)]
const FAASRS_SERVICE_PULL_ALWAYS: bool = false;
#[allow(unused)]
const DEFAULT_SNAPSHOTTER: &str = "overlayfs";

View File

@ -0,0 +1,27 @@
use super::IAmHandler;
use crate::types::{self, CreateContainerInfo};
use actix_web::HttpResponse;
use service::Service;
use std::sync::Arc;
pub struct DeployHandler {
pub config: types::function_deployment::FunctionDeployment,
service: Arc<Service>,
}
impl IAmHandler for DeployHandler {
type Input = CreateContainerInfo;
// type Output = String;
async fn execute(&self, input: Self::Input) -> impl actix_web::Responder {
let cid = input.container_id.clone();
let image = input.image.clone();
let ns = input.ns.clone();
let _ = self
.service
.create_container(&image, &cid, &ns)
.await
.unwrap();
HttpResponse::Ok().json(format!("Container {} created successfully!", cid))
}
}

View File

@ -0,0 +1,25 @@
use actix_web::HttpResponse;
pub struct FunctionLister {
service: std::sync::Arc<service::Service>,
}
impl super::IAmHandler for FunctionLister {
type Input = String;
// type Output = Vec<String>;
async fn execute(&self, input: Self::Input) -> impl actix_web::Responder {
// faasd进来的第一步是验证命名空间的标签是否具有某个值也就是验证是否为true确保命名空间有效
// 但是这里省略,因为好像标签为空?
let containers = self
.service
.get_container_list(input.as_str())
.await
.unwrap();
for container in containers.iter() {}
HttpResponse::Ok().json("函数列表")
}
}

View File

@ -1,8 +1,11 @@
pub mod deploy;
pub mod function_list;
pub mod namespace_list;
use lazy_static::lazy_static;
use std::collections::HashMap;
use actix_web::{HttpResponse, Responder, HttpRequest};
use actix_web::{HttpRequest, HttpResponse, Responder};
use serde::{Serialize, de::DeserializeOwned};
pub async fn function_lister(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("函数列表")
@ -59,3 +62,64 @@ pub async fn telemetry(_req: HttpRequest) -> impl Responder {
pub async fn health(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("健康检查")
}
// lazy_static! {
// pub static ref HANDLERS: HashMap<String, Box<dyn IAmHandler>> = {
// let mut map = HashMap::new();
// map.insert(
// "function_list".to_string(),
// Box::new(function_list::FunctionLister),
// );
// map.insert(
// "namespace_list".to_string(),
// Box::new(namespace_list::NamespaceLister),
// );
// map
// };
// }
#[derive(Debug, thiserror::Error)]
pub struct FaasError {
message: String,
error_type: FaasErrorType,
#[source]
source: Option<Box<dyn std::error::Error + Send + Sync>>,
}
#[derive(Debug)]
pub enum FaasErrorType {
ContainerFailure,
Timeout,
InternalError,
}
impl std::fmt::Display for FaasError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[{:?}] {}", self.error_type, self.message)
}
}
// 实现从常见错误类型转换
impl From<std::io::Error> for FaasError {
fn from(err: std::io::Error) -> Self {
FaasError {
message: format!("IO error: {}", err),
error_type: FaasErrorType::InternalError,
source: Some(Box::new(err)),
}
}
}
pub trait IAmHandler {
type Input: DeserializeOwned + Send + 'static;
// type Output: Serialize + Send + 'static;
/// 获取Handler元数据函数名、超时时间等
// fn metadata(&self) -> HandlerMeta;
/// 执行核心逻辑
fn execute(
&self,
input: Self::Input,
) -> impl std::future::Future<Output = impl Responder> + Send;
}

View File

@ -0,0 +1,17 @@
use super::IAmHandler;
use actix_web::{HttpResponse, Responder};
use service::Service;
use std::sync::Arc;
pub struct NamespaceLister {
service: Arc<Service>,
}
impl IAmHandler for NamespaceLister {
type Input = ();
// type Output = Vec<String>;
async fn execute(&self, _input: Self::Input) -> impl Responder {
let ns_list = self.service.list_namespaces().await.unwrap();
HttpResponse::Ok().json(ns_list)
}
}

View File

@ -0,0 +1 @@

View File

@ -1,9 +1,9 @@
pub mod auth;
pub mod bootstrap;
pub mod config;
pub mod handlers;
pub mod types;
pub mod httputils;
pub mod proxy;
pub mod auth;
pub mod logs;
pub mod metrics;
pub mod bootstrap;
pub mod proxy;
pub mod types;

View File

@ -0,0 +1 @@

View File

@ -1,5 +1,5 @@
use prometheus::{self, register_histogram_vec, register_int_counter_vec};
use lazy_static::lazy_static;
use prometheus::{self, register_histogram_vec, register_int_counter_vec};
lazy_static! {
pub static ref HTTP_METRICS: HttpMetrics = HttpMetrics::new();
@ -18,12 +18,14 @@ impl HttpMetrics {
"http_request_duration_seconds",
"Request duration in seconds",
&["method", "path", "status"]
).unwrap(),
)
.unwrap(),
requests_total: register_int_counter_vec!(
"http_requests_total",
"Total number of HTTP requests",
&["method", "path", "status"]
).unwrap(),
)
.unwrap(),
}
}
}

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,64 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Serialize, Deserialize, Debug)]
pub struct FunctionDeployment {
/// Service is the name of the function deployment
pub service: String,
/// Image is a fully-qualified container image
pub image: String,
/// Namespace for the function, if supported by the faas-provider
#[serde(skip_serializing_if = "Option::is_none")]
pub namespace: Option<String>,
/// EnvProcess overrides the fprocess environment variable and can be used
/// with the watchdog
#[serde(rename = "envProcess", skip_serializing_if = "Option::is_none")]
pub env_process: Option<String>,
/// EnvVars can be provided to set environment variables for the function runtime.
#[serde(skip_serializing_if = "Option::is_none")]
pub env_vars: Option<HashMap<String, String>>,
/// Constraints are specific to the faas-provider.
#[serde(skip_serializing_if = "Option::is_none")]
pub constraints: Option<Vec<String>>,
/// Secrets list of secrets to be made available to function
#[serde(skip_serializing_if = "Option::is_none")]
pub secrets: Option<Vec<String>>,
/// Labels are metadata for functions which may be used by the
/// faas-provider or the gateway
#[serde(skip_serializing_if = "Option::is_none")]
pub labels: Option<HashMap<String, String>>,
/// Annotations are metadata for functions which may be used by the
/// faas-provider or the gateway
#[serde(skip_serializing_if = "Option::is_none")]
pub annotations: Option<HashMap<String, String>>,
/// Limits for function
#[serde(skip_serializing_if = "Option::is_none")]
pub limits: Option<FunctionResources>,
/// Requests of resources requested by function
#[serde(skip_serializing_if = "Option::is_none")]
pub requests: Option<FunctionResources>,
/// ReadOnlyRootFilesystem removes write-access from the root filesystem
/// mount-point.
#[serde(rename = "readOnlyRootFilesystem", default)]
pub read_only_root_filesystem: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct FunctionResources {
#[serde(skip_serializing_if = "Option::is_none")]
pub memory: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cpu: Option<String>,
}

View File

@ -1 +1,11 @@
use serde::{Deserialize, Serialize};
pub mod config;
pub mod function_deployment;
#[derive(Serialize, Deserialize)]
pub struct CreateContainerInfo {
pub container_id: String,
pub image: String,
pub ns: String,
}

View File

@ -5,8 +5,9 @@ use containerd_client::{
Client,
services::v1::{
Container, CreateContainerRequest, CreateTaskRequest, DeleteContainerRequest,
DeleteTaskRequest, GetImageRequest, KillRequest, ListContainersRequest, ListTasksRequest,
ReadContentRequest, StartRequest, TransferOptions, TransferRequest, WaitRequest,
DeleteTaskRequest, GetImageRequest, KillRequest, ListContainersRequest,
ListNamespacesRequest, ListTasksRequest, ReadContentRequest, StartRequest, TransferOptions,
TransferRequest, WaitRequest,
container::Runtime,
snapshots::{MountsRequest, PrepareSnapshotRequest},
},
@ -22,12 +23,7 @@ use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest, Media
use prost_types::Any;
use sha2::{Digest, Sha256};
use spec::{DEFAULT_NAMESPACE, generate_spec};
use std::{
fs,
sync::{Arc, Mutex},
time::Duration,
vec,
};
use std::{fs, sync::Arc, time::Duration, vec};
use tokio::time::timeout;
// config.json,dockerhub密钥
@ -36,14 +32,14 @@ use tokio::time::timeout;
type Err = Box<dyn std::error::Error>;
pub struct Service {
client: Arc<Mutex<Client>>,
client: Arc<Client>,
}
impl Service {
pub async fn new(endpoint: String) -> Result<Self, Err> {
let client = Client::from_path(endpoint).await.unwrap();
Ok(Service {
client: Arc::new(Mutex::new(client)),
client: Arc::new(client),
})
}
@ -57,8 +53,6 @@ impl Service {
};
let resp = self
.client
.lock()
.unwrap()
.snapshots()
.prepare(with_namespace!(req, ns))
.await?
@ -84,7 +78,7 @@ impl Service {
value: spec.into_bytes(),
};
let mut containers_client = self.client.lock().unwrap().containers();
let mut containers_client = self.client.containers();
let container = Container {
id: cid.to_string(),
image: image_name.to_string(),
@ -117,11 +111,10 @@ impl Service {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let c = self.client.lock().unwrap();
let request = ListContainersRequest {
..Default::default()
};
let mut cc = c.containers();
let mut cc = self.client.containers();
let responce = cc
.list(with_namespace!(request, namespace))
@ -133,7 +126,7 @@ impl Service {
.find(|container| container.id == cid);
if let Some(container) = container {
let mut tc = c.tasks();
let mut tc = self.client.tasks();
let request = ListTasksRequest {
filter: format!("container=={}", cid),
@ -199,8 +192,7 @@ impl Service {
}
async fn create_task(&self, cid: &str, ns: &str) -> Result<(), Err> {
let c = self.client.lock().unwrap();
let mut sc = c.snapshots();
let mut sc = self.client.snapshots();
let req = MountsRequest {
snapshotter: "overlayfs".to_string(),
key: cid.to_string(),
@ -211,7 +203,7 @@ impl Service {
.into_inner()
.mounts;
drop(sc);
let mut tc = c.tasks();
let mut tc = self.client.tasks();
let req = CreateTaskRequest {
container_id: cid.to_string(),
rootfs: mounts,
@ -227,13 +219,7 @@ impl Service {
container_id: cid.to_string(),
..Default::default()
};
let _resp = self
.client
.lock()
.unwrap()
.tasks()
.start(with_namespace!(req, ns))
.await?;
let _resp = self.client.tasks().start(with_namespace!(req, ns)).await?;
Ok(())
}
@ -242,7 +228,7 @@ impl Service {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let mut c = self.client.lock().unwrap().tasks();
let mut c = self.client.tasks();
let kill_request = KillRequest {
container_id: cid.to_string(),
signal: 15,
@ -265,7 +251,7 @@ impl Service {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let mut c = self.client.lock().unwrap().tasks();
let mut c = self.client.tasks();
let time_out = Duration::from_secs(30);
let wait_result = timeout(time_out, async {
let wait_request = WaitRequest {
@ -318,7 +304,7 @@ impl Service {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let mut c = self.client.lock().unwrap().containers();
let mut c = self.client.containers();
let request = ListContainersRequest {
..Default::default()
@ -351,7 +337,7 @@ impl Service {
} else {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let mut c = self.client.lock().unwrap().images();
let mut c = self.client.images();
let req = GetImageRequest {
name: image_name.to_string(),
};
@ -379,7 +365,7 @@ impl Service {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let mut c = self.client.lock().unwrap().transfer();
let mut c = self.client.transfer();
let source = OciRegistry {
reference: image_name.to_string(),
resolver: Default::default(),
@ -451,7 +437,7 @@ impl Service {
size: 0,
};
let mut c = self.client.lock().unwrap().content();
let mut c = self.client.content();
let resp = c
.read(with_namespace!(req, ns))
.await
@ -475,7 +461,7 @@ impl Service {
offset: 0,
size: 0,
};
let mut c = self.client.lock().unwrap().content();
let mut c = self.client.content();
let resp = c
.read(with_namespace!(req, ns))
@ -492,7 +478,7 @@ impl Service {
}
pub async fn get_img_config(&self, name: &str, ns: &str) -> Option<ImageConfiguration> {
let mut c = self.client.lock().unwrap().images();
let mut c = self.client.images();
let req = GetImageRequest {
name: name.to_string(),
@ -518,7 +504,7 @@ impl Service {
..Default::default()
};
let mut c = self.client.lock().unwrap().content();
let mut c = self.client.content();
let resp = c
.read(with_namespace!(req, ns))
@ -598,6 +584,29 @@ impl Service {
_ => ns.to_string(),
}
}
pub async fn list_namespaces(&self) -> Result<Vec<String>, Err> {
let mut c = self.client.namespaces();
let req = ListNamespacesRequest {
..Default::default()
};
let resp = c.list(req).await?;
Ok(resp
.into_inner()
.namespaces
.into_iter()
.map(|ns| ns.name)
.collect())
}
// pub async fn get_task_list(&self, ns: &str) -> Result<Vec<String>, Err> {
// let mut c = self.client.tasks();
// let req = ListTasksRequest {
// ..Default::default()
// };
// let req = c.list(with_namespace!(req, ns)).await?.into_inner().tasks;
// Ok(())
// }
}
//容器是容器,要先启动,然后才能运行任务
//要想删除一个正在运行的Task必须先kill掉这个task然后才能删除。