From 81a3613231a2ad44f1610461984ab0b89f8b5350 Mon Sep 17 00:00:00 2001 From: aLinChe <140829724+oeasy1412@users.noreply.github.com> Date: Fri, 28 Mar 2025 20:40:49 +0800 Subject: [PATCH] feat(provider): get information from container. (#19) --- Cargo.lock | 1 + crates/provider/Cargo.toml | 3 +- crates/provider/src/handlers/function_get.rs | 144 ++++++++++++++++++ crates/provider/src/handlers/function_list.rs | 18 +++ crates/provider/src/handlers/mod.rs | 1 + crates/service/src/lib.rs | 43 +++++- 6 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 crates/provider/src/handlers/function_get.rs diff --git a/Cargo.lock b/Cargo.lock index 9954313..6529716 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2036,6 +2036,7 @@ dependencies = [ "async-trait", "base64 0.13.1", "bollard", + "cni", "config", "futures", "futures-util", diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index ba1f30c..21dde36 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -24,7 +24,8 @@ futures = "0.3" actix-service = "2" base64 = "0.13" futures-util = "0.3" -service = { path = "../service" } +service = { path = "../service" } +cni = { path = "../cni" } async-trait = "0.1" lazy_static = "1.4.0" log = "0.4" diff --git a/crates/provider/src/handlers/function_get.rs b/crates/provider/src/handlers/function_get.rs new file mode 100644 index 0000000..8600a55 --- /dev/null +++ b/crates/provider/src/handlers/function_get.rs @@ -0,0 +1,144 @@ +use crate::handlers::function_list::Function; +// use service::spec::{ Mount, Spec}; +use actix_web::cookie::time::Duration; +use std::{collections::HashMap, time::UNIX_EPOCH}; +use thiserror::Error; + +const ANNOTATION_LABEL_PREFIX: &str = "com.openfaas.annotations."; + +#[derive(Error, Debug)] +pub enum FunctionError { + #[error("Function not found: {0}")] + FunctionNotFound(String), +} + +impl From> for FunctionError { + fn from(error: Box) -> Self { + FunctionError::FunctionNotFound(error.to_string()) + } +} + +pub async fn get_function( + client: &service::Service, + function_name: &str, + namespace: &str, +) -> Result { + let cid = function_name; + let (_, ip) = client + .get_netns_ip(cid) + .await + .unwrap_or((namespace.to_string(), String::new())); + + let container = client + .load_container(cid, namespace) + .await + .map_err(|e| FunctionError::FunctionNotFound(e.to_string()))?; + + let container_name = container.id.to_string(); + let image = container.image.clone(); + let mut pid = 0; + let mut replicas = 0; + + let all_labels = container.labels; + let (labels, _) = build_labels_and_annotations(all_labels); + + let (env, _) = client.get_env_and_args(&image, namespace).await?; + let (env_vars, env_process) = read_env_from_process_env(env); + // let secrets = read_secrets_from_mounts(&spec.mounts); + // let memory_limit = read_memory_limit_from_spec(&spec); + let timestamp = container.created_at.unwrap_or_default(); + let created_at = UNIX_EPOCH + Duration::new(timestamp.seconds, timestamp.nanos); + + let task = client + .get_task(cid, namespace) + .await + .map_err(|e| FunctionError::FunctionNotFound(e.to_string())); + match task { + Ok(task) => { + let status = task.status; + if status == 2 { + pid = task.pid; + replicas = 1; + } + } + Err(_) => { + replicas = 0; + } + } + + Ok(Function { + name: container_name, + namespace: namespace.to_string(), + image, + pid, + replicas, + ip, + labels, + env_vars, + env_process, + created_at, + }) +} + +fn build_labels_and_annotations( + ctr_labels: HashMap, +) -> (HashMap, HashMap) { + let mut labels = HashMap::new(); + let mut annotations = HashMap::new(); + + for (k, v) in ctr_labels { + if k.starts_with(ANNOTATION_LABEL_PREFIX) { + annotations.insert(k.trim_start_matches(ANNOTATION_LABEL_PREFIX).to_string(), v); + } else { + labels.insert(k, v); + } + } + + (labels, annotations) +} + +fn read_env_from_process_env(env: Vec) -> (HashMap, String) { + let mut found_env = HashMap::new(); + let mut fprocess = String::new(); + + for e in env { + let kv: Vec<&str> = e.splitn(2, '=').collect(); + if kv.len() == 1 { + continue; + } + if kv[0] == "PATH" { + continue; + } + if kv[0] == "fprocess" { + fprocess = kv[1].to_string(); + continue; + } + found_env.insert(kv[0].to_string(), kv[1].to_string()); + } + + (found_env, fprocess) +} + +// fn read_secrets_from_mounts(mounts: &[Mount]) -> Vec { +// let mut secrets = Vec::new(); +// for mnt in mounts { +// let parts: Vec<&str> = mnt.destination.split("/var/openfaas/secrets/").collect(); +// if parts.len() > 1 { +// secrets.push(parts[1].to_string()); +// } +// } +// secrets +// } + +// fn read_memory_limit_from_spec(spec: &Spec) -> i64 { +// match &spec.linux { +// linux => match &linux.resources { +// resources => match &resources.memory { +// Some(memory) => memory.limit.unwrap_or(0), +// None => 0, +// }, +// _ => 0, +// }, +// _ => 0, +// } +// } diff --git a/crates/provider/src/handlers/function_list.rs b/crates/provider/src/handlers/function_list.rs index 677ef21..56b6c3d 100644 --- a/crates/provider/src/handlers/function_list.rs +++ b/crates/provider/src/handlers/function_list.rs @@ -1,3 +1,5 @@ +use std::{collections::HashMap, time::SystemTime}; + use actix_web::HttpResponse; pub struct FunctionLister { @@ -25,3 +27,19 @@ impl super::IAmHandler for FunctionLister { HttpResponse::Ok().json("函数列表") } } + +pub struct Function { + pub name: String, + pub namespace: String, + pub image: String, + pub pid: u32, + pub replicas: i32, + pub ip: String, + pub labels: HashMap, + // pub annotations: HashMap, + // pub secrets: Vec, + pub env_vars: HashMap, + pub env_process: String, + // pub memory_limit: i64, + pub created_at: SystemTime, +} diff --git a/crates/provider/src/handlers/mod.rs b/crates/provider/src/handlers/mod.rs index d70789b..9cc6ee0 100644 --- a/crates/provider/src/handlers/mod.rs +++ b/crates/provider/src/handlers/mod.rs @@ -1,4 +1,5 @@ pub mod deploy; +pub mod function_get; pub mod function_list; pub mod namespace_list; diff --git a/crates/service/src/lib.rs b/crates/service/src/lib.rs index 24e6db2..dd1b351 100644 --- a/crates/service/src/lib.rs +++ b/crates/service/src/lib.rs @@ -16,6 +16,7 @@ use containerd_client::{ types::{ Mount, Platform, transfer::{ImageStore, OciRegistry, UnpackConfiguration}, + v1::Process, }, with_namespace, }; @@ -62,7 +63,8 @@ impl Service { } pub async fn get_netns_ip(&self, cid: &str) -> Option<(String, String)> { - let map = self.netns_map.read().unwrap(); + let map: std::sync::RwLockReadGuard<'_, HashMap> = + self.netns_map.read().unwrap(); map.get(cid).cloned() } @@ -330,6 +332,24 @@ impl Service { } } + pub async fn load_container(&self, cid: &str, ns: &str) -> Result { + let namespace = self.check_namespace(ns); + let mut c = self.client.containers(); + let request = ListContainersRequest { + ..Default::default() + }; + let response = c + .list(with_namespace!(request, namespace)) + .await? + .into_inner(); + let container = response + .containers + .into_iter() + .find(|container| container.id == cid) + .ok_or_else(|| -> Err { format!("Container {} not found", cid).into() })?; + Ok(container) + } + pub async fn get_container_list(&self, ns: &str) -> Result, tonic::Status> { let namespace = self.check_namespace(ns); let namespace = namespace.as_str(); @@ -352,6 +372,25 @@ impl Service { .collect()) } + pub async fn get_task(&self, cid: &str, ns: &str) -> Result { + let namespace = self.check_namespace(ns); + let mut tc = self.client.tasks(); + + let request = ListTasksRequest { + filter: format!("container=={}", cid), + }; + + let response = tc.list(with_namespace!(request, namespace)).await?; + let tasks = response.into_inner().tasks; + + let task = tasks + .into_iter() + .find(|task| task.container_id == cid) + .ok_or_else(|| -> Err { format!("Task for container {} not found", cid).into() })?; + + Ok(task) + } + pub async fn get_task_list() { todo!() } @@ -610,7 +649,7 @@ impl Service { Ok(ret) } - async fn get_env_and_args( + pub async fn get_env_and_args( &self, name: &str, ns: &str,