feat(provider): get information from container. (#19)

This commit is contained in:
aLinChe 2025-03-28 20:40:49 +08:00 committed by GitHub
parent 21e9f3cbe8
commit 81a3613231
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 207 additions and 3 deletions

1
Cargo.lock generated
View File

@ -2036,6 +2036,7 @@ dependencies = [
"async-trait",
"base64 0.13.1",
"bollard",
"cni",
"config",
"futures",
"futures-util",

View File

@ -25,6 +25,7 @@ actix-service = "2"
base64 = "0.13"
futures-util = "0.3"
service = { path = "../service" }
cni = { path = "../cni" }
async-trait = "0.1"
lazy_static = "1.4.0"
log = "0.4"

View File

@ -0,0 +1,144 @@
use crate::handlers::function_list::Function;
// use service::spec::{ Mount, Spec};
use actix_web::cookie::time::Duration;
use std::{collections::HashMap, time::UNIX_EPOCH};
use thiserror::Error;
const ANNOTATION_LABEL_PREFIX: &str = "com.openfaas.annotations.";
#[derive(Error, Debug)]
pub enum FunctionError {
#[error("Function not found: {0}")]
FunctionNotFound(String),
}
impl From<Box<dyn std::error::Error>> for FunctionError {
fn from(error: Box<dyn std::error::Error>) -> Self {
FunctionError::FunctionNotFound(error.to_string())
}
}
pub async fn get_function(
client: &service::Service,
function_name: &str,
namespace: &str,
) -> Result<Function, FunctionError> {
let cid = function_name;
let (_, ip) = client
.get_netns_ip(cid)
.await
.unwrap_or((namespace.to_string(), String::new()));
let container = client
.load_container(cid, namespace)
.await
.map_err(|e| FunctionError::FunctionNotFound(e.to_string()))?;
let container_name = container.id.to_string();
let image = container.image.clone();
let mut pid = 0;
let mut replicas = 0;
let all_labels = container.labels;
let (labels, _) = build_labels_and_annotations(all_labels);
let (env, _) = client.get_env_and_args(&image, namespace).await?;
let (env_vars, env_process) = read_env_from_process_env(env);
// let secrets = read_secrets_from_mounts(&spec.mounts);
// let memory_limit = read_memory_limit_from_spec(&spec);
let timestamp = container.created_at.unwrap_or_default();
let created_at = UNIX_EPOCH + Duration::new(timestamp.seconds, timestamp.nanos);
let task = client
.get_task(cid, namespace)
.await
.map_err(|e| FunctionError::FunctionNotFound(e.to_string()));
match task {
Ok(task) => {
let status = task.status;
if status == 2 {
pid = task.pid;
replicas = 1;
}
}
Err(_) => {
replicas = 0;
}
}
Ok(Function {
name: container_name,
namespace: namespace.to_string(),
image,
pid,
replicas,
ip,
labels,
env_vars,
env_process,
created_at,
})
}
fn build_labels_and_annotations(
ctr_labels: HashMap<String, String>,
) -> (HashMap<String, String>, HashMap<String, String>) {
let mut labels = HashMap::new();
let mut annotations = HashMap::new();
for (k, v) in ctr_labels {
if k.starts_with(ANNOTATION_LABEL_PREFIX) {
annotations.insert(k.trim_start_matches(ANNOTATION_LABEL_PREFIX).to_string(), v);
} else {
labels.insert(k, v);
}
}
(labels, annotations)
}
fn read_env_from_process_env(env: Vec<String>) -> (HashMap<String, String>, String) {
let mut found_env = HashMap::new();
let mut fprocess = String::new();
for e in env {
let kv: Vec<&str> = e.splitn(2, '=').collect();
if kv.len() == 1 {
continue;
}
if kv[0] == "PATH" {
continue;
}
if kv[0] == "fprocess" {
fprocess = kv[1].to_string();
continue;
}
found_env.insert(kv[0].to_string(), kv[1].to_string());
}
(found_env, fprocess)
}
// fn read_secrets_from_mounts(mounts: &[Mount]) -> Vec<String> {
// let mut secrets = Vec::new();
// for mnt in mounts {
// let parts: Vec<&str> = mnt.destination.split("/var/openfaas/secrets/").collect();
// if parts.len() > 1 {
// secrets.push(parts[1].to_string());
// }
// }
// secrets
// }
// fn read_memory_limit_from_spec(spec: &Spec) -> i64 {
// match &spec.linux {
// linux => match &linux.resources {
// resources => match &resources.memory {
// Some(memory) => memory.limit.unwrap_or(0),
// None => 0,
// },
// _ => 0,
// },
// _ => 0,
// }
// }

View File

@ -1,3 +1,5 @@
use std::{collections::HashMap, time::SystemTime};
use actix_web::HttpResponse;
pub struct FunctionLister {
@ -25,3 +27,19 @@ impl super::IAmHandler for FunctionLister {
HttpResponse::Ok().json("函数列表")
}
}
pub struct Function {
pub name: String,
pub namespace: String,
pub image: String,
pub pid: u32,
pub replicas: i32,
pub ip: String,
pub labels: HashMap<String, String>,
// pub annotations: HashMap<String, String>,
// pub secrets: Vec<String>,
pub env_vars: HashMap<String, String>,
pub env_process: String,
// pub memory_limit: i64,
pub created_at: SystemTime,
}

View File

@ -1,4 +1,5 @@
pub mod deploy;
pub mod function_get;
pub mod function_list;
pub mod namespace_list;

View File

@ -16,6 +16,7 @@ use containerd_client::{
types::{
Mount, Platform,
transfer::{ImageStore, OciRegistry, UnpackConfiguration},
v1::Process,
},
with_namespace,
};
@ -62,7 +63,8 @@ impl Service {
}
pub async fn get_netns_ip(&self, cid: &str) -> Option<(String, String)> {
let map = self.netns_map.read().unwrap();
let map: std::sync::RwLockReadGuard<'_, HashMap<String, (String, String)>> =
self.netns_map.read().unwrap();
map.get(cid).cloned()
}
@ -330,6 +332,24 @@ impl Service {
}
}
pub async fn load_container(&self, cid: &str, ns: &str) -> Result<Container, Err> {
let namespace = self.check_namespace(ns);
let mut c = self.client.containers();
let request = ListContainersRequest {
..Default::default()
};
let response = c
.list(with_namespace!(request, namespace))
.await?
.into_inner();
let container = response
.containers
.into_iter()
.find(|container| container.id == cid)
.ok_or_else(|| -> Err { format!("Container {} not found", cid).into() })?;
Ok(container)
}
pub async fn get_container_list(&self, ns: &str) -> Result<Vec<String>, tonic::Status> {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
@ -352,6 +372,25 @@ impl Service {
.collect())
}
pub async fn get_task(&self, cid: &str, ns: &str) -> Result<Process, Err> {
let namespace = self.check_namespace(ns);
let mut tc = self.client.tasks();
let request = ListTasksRequest {
filter: format!("container=={}", cid),
};
let response = tc.list(with_namespace!(request, namespace)).await?;
let tasks = response.into_inner().tasks;
let task = tasks
.into_iter()
.find(|task| task.container_id == cid)
.ok_or_else(|| -> Err { format!("Task for container {} not found", cid).into() })?;
Ok(task)
}
pub async fn get_task_list() {
todo!()
}
@ -610,7 +649,7 @@ impl Service {
Ok(ret)
}
async fn get_env_and_args(
pub async fn get_env_and_args(
&self,
name: &str,
ns: &str,