迁移项目,完善provider的框架 (#6)

This commit is contained in:
YuLong Huang 2025-03-02 16:37:46 +08:00 committed by GitHub
parent 8ec1bdb475
commit 1759d28dd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1288 additions and 100 deletions

1047
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,3 +5,23 @@ version.workspace = true
authors.workspace = true authors.workspace = true
[dependencies] [dependencies]
actix-web = "4.5.1"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
tokio = { version = "1.37.0", features = ["full"] }
bollard = "0.13.0"
uuid = { version = "1.8.0", features = ["v4"] }
actix-web-httpauth = "0.6"
config = "0.11"
thiserror = "1.0"
reqwest = "0.11"
lazy_static = "1.4"
prometheus = "0.13"
tempfile = "3.2"
hyper = "0.14"
tower = "0.4"
regex = "1"
futures = "0.3"
actix-service = "2"
base64 = "0.13"
futures-util = "0.3"

View File

@ -0,0 +1,85 @@
use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error, HttpMessage, HttpResponse};
use actix_web::http::header::HeaderValue;
use actix_web::dev::{Service, Transform};
use futures_util::future::{ok, Ready, LocalBoxFuture};
use std::rc::Rc;
use std::task::{Context, Poll};
use std::collections::HashMap;
//写到使用actix-web-httpauth作为中间件还没有解决read_basic_auth函数的实现返回值和之前在bootstrap的调用不一样
pub struct BasicAuthCredentials {
user: String,
password: String,
}
impl BasicAuthCredentials {
pub fn new(username: &str, password: &str) -> Self {
BasicAuthCredentials {
user: username.to_string(),
password: password.to_string(),
}
}
}
pub struct ReadBasicAuthFromDisk{
secret_mount_path: String,
user_filename: String,
password_filename: String,
}
impl ReadBasicAuthFromDisk {
pub fn new(secret_mount_path: &str, user_filename: &str, password_filename: &str) -> Self {
ReadBasicAuthFromDisk {
secret_mount_path: secret_mount_path.to_string(),
user_filename: user_filename.to_string(),
password_filename: password_filename.to_string(),
}
}
//TODO:这里应该加密?
pub async fn read_basic_auth(&self) -> HashMap<String, String> {
let mut user_map = HashMap::new();
let user_file = std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.user_filename)).unwrap();
let password_file = std::fs::read_to_string(format!("{}/{}", self.secret_mount_path, self.password_filename)).unwrap();
let user_vec: Vec<&str> = user_file.split("\n").collect();
let password_vec: Vec<&str> = password_file.split("\n").collect();
for i in 0..user_vec.len() {
user_map.insert(user_vec[i].to_string(), password_vec[i].to_string());
}
user_map
}
pub async fn basic_auth_validator(&self, req: ServiceRequest) -> Result<ServiceRequest, Error> {
let auth_header = req.headers().get("Authorization");
if let Some(auth_header) = auth_header {
//TODO:to_str()转化失败的处理,或者在之前限制用户输入非法字符
let auth_header = auth_header.to_str().unwrap();
let auth_header = auth_header.split(" ").collect::<Vec<&str>>();
if auth_header.len() != 2 {
return Err(actix_web::error::ErrorUnauthorized("Invalid Authorization Header"));
}
let auth_header = auth_header[1];
let auth_header = base64::decode(auth_header).unwrap();
let auth_header = String::from_utf8(auth_header).unwrap();
let auth_header = auth_header.split(":").collect::<Vec<&str>>();
if auth_header.len() != 2 {
return Err(actix_web::error::ErrorUnauthorized("Invalid Authorization Header"));
}
let username = auth_header[0];
let password = auth_header[1];
let user_map = self.read_basic_auth().await;
if let Some(user) = user_map.get(username) {
if user == password {
return Ok(req);
}
}
}
Err(actix_web::error::ErrorUnauthorized("Invalid Username or Password"))
}
}
async fn index() -> HttpResponse {
HttpResponse::Ok().body("欢迎访问受保护的资源!")
}

View File

@ -0,0 +1,113 @@
use actix_web::{web, App, HttpServer, HttpResponse, middleware, guard, Responder};
use prometheus::Registry;
use std::collections::HashMap;
use crate::{
auth,
metrics::{self, HttpMetrics},
//httputil,
//proxy,
types::config::FaaSConfig,
handlers,
};
//用于函数/服务名称的表达式
const NAME_EXPRESSION: &str = r"-a-zA-Z_0-9\.";
//应用程序状态,存储共享的数据,如配置、指标、认证信息等,为业务函数提供支持
#[derive(Clone)]
struct AppState {
config: FaaSConfig,//应用程序的配置用于识别是否开启Basic Auth等
metrics: HttpMetrics,//用于监视http请求的持续时间和总数
credentials: Option<HashMap<String, String>>, //当有认证信息的时候,获取认证信息
}
//serve 把处理程序headlers load到正确路由规范。这个函数是阻塞的。
async fn serve() -> std::io::Result<()> {
let config = FaaSConfig::new(); //加载配置用于识别是否开启Basic Auth等
let registry = Registry::new();
let metrics = metrics::HttpMetrics::new(); //metrics监视http请求的持续时间和总数
// 用于存储应用程序状态的结构体
let mut app_state = AppState {
config: config.clone(),
metrics: metrics.clone(),
credentials: None,
};
// 如果启用了Basic Auth从指定路径读取认证凭证并存储在应用程序状态中
if config.enable_basic_auth {
// 读取Basic Auth凭证
let auth = auth::ReadBasicAuthFromDisk::new(&config.secret_mount_path, "users.txt", "passwords.txt");
let credentials = auth.read_basic_auth().await; //这里的credentials是所有的账号密码
app_state.credentials = Some(credentials);
//TODO:handlers decorate with basic auth,尚未清楚是不是需要给所有的函数都加上
}
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(app_state.clone())) // 将app_state存储在web::Data中以便在处理程序中访问
.wrap(middleware::Logger::default()) // 记录请求日志
.service(
web::scope("/system")
.service(
web::resource("/functions")
.route(web::get().to(handlers::function_lister))
.route(web::post().to(handlers::deploy_function))
.route(web::delete().to(handlers::delete_function))
.route(web::put().to(handlers::update_function))
)
.service(
web::resource("/function/{name}")
.route(web::get().to(handlers::function_status))
)
.service(
web::resource("/scale-function/{name}")
.route(web::post().to(handlers::scale_function))
)
.service(
web::resource("/info")
.route(web::get().to(handlers::info))
)
.service(
web::resource("/secrets")
.route(web::get().to(handlers::secrets))
.route(web::post().to(handlers::secrets))
.route(web::put().to(handlers::secrets))
.route(web::delete().to(handlers::secrets))
)
.service(
web::resource("/logs")
.route(web::get().to(handlers::logs))
)
.service(
web::resource("/namespaces")
.route(web::get().to(handlers::list_namespaces))
.route(web::post().to(handlers::mutate_namespace))
)
)
.service(
web::scope("/function")
.service(
web::resource("/{name}")
.route(web::get().to(handlers::function_proxy))
.route(web::post().to(handlers::function_proxy))
)
.service(
web::resource("/{name}/{params:.*}")
.route(web::get().to(handlers::function_proxy))
.route(web::post().to(handlers::function_proxy))
)
)
.route("/metrics", web::get().to(handlers::telemetry))
.route("/healthz", web::get().to(handlers::health))
})
.bind(("0.0.0.0", config.tcp_port.unwrap_or(8080)))?
.run()
.await
}
//当上下文完成的时候关闭服务器
//无法关闭时候写进log,并且返回错误

View File

@ -0,0 +1,61 @@
use actix_web::{HttpResponse, Responder, HttpRequest};
pub async fn function_lister(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("函数列表")
}
pub async fn deploy_function(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("部署函数")
}
pub async fn delete_function(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("删除函数")
}
pub async fn update_function(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("更新函数")
}
pub async fn function_status(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("函数状态")
}
pub async fn scale_function(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("扩展函数")
}
pub async fn info(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("信息")
}
pub async fn secrets(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("秘密")
}
pub async fn logs(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("日志")
}
pub async fn list_namespaces(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("命名空间列表")
}
pub async fn mutate_namespace(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("变更命名空间")
}
pub async fn function_proxy(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("函数代理")
}
pub async fn telemetry(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("遥测")
}
pub async fn health(_req: HttpRequest) -> impl Responder {
HttpResponse::Ok().body("健康检查")
}

View File

@ -1,23 +1,9 @@
pub mod config; pub mod config;
pub mod handler; pub mod handlers;
pub mod types; pub mod types;
pub mod httputils; pub mod httputils;
pub mod proxy; pub mod proxy;
pub mod auth; pub mod auth;
pub mod logs; pub mod logs;
pub mod metrics; pub mod metrics;
pub mod bootstrap;
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -0,0 +1,31 @@
use prometheus::{self, register_histogram_vec, register_int_counter_vec};
use lazy_static::lazy_static;
lazy_static! {
pub static ref HTTP_METRICS: HttpMetrics = HttpMetrics::new();
}
#[derive(Clone)]
pub struct HttpMetrics {
pub request_duration: prometheus::HistogramVec,
pub requests_total: prometheus::IntCounterVec,
}
impl HttpMetrics {
pub fn new() -> Self {
Self {
request_duration: register_histogram_vec!(
"http_request_duration_seconds",
"Request duration in seconds",
&["method", "path", "status"]
).unwrap(),
requests_total: register_int_counter_vec!(
"http_requests_total",
"Total number of HTTP requests",
&["method", "path", "status"]
).unwrap(),
}
}
}
pub const TEXT_CONTENT_TYPE: &str = "text/plain; version=0.0.4";

View File

@ -20,6 +20,7 @@ pub struct FaasHandler<S> {
pub telemetry: S, pub telemetry: S,
} }
#[derive(Debug, Clone)]
pub struct FaaSConfig { pub struct FaaSConfig {
pub tcp_port: Option<u16>, pub tcp_port: Option<u16>,
pub read_timeout: Duration, pub read_timeout: Duration,
@ -32,6 +33,18 @@ pub struct FaaSConfig {
} }
impl FaaSConfig { impl FaaSConfig {
pub fn new() -> Self {
Self {
tcp_port: None,
read_timeout: Duration::from_secs(0),
write_timeout: Duration::from_secs(0),
enable_health: false,
enable_basic_auth: false,
secret_mount_path: String::from("/var/openfaas/secrets"),
max_idle_conns: 0,
max_idle_conns_per_host: 0,
}
}
pub fn get_read_timeout(&self) -> Duration { pub fn get_read_timeout(&self) -> Duration {
if self.read_timeout <= Duration::from_secs(0) { if self.read_timeout <= Duration::from_secs(0) {
DEFAULT_READ_TIMEOUT DEFAULT_READ_TIMEOUT