diff --git a/crates/cni/src/cni_network.rs b/crates/cni/src/cni_network.rs deleted file mode 100644 index 7b81347..0000000 --- a/crates/cni/src/cni_network.rs +++ /dev/null @@ -1,183 +0,0 @@ -use crate::Err; -use lazy_static::lazy_static; -use serde_json::Value; -use std::{ - fmt::Error, - fs::{self, File}, - io::Write, - net::IpAddr, - path::Path, -}; - -lazy_static! { - static ref CNI_BIN_DIR: String = - std::env::var("CNI_BIN_DIR").expect("Environment variable CNI_BIN_DIR is not set"); - static ref CNI_CONF_DIR: String = - std::env::var("CNI_CONF_DIR").expect("Environment variable CNI_CONF_DIR is not set"); - static ref CNI_TOOL: String = - std::env::var("CNI_TOOL").expect("Environment variable CNI_TOOL is not set"); -} - -// const NET_NS_PATH_FMT: &str = "/proc/{}/ns/net"; -const CNI_DATA_DIR: &str = "/var/run/cni"; -const DEFAULT_CNI_CONF_FILENAME: &str = "10-faasrs.conflist"; -const DEFAULT_NETWORK_NAME: &str = "faasrs-cni-bridge"; -const DEFAULT_BRIDGE_NAME: &str = "faasrs0"; -const DEFAULT_SUBNET: &str = "10.66.0.0/16"; -// const DEFAULT_IF_PREFIX: &str = "eth"; - -fn default_cni_conf() -> String { - format!( - r#" -{{ - "cniVersion": "0.4.0", - "name": "{}", - "plugins": [ - {{ - "type": "bridge", - "bridge": "{}", - "isGateway": true, - "ipMasq": true, - "ipam": {{ - "type": "host-local", - "subnet": "{}", - "dataDir": "{}", - "routes": [ - {{ "dst": "0.0.0.0/0" }} - ] - }} - }}, - {{ - "type": "firewall" - }} - ] -}} -"#, - DEFAULT_NETWORK_NAME, DEFAULT_BRIDGE_NAME, DEFAULT_SUBNET, CNI_DATA_DIR - ) -} - -pub fn init_net_work() -> Result<(), Err> { - let cni_conf_dir = CNI_CONF_DIR.as_str(); - if !dir_exists(Path::new(cni_conf_dir)) { - fs::create_dir_all(cni_conf_dir)?; - } - let net_config = Path::new(cni_conf_dir).join(DEFAULT_CNI_CONF_FILENAME); - let mut file = File::create(&net_config)?; - file.write_all(default_cni_conf().as_bytes())?; - - Ok(()) -} - -fn get_netns(ns: &str, cid: &str) -> String { - format!("{}-{}", ns, cid) -} - -fn get_path(netns: &str) -> String { - format!("/var/run/netns/{}", netns) -} - -//TODO: 创建网络和删除网络的错误处理 -pub fn create_cni_network(cid: String, ns: String) -> Result<(String, String), Err> { - // let netid = format!("{}-{}", cid, pid); - let netns = get_netns(ns.as_str(), cid.as_str()); - let path = get_path(netns.as_str()); - let mut ip = String::new(); - - let output = std::process::Command::new("ip") - .arg("netns") - .arg("add") - .arg(&netns) - .output()?; - - if !output.status.success() { - return Err(Box::new(Error)); - } - - let bin = CNI_BIN_DIR.as_str(); - let cnitool = CNI_TOOL.as_str(); - let output = std::process::Command::new(cnitool) - .arg("add") - .arg("faasrs-cni-bridge") - .arg(&path) - .env("CNI_PATH", bin) - .output(); - - match output { - Ok(output) => { - if !output.status.success() { - return Err(Box::new(Error)); - } - let stdout = String::from_utf8_lossy(&output.stdout); - let json: Value = match serde_json::from_str(&stdout) { - Ok(json) => json, - Err(e) => { - return Err(Box::new(e)); - } - }; - if let Some(ips) = json.get("ips").and_then(|ips| ips.as_array()) { - if let Some(first_ip) = ips - .first() - .and_then(|ip| ip.get("address")) - .and_then(|addr| addr.as_str()) - { - ip = first_ip.to_string(); - } - } - } - Err(e) => { - return Err(Box::new(e)); - } - } - - Ok((ip, path)) -} - -pub fn delete_cni_network(ns: &str, cid: &str) { - let netns = get_netns(ns, cid); - let path = get_path(&netns); - let bin = CNI_BIN_DIR.as_str(); - let cnitool = CNI_TOOL.as_str(); - - let _output_del = std::process::Command::new(cnitool) - .arg("del") - .arg("faasrs-cni-bridge") - .arg(&path) - .env("CNI_PATH", bin) - .output(); - let _output = std::process::Command::new("ip") - .arg("netns") - .arg("delete") - .arg(&netns) - .output(); -} - -fn dir_exists(dirname: &Path) -> bool { - path_exists(dirname).is_some_and(|info| info.is_dir()) -} - -fn path_exists(path: &Path) -> Option { - fs::metadata(path).ok() -} - -#[allow(unused)] -fn cni_gateway() -> Result { - let ip: IpAddr = DEFAULT_SUBNET.parse().unwrap(); - if let IpAddr::V4(ip) = ip { - let octets = &mut ip.octets(); - octets[3] = 1; - return Ok(ip.to_string()); - } - Err(Box::new(Error)) -} - -#[allow(unused)] -fn dir_empty(dirname: &Path) -> bool { - if !dir_exists(dirname) { - return false; - } - match fs::read_dir(dirname) { - Ok(mut entries) => entries.next().is_none(), - Err(_) => false, - } -} diff --git a/crates/cni/src/lib.rs b/crates/cni/src/lib.rs index c358cd4..59ba381 100644 --- a/crates/cni/src/lib.rs +++ b/crates/cni/src/lib.rs @@ -1,3 +1,184 @@ -pub mod cni_network; - type Err = Box; + +use lazy_static::lazy_static; +use serde_json::Value; +use std::{ + fmt::Error, + fs::{self, File}, + io::Write, + net::IpAddr, + path::Path, +}; + +lazy_static! { + static ref CNI_BIN_DIR: String = + std::env::var("CNI_BIN_DIR").expect("Environment variable CNI_BIN_DIR is not set"); + static ref CNI_CONF_DIR: String = + std::env::var("CNI_CONF_DIR").expect("Environment variable CNI_CONF_DIR is not set"); + static ref CNI_TOOL: String = + std::env::var("CNI_TOOL").expect("Environment variable CNI_TOOL is not set"); +} + +// const NET_NS_PATH_FMT: &str = "/proc/{}/ns/net"; +const CNI_DATA_DIR: &str = "/var/run/cni"; +const DEFAULT_CNI_CONF_FILENAME: &str = "10-faasrs.conflist"; +const DEFAULT_NETWORK_NAME: &str = "faasrs-cni-bridge"; +const DEFAULT_BRIDGE_NAME: &str = "faasrs0"; +const DEFAULT_SUBNET: &str = "10.66.0.0/16"; +// const DEFAULT_IF_PREFIX: &str = "eth"; + +fn default_cni_conf() -> String { + format!( + r#" +{{ + "cniVersion": "0.4.0", + "name": "{}", + "plugins": [ + {{ + "type": "bridge", + "bridge": "{}", + "isGateway": true, + "ipMasq": true, + "ipam": {{ + "type": "host-local", + "subnet": "{}", + "dataDir": "{}", + "routes": [ + {{ "dst": "0.0.0.0/0" }} + ] + }} + }}, + {{ + "type": "firewall" + }} + ] +}} +"#, + DEFAULT_NETWORK_NAME, DEFAULT_BRIDGE_NAME, DEFAULT_SUBNET, CNI_DATA_DIR + ) +} + +pub fn init_net_work() -> Result<(), Err> { + let cni_conf_dir = CNI_CONF_DIR.as_str(); + if !dir_exists(Path::new(cni_conf_dir)) { + fs::create_dir_all(cni_conf_dir)?; + } + let net_config = Path::new(cni_conf_dir).join(DEFAULT_CNI_CONF_FILENAME); + let mut file = File::create(&net_config)?; + file.write_all(default_cni_conf().as_bytes())?; + + Ok(()) +} + +fn get_netns(ns: &str, cid: &str) -> String { + format!("{}-{}", ns, cid) +} + +fn get_path(netns: &str) -> String { + format!("/var/run/netns/{}", netns) +} + +//TODO: 创建网络和删除网络的错误处理 +pub fn create_cni_network(cid: String, ns: String) -> Result<(String, String), Err> { + // let netid = format!("{}-{}", cid, pid); + let netns = get_netns(ns.as_str(), cid.as_str()); + let path = get_path(netns.as_str()); + let mut ip = String::new(); + + let output = std::process::Command::new("ip") + .arg("netns") + .arg("add") + .arg(&netns) + .output()?; + + if !output.status.success() { + return Err(Box::new(Error)); + } + + let bin = CNI_BIN_DIR.as_str(); + let cnitool = CNI_TOOL.as_str(); + let output = std::process::Command::new(cnitool) + .arg("add") + .arg("faasrs-cni-bridge") + .arg(&path) + .env("CNI_PATH", bin) + .output(); + + match output { + Ok(output) => { + if !output.status.success() { + return Err(Box::new(Error)); + } + let stdout = String::from_utf8_lossy(&output.stdout); + let json: Value = match serde_json::from_str(&stdout) { + Ok(json) => json, + Err(e) => { + return Err(Box::new(e)); + } + }; + if let Some(ips) = json.get("ips").and_then(|ips| ips.as_array()) { + if let Some(first_ip) = ips + .first() + .and_then(|ip| ip.get("address")) + .and_then(|addr| addr.as_str()) + { + ip = first_ip.to_string(); + } + } + } + Err(e) => { + return Err(Box::new(e)); + } + } + + Ok((ip, path)) +} + +pub fn delete_cni_network(ns: &str, cid: &str) { + let netns = get_netns(ns, cid); + let path = get_path(&netns); + let bin = CNI_BIN_DIR.as_str(); + let cnitool = CNI_TOOL.as_str(); + + let _output_del = std::process::Command::new(cnitool) + .arg("del") + .arg("faasrs-cni-bridge") + .arg(&path) + .env("CNI_PATH", bin) + .output(); + let _output = std::process::Command::new("ip") + .arg("netns") + .arg("delete") + .arg(&netns) + .output(); +} + +fn dir_exists(dirname: &Path) -> bool { + path_exists(dirname).is_some_and(|info| info.is_dir()) +} + +fn path_exists(path: &Path) -> Option { + fs::metadata(path).ok() +} + +#[allow(unused)] +fn cni_gateway() -> Result { + let ip: IpAddr = DEFAULT_SUBNET.parse().unwrap(); + if let IpAddr::V4(ip) = ip { + let octets = &mut ip.octets(); + octets[3] = 1; + return Ok(ip.to_string()); + } + Err(Box::new(Error)) +} + +#[allow(unused)] +fn dir_empty(dirname: &Path) -> bool { + if !dir_exists(dirname) { + return false; + } + match fs::read_dir(dirname) { + Ok(mut entries) => entries.next().is_none(), + Err(_) => false, + } +} diff --git a/crates/provider/src/handlers/delete.rs b/crates/provider/src/handlers/delete.rs index 2dee021..18ac163 100644 --- a/crates/provider/src/handlers/delete.rs +++ b/crates/provider/src/handlers/delete.rs @@ -46,7 +46,7 @@ async fn delete( let function = get_function(service, function_name, namespace).await?; if function.replicas != 0 { println!(" delete_cni_network ing {:?}", function.replicas); - cni::cni_network::delete_cni_network(namespace, function_name); + cni::delete_cni_network(namespace, function_name); } else { println!(" function.replicas {:?}", function.replicas); } diff --git a/crates/provider/src/handlers/deploy.rs b/crates/provider/src/handlers/deploy.rs index b5a3ee8..76625f1 100644 --- a/crates/provider/src/handlers/deploy.rs +++ b/crates/provider/src/handlers/deploy.rs @@ -1,11 +1,11 @@ use crate::{ consts, - handlers::utils::{CustomError, map_service_error}, + handlers::utils::CustomError, types::function_deployment::{DeployFunctionInfo, FunctionDeployment}, }; use actix_web::{HttpResponse, Responder, web}; -use service::Service; +use service::{Service, image_manager::ImageManager}; use std::sync::Arc; pub async fn deploy_handler( @@ -66,10 +66,11 @@ async fn deploy(service: &Arc, config: &FunctionDeployment) -> Result<( )); } - service - .prepare_image(&config.image, &namespace, true) + //todo 这里暂时将client设为pub + let client = service.client.as_ref(); + ImageManager::prepare_image(client, &config.image, &namespace, true) .await - .map_err(map_service_error)?; + .map_err(CustomError::from)?; println!("Image '{}' validated", &config.image); service @@ -83,7 +84,7 @@ async fn deploy(service: &Arc, config: &FunctionDeployment) -> Result<( ); service - .create_and_start_task(&config.service, &namespace) + .create_and_start_task(&config.service, &namespace, &config.image) .await .map_err(|e| { CustomError::OtherError(format!( diff --git a/crates/provider/src/handlers/function_get.rs b/crates/provider/src/handlers/function_get.rs index 8eba6c6..696e5b6 100644 --- a/crates/provider/src/handlers/function_get.rs +++ b/crates/provider/src/handlers/function_get.rs @@ -10,6 +10,8 @@ const ANNOTATION_LABEL_PREFIX: &str = "com.openfaas.annotations."; pub enum FunctionError { #[error("Function not found: {0}")] FunctionNotFound(String), + #[error("Runtime Config not found: {0}")] + RuntimeConfigNotFound(String), } impl From> for FunctionError { @@ -39,7 +41,9 @@ pub async fn get_function( let all_labels = container.labels; let (labels, _) = build_labels_and_annotations(all_labels); - let env = client.get_runtime_config(&image, namespace).await?.env; + let env = service::image_manager::ImageManager::get_runtime_config(&image) + .map_err(|e| FunctionError::RuntimeConfigNotFound(e.to_string()))? + .env; let (env_vars, env_process) = read_env_from_process_env(env); // let secrets = read_secrets_from_mounts(&spec.mounts); // let memory_limit = read_memory_limit_from_spec(&spec); diff --git a/crates/provider/src/handlers/utils.rs b/crates/provider/src/handlers/utils.rs index d96f97e..9717817 100644 --- a/crates/provider/src/handlers/utils.rs +++ b/crates/provider/src/handlers/utils.rs @@ -1,6 +1,7 @@ use crate::handlers::function_get::FunctionError; use actix_web::{Error, HttpResponse, ResponseError}; use derive_more::Display; +use service::image_manager::ImageError; pub fn map_service_error(e: Box) -> Error { eprintln!("Service error: {}", e); @@ -18,6 +19,8 @@ pub enum CustomError { ActixError(actix_web::Error), #[display("FunctionError: {}", _0)] FunctionError(FunctionError), + #[display("ImageError: {}", _0)] + ImageError(ImageError), } impl ResponseError for CustomError { @@ -42,6 +45,9 @@ impl ResponseError for CustomError { CustomError::FunctionError(err) => { HttpResponse::InternalServerError().body(err.to_string()) } + CustomError::ImageError(err) => { + HttpResponse::InternalServerError().body(err.to_string()) + } } } } @@ -63,3 +69,9 @@ impl From for CustomError { CustomError::FunctionError(err) } } + +impl From for CustomError { + fn from(err: ImageError) -> Self { + CustomError::ImageError(err) + } +} diff --git a/crates/service/src/image_manager.rs b/crates/service/src/image_manager.rs new file mode 100644 index 0000000..770783a --- /dev/null +++ b/crates/service/src/image_manager.rs @@ -0,0 +1,411 @@ +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use containerd_client::{ + Client, + services::v1::{GetImageRequest, ReadContentRequest, TransferOptions, TransferRequest}, + to_any, + tonic::Request, + types::{ + Platform, + transfer::{ImageStore, OciRegistry, UnpackConfiguration}, + }, + with_namespace, +}; +use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest, MediaType, Os}; + +use crate::spec::DEFAULT_NAMESPACE; + +type ImagesMap = Arc>>; +lazy_static::lazy_static! { + static ref GLOBAL_IMAGE_MAP: ImagesMap = Arc::new(RwLock::new(HashMap::new())); +} + +#[derive(Debug, Clone)] +pub struct ImageRuntimeConfig { + pub env: Vec, + pub args: Vec, + pub ports: Vec, +} + +impl ImageRuntimeConfig { + pub fn new(env: Vec, args: Vec, ports: Vec) -> Self { + ImageRuntimeConfig { env, args, ports } + } +} + +impl Drop for ImageManager { + fn drop(&mut self) { + let mut map = GLOBAL_IMAGE_MAP.write().unwrap(); + map.clear(); + } +} + +#[derive(Debug)] +pub enum ImageError { + ImageNotFound(String), + ImagePullFailed(String), + ImageConfigurationNotFound(String), + ReadContentFailed(String), + UnexpectedMediaType, + DeserializationFailed(String), + #[allow(dead_code)] + OtherError, +} + +impl std::fmt::Display for ImageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ImageError::ImageNotFound(msg) => write!(f, "Image not found: {}", msg), + ImageError::ImagePullFailed(msg) => write!(f, "Image pull failed: {}", msg), + ImageError::ImageConfigurationNotFound(msg) => { + write!(f, "Image configuration not found: {}", msg) + } + ImageError::ReadContentFailed(msg) => write!(f, "Read content failed: {}", msg), + ImageError::UnexpectedMediaType => { + write!(f, "Unexpected media type") + } + ImageError::DeserializationFailed(msg) => { + write!(f, "Deserialization failed: {}", msg) + } + ImageError::OtherError => write!(f, "Other error happened"), + } + } +} + +impl std::error::Error for ImageError {} + +#[derive(Debug)] +pub struct ImageManager; + +impl ImageManager { + pub async fn prepare_image( + client: &Client, + image_name: &str, + ns: &str, + always_pull: bool, + ) -> Result<(), ImageError> { + if always_pull { + Self::pull_image(client, image_name, ns).await?; + } else { + let namespace = check_namespace(ns); + let namespace = namespace.as_str(); + let mut c = client.images(); + let req = GetImageRequest { + name: image_name.to_string(), + }; + + let resp = match c.get(with_namespace!(req, namespace)).await { + Ok(response) => response.into_inner(), + Err(e) => { + return Err(ImageError::ImageNotFound(format!( + "Failed to get image {}: {}", + image_name, e + ))); + } + }; + if resp.image.is_none() { + Self::pull_image(client, image_name, ns).await?; + } + } + Self::save_img_config(client, image_name, ns).await + } + + pub async fn pull_image(client: &Client, image_name: &str, ns: &str) -> Result<(), ImageError> { + let ns = check_namespace(ns); + let namespace = ns.as_str(); + + let mut c: containerd_client::services::v1::transfer_client::TransferClient< + tonic::transport::Channel, + > = client.transfer(); + let source = OciRegistry { + reference: image_name.to_string(), + resolver: Default::default(), + }; + + // 这里先写死linux amd64 + let platform = Platform { + os: "linux".to_string(), + architecture: "amd64".to_string(), + ..Default::default() + }; + + let dest = ImageStore { + name: image_name.to_string(), + platforms: vec![platform.clone()], + unpacks: vec![UnpackConfiguration { + platform: Some(platform), + ..Default::default() + }], + ..Default::default() + }; + + let anys = to_any(&source); + let anyd = to_any(&dest); + + let req = TransferRequest { + source: Some(anys), + destination: Some(anyd), + options: Some(TransferOptions { + ..Default::default() + }), + }; + + if let Err(e) = c.transfer(with_namespace!(req, namespace)).await { + return Err(ImageError::ImagePullFailed(format!( + "Failed to pull image {}: {}", + image_name, e + ))); + } + + Ok(()) + // Self::save_img_config(client, image_name, ns.as_str()).await + } + + pub async fn save_img_config( + client: &Client, + img_name: &str, + ns: &str, + ) -> Result<(), ImageError> { + let mut c = client.images(); + + let req = GetImageRequest { + name: img_name.to_string(), + }; + let resp = match c.get(with_namespace!(req, ns)).await { + Ok(response) => response.into_inner(), + Err(e) => { + return Err(ImageError::ImageNotFound(format!( + "Failed to get image {}: {}", + img_name, e + ))); + } + }; + + let img_dscr = resp.image.unwrap().target.unwrap(); + let media_type = MediaType::from(img_dscr.media_type.as_str()); + + let req = ReadContentRequest { + digest: img_dscr.digest, + ..Default::default() + }; + + let mut c = client.content(); + + let mut inner = match c.read(with_namespace!(req, ns)).await { + Ok(response) => response.into_inner(), + Err(e) => { + return Err(ImageError::ReadContentFailed(format!( + "Failed to read content of image {}: {}", + img_name, e + ))); + } + }; + + let resp = match inner.message().await { + Ok(response) => response.unwrap().data, + Err(e) => { + return Err(ImageError::ReadContentFailed(format!( + "Failed to get the inner content of image {}: {}", + img_name, e + ))); + } + }; + + drop(c); + + let img_config = match media_type { + MediaType::ImageIndex => Self::handle_index(client, &resp, ns).await.unwrap(), + MediaType::ImageManifest => Self::handle_manifest(client, &resp, ns).await.unwrap(), + MediaType::Other(media_type) => match media_type.as_str() { + "application/vnd.docker.distribution.manifest.list.v2+json" => { + Self::handle_index(client, &resp, ns).await.unwrap() + } + "application/vnd.docker.distribution.manifest.v2+json" => { + Self::handle_manifest(client, &resp, ns).await.unwrap() + } + _ => { + return Err(ImageError::UnexpectedMediaType); + } + }, + _ => { + return Err(ImageError::UnexpectedMediaType); + } + }; + if img_config.is_none() { + return Err(ImageError::ImageConfigurationNotFound(format!( + "save_img_config: Image configuration not found for image {}", + img_name + ))); + } + let img_config = img_config.unwrap(); + Self::insert_image_config(img_name, img_config) + } + + async fn handle_index( + client: &Client, + data: &[u8], + ns: &str, + ) -> Result, ImageError> { + let image_index: ImageIndex = ::serde_json::from_slice(data).map_err(|e| { + ImageError::DeserializationFailed(format!("Failed to parse JSON: {}", e)) + })?; + let img_manifest_dscr = image_index + .manifests() + .iter() + .find(|manifest_entry| match manifest_entry.platform() { + Some(p) => { + #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] + { + matches!(p.architecture(), &Arch::Amd64) && matches!(p.os(), &Os::Linux) + } + #[cfg(target_arch = "aarch64")] + { + matches!(p.architecture(), &Arch::ARM64) && matches!(p.os(), Os::Linux) + //&& matches!(p.variant().as_ref().map(|s| s.as_str()), Some("v8")) + } + } + None => false, + }) + .unwrap(); + + let req = ReadContentRequest { + digest: img_manifest_dscr.digest().to_owned(), + offset: 0, + size: 0, + }; + + let mut c = client.content(); + let mut inner = match c.read(with_namespace!(req, ns)).await { + Ok(response) => response.into_inner(), + Err(e) => { + return Err(ImageError::ReadContentFailed(format!( + "Failed to handler index : {}", + e + ))); + } + }; + + let resp = match inner.message().await { + Ok(response) => response.unwrap().data, + Err(e) => { + return Err(ImageError::ReadContentFailed(format!( + "Failed to handle index inner : {}", + e + ))); + } + }; + drop(c); + + Self::handle_manifest(client, &resp, ns).await + } + + async fn handle_manifest( + client: &Client, + data: &[u8], + ns: &str, + ) -> Result, ImageError> { + let img_manifest: ImageManifest = match ::serde_json::from_slice(data) { + Ok(manifest) => manifest, + Err(e) => { + return Err(ImageError::DeserializationFailed(format!( + "Failed to deserialize image manifest: {}", + e + ))); + } + }; + let img_manifest_dscr = img_manifest.config(); + + let req = ReadContentRequest { + digest: img_manifest_dscr.digest().to_owned(), + offset: 0, + size: 0, + }; + let mut c = client.content(); + + let mut inner = match c.read(with_namespace!(req, ns)).await { + Ok(response) => response.into_inner(), + Err(e) => { + return Err(ImageError::ReadContentFailed(format!( + "Failed to handler index : {}", + e + ))); + } + }; + + let resp = match inner.message().await { + Ok(response) => response.unwrap().data, + Err(e) => { + return Err(ImageError::ReadContentFailed(format!( + "Failed to handle index inner : {}", + e + ))); + } + }; + + Ok(::serde_json::from_slice(&resp).unwrap()) + } + + fn insert_image_config(image_name: &str, config: ImageConfiguration) -> Result<(), ImageError> { + let mut map = GLOBAL_IMAGE_MAP.write().unwrap(); + map.insert(image_name.to_string(), config); + Ok(()) + } + + pub fn get_image_config(image_name: &str) -> Result { + let map = GLOBAL_IMAGE_MAP.read().unwrap(); + if let Some(config) = map.get(image_name) { + Ok(config.clone()) + } else { + Err(ImageError::ImageConfigurationNotFound(format!( + "get_image_config: Image configuration not found for image {}", + image_name + ))) + } + } + + pub fn get_runtime_config(image_name: &str) -> Result { + let map = GLOBAL_IMAGE_MAP.read().unwrap(); + if let Some(config) = map.get(image_name) { + if let Some(config) = config.config() { + let env = config + .env() + .clone() + .expect("Failed to get environment variables"); + let args = config + .cmd() + .clone() + .expect("Failed to get command arguments"); + let ports = config + .exposed_ports() + .clone() + .expect("Failed to get exposed ports"); + Ok(ImageRuntimeConfig::new(env, args, ports)) + } else { + Err(ImageError::ImageConfigurationNotFound(format!( + "Image configuration is empty for image {}", + image_name + ))) + } + } else { + Err(ImageError::ImageConfigurationNotFound(format!( + "get_runtime_config: Image configuration not found for image {}", + image_name + ))) + } + } + + // 不用这个也能拉取镜像? + pub fn get_resolver() { + todo!() + } +} + +fn check_namespace(ns: &str) -> String { + match ns { + "" => DEFAULT_NAMESPACE.to_string(), + _ => ns.to_string(), + } +} diff --git a/crates/service/src/lib.rs b/crates/service/src/lib.rs index 6932ab2..44a0315 100644 --- a/crates/service/src/lib.rs +++ b/crates/service/src/lib.rs @@ -1,27 +1,21 @@ +pub mod image_manager; pub mod spec; pub mod systemd; -use cni::cni_network::init_net_work; use containerd_client::{ Client, services::v1::{ Container, CreateContainerRequest, CreateTaskRequest, DeleteContainerRequest, - DeleteTaskRequest, GetImageRequest, KillRequest, ListContainersRequest, - ListNamespacesRequest, ListTasksRequest, ReadContentRequest, StartRequest, TransferOptions, - TransferRequest, WaitRequest, + DeleteTaskRequest, KillRequest, ListContainersRequest, ListNamespacesRequest, + ListTasksRequest, StartRequest, WaitRequest, container::Runtime, snapshots::{MountsRequest, PrepareSnapshotRequest}, }, - to_any, tonic::Request, - types::{ - Mount, Platform, - transfer::{ImageStore, OciRegistry, UnpackConfiguration}, - v1::Process, - }, + types::v1::Process, with_namespace, }; -use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest, MediaType, Os}; +use image_manager::ImageManager; use prost_types::Any; use sha2::{Digest, Sha256}; use spec::{DEFAULT_NAMESPACE, generate_spec}; @@ -30,7 +24,6 @@ use std::{ fs, sync::{Arc, RwLock}, time::Duration, - vec, }; use tokio::time::timeout; @@ -45,7 +38,7 @@ lazy_static::lazy_static! { type Err = Box; pub struct Service { - client: Arc, + pub client: Arc, netns_map: NetnsMap, } @@ -70,13 +63,12 @@ impl Service { pub async fn get_ip(&self, cid: &str) -> Option { let map = self.netns_map.read().unwrap(); - map.get(cid).map(|net_conf| net_conf.ip.clone()) + map.get(cid).map(|net_conf| net_conf.get_ip()) } pub async fn get_address(&self, cid: &str) -> Option { let map = self.netns_map.read().unwrap(); - map.get(cid) - .map(|net_conf| format!("{}:{}", net_conf.ip, net_conf.ports[0])) + map.get(cid).map(|net_conf| net_conf.get_address()) } pub async fn remove_netns_ip(&self, cid: &str) { @@ -84,40 +76,33 @@ impl Service { map.remove(cid); } - async fn prepare_snapshot( - &self, - cid: &str, - ns: &str, - img_name: &str, - ) -> Result, Err> { - let parent_snapshot = self.get_parent_snapshot(img_name, ns).await?; + async fn prepare_snapshot(&self, cid: &str, ns: &str, img_name: &str) -> Result<(), Err> { + let parent_snapshot = self.get_parent_snapshot(img_name).await?; let req = PrepareSnapshotRequest { snapshotter: "overlayfs".to_string(), key: cid.to_string(), parent: parent_snapshot, ..Default::default() }; - let resp = self + let _resp = self .client .snapshots() .prepare(with_namespace!(req, ns)) - .await? - .into_inner() - .mounts; + .await?; - Ok(resp) + Ok(()) } pub async fn create_container(&self, image_name: &str, cid: &str, ns: &str) -> Result<(), Err> { - let namespace = match ns { - "" => spec::DEFAULT_NAMESPACE, - _ => ns, - }; + let namespace = self.check_namespace(ns); + let namespace = namespace.as_str(); + + self.prepare_snapshot(cid, ns, image_name).await?; + let config = ImageManager::get_runtime_config(image_name).unwrap(); - let _mount = self.prepare_snapshot(cid, ns, image_name).await?; - let config = self.get_runtime_config(image_name, ns).await?; let env = config.env; let args = config.args; + let spec_path = generate_spec(cid, ns, args, env).unwrap(); let spec = fs::read_to_string(spec_path).unwrap(); @@ -144,14 +129,11 @@ impl Service { container: Some(container), }; - // let req = with_namespace!(req, namespace); - let _resp = containers_client .create(with_namespace!(req, namespace)) .await .expect("Failed to create container"); - // println!("Container: {:?} created", cid); Ok(()) } @@ -206,6 +188,7 @@ impl Service { .delete(with_namespace!(delete_request, namespace)) .await .expect("Failed to delete container"); + //todo 这里删除cni? self.remove_netns_ip(cid).await; println!("Container: {:?} deleted", cc); @@ -216,49 +199,44 @@ impl Service { Ok(()) } - pub async fn create_and_start_task(&self, cid: &str, ns: &str) -> Result<(), Err> { - // let tmp = std::env::temp_dir().join("containerd-client-test"); - // println!("Temp dir: {:?}", tmp); - // fs::create_dir_all(&tmp).expect("Failed to create temp directory"); - // let stdin = tmp.join("stdin"); - // let stdout = tmp.join("stdout"); - // let stderr = tmp.join("stderr"); - // File::create(&stdin).expect("Failed to create stdin"); - // File::create(&stdout).expect("Failed to create stdout"); - // File::create(&stderr).expect("Failed to create stderr"); - - let namespace = match ns { - "" => spec::DEFAULT_NAMESPACE, - _ => ns, - }; - self.create_task(cid, namespace).await?; + pub async fn create_and_start_task( + &self, + cid: &str, + ns: &str, + img_name: &str, + ) -> Result<(), Err> { + let namespace = self.check_namespace(ns); + let namespace = namespace.as_str(); + self.create_task(cid, namespace, img_name).await?; self.start_task(cid, namespace).await?; Ok(()) } /// 返回任务的pid - async fn create_task(&self, cid: &str, ns: &str) -> Result { + async fn create_task(&self, cid: &str, ns: &str, img_name: &str) -> Result { let mut sc = self.client.snapshots(); let req = MountsRequest { snapshotter: "overlayfs".to_string(), key: cid.to_string(), }; + let mounts = sc .mounts(with_namespace!(req, ns)) .await? .into_inner() .mounts; + println!("mounts ok"); drop(sc); println!("drop sc ok"); - let _ = init_net_work(); + let _ = cni::init_net_work(); println!("init_net_work ok"); - let (ip, path) = cni::cni_network::create_cni_network(cid.to_string(), ns.to_string())?; - let ports = self.get_runtime_config(cid, ns).await?.ports; + let (ip, path) = cni::create_cni_network(cid.to_string(), ns.to_string())?; + let ports = ImageManager::get_runtime_config(img_name).unwrap().ports; let network_config = NetworkConfig::new(path, ip, ports); println!("create_cni_network ok"); - self.save_network_config(cid, network_config).await; - println!("save_netns_ip ok"); + self.save_network_config(cid, network_config.clone()).await; + println!("save_netns_ip ok, netconfig: {:?}", network_config); let mut tc = self.client.tasks(); let req = CreateTaskRequest { container_id: cid.to_string(), @@ -442,245 +420,8 @@ impl Service { todo!() } - pub async fn prepare_image( - &self, - image_name: &str, - ns: &str, - always_pull: bool, - ) -> Result<(), Err> { - if always_pull { - self.pull_image(image_name, ns).await?; - } else { - let namespace = self.check_namespace(ns); - let namespace = namespace.as_str(); - let mut c = self.client.images(); - let req = GetImageRequest { - name: image_name.to_string(), - }; - let resp = c - .get(with_namespace!(req, namespace)) - .await - .map_err(|e| { - eprintln!( - "Failed to get the config of {} in namespace {}: {}", - image_name, namespace, e - ); - e - }) - .ok() - .unwrap() - .into_inner(); - if resp.image.is_none() { - self.pull_image(image_name, ns).await?; - } - } - Ok(()) - } - - pub async fn pull_image(&self, image_name: &str, ns: &str) -> Result<(), Err> { - let namespace = self.check_namespace(ns); - let namespace = namespace.as_str(); - - let mut c = self.client.transfer(); - let source = OciRegistry { - reference: image_name.to_string(), - resolver: Default::default(), - }; - // 这里先写死linux amd64 - let platform = Platform { - os: "linux".to_string(), - architecture: "amd64".to_string(), - ..Default::default() - }; - let dest = ImageStore { - name: image_name.to_string(), - platforms: vec![platform.clone()], - unpacks: vec![UnpackConfiguration { - platform: Some(platform), - ..Default::default() - }], - ..Default::default() - }; - - let anys = to_any(&source); - let anyd = to_any(&dest); - - let req = TransferRequest { - source: Some(anys), - destination: Some(anyd), - options: Some(TransferOptions { - ..Default::default() - }), - }; - c.transfer(with_namespace!(req, namespace)) - .await - .unwrap_or_else(|_| { - panic!( - "Unable to transfer image {} to namespace {}", - image_name, namespace - ) - }); - Ok(()) - } - - // 不用这个也能拉取镜像? - pub fn get_resolver(&self) { - todo!() - } - - async fn handle_index(&self, data: &[u8], ns: &str) -> Option { - let image_index: ImageIndex = ::serde_json::from_slice(data).unwrap(); - let img_manifest_dscr = image_index - .manifests() - .iter() - .find(|manifest_entry| match manifest_entry.platform() { - Some(p) => { - #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] - { - matches!(p.architecture(), &Arch::Amd64) && matches!(p.os(), &Os::Linux) - } - #[cfg(target_arch = "aarch64")] - { - matches!(p.architecture(), &Arch::ARM64) && matches!(p.os(), Os::Linux) - //&& matches!(p.variant().as_ref().map(|s| s.as_str()), Some("v8")) - } - } - None => false, - }) - .unwrap(); - - let req = ReadContentRequest { - digest: img_manifest_dscr.digest().to_owned(), - offset: 0, - size: 0, - }; - - let mut c = self.client.content(); - let resp = c - .read(with_namespace!(req, ns)) - .await - .expect("Failed to read content") - .into_inner() - .message() - .await - .expect("Failed to read content message") - .unwrap() - .data; - - self.handle_manifest(&resp, ns).await - } - - async fn handle_manifest(&self, data: &[u8], ns: &str) -> Option { - let img_manifest: ImageManifest = ::serde_json::from_slice(data).unwrap(); - let img_manifest_dscr = img_manifest.config(); - - let req = ReadContentRequest { - digest: img_manifest_dscr.digest().to_owned(), - offset: 0, - size: 0, - }; - let mut c = self.client.content(); - - let resp = c - .read(with_namespace!(req, ns)) - .await - .unwrap() - .into_inner() - .message() - .await - .unwrap() - .unwrap() - .data; - - ::serde_json::from_slice(&resp).unwrap() - } - - pub async fn get_img_config(&self, img_name: &str, ns: &str) -> Option { - let mut c = self.client.images(); - - let req = GetImageRequest { - name: img_name.to_string(), - }; - let resp = c - .get(with_namespace!(req, ns)) - .await - .map_err(|e| { - eprintln!( - "Failed to get the config of {} in namespace {}: {}", - img_name, ns, e - ); - e - }) - .ok()? - .into_inner(); - - let img_dscr = resp.image?.target?; - let media_type = MediaType::from(img_dscr.media_type.as_str()); - - let req = ReadContentRequest { - digest: img_dscr.digest, - ..Default::default() - }; - - let mut c = self.client.content(); - - let resp = c - .read(with_namespace!(req, ns)) - .await - .map_err(|e| { - eprintln!( - "Failed to read content for {} in namespace {}: {}", - img_name, ns, e - ); - e - }) - .ok()? - .into_inner() - .message() - .await - .map_err(|e| { - eprintln!( - "Failed to read message for {} in namespace {}: {}", - img_name, ns, e - ); - e - }) - .ok()? - .ok_or_else(|| { - eprintln!("No data found for {} in namespace {}", img_name, ns); - std::io::Error::new(std::io::ErrorKind::NotFound, "No data found") - }) - .ok()? - .data; - - let img_config = match media_type { - MediaType::ImageIndex => self.handle_index(&resp, ns).await.unwrap(), - MediaType::ImageManifest => self.handle_manifest(&resp, ns).await.unwrap(), - MediaType::Other(media_type) => match media_type.as_str() { - "application/vnd.docker.distribution.manifest.list.v2+json" => { - self.handle_index(&resp, ns).await.unwrap() - } - "application/vnd.docker.distribution.manifest.v2+json" => { - self.handle_manifest(&resp, ns).await.unwrap() - } - _ => { - eprintln!("Unexpected media type '{}'", media_type); - return None; - } - }, - _ => { - eprintln!("Unexpected media type '{}'", media_type); - return None; - } - }; - Some(img_config) - } - - async fn get_parent_snapshot(&self, img_name: &str, ns: &str) -> Result { - let img_config = match self.get_img_config(img_name, ns).await { - Some(config) => config, - None => return Err("Failed to get image configuration".into()), - }; + async fn get_parent_snapshot(&self, img_name: &str) -> Result { + let img_config = image_manager::ImageManager::get_image_config(img_name)?; let mut iter = img_config.rootfs().diff_ids().iter(); let mut ret = iter @@ -699,21 +440,6 @@ impl Service { Ok(ret) } - pub async fn get_runtime_config(&self, name: &str, ns: &str) -> Result { - let img_config = self.get_img_config(name, ns).await.unwrap(); - if let Some(config) = img_config.config() { - let env = config.env().as_ref().map_or_else(Vec::new, |v| v.clone()); - let args = config.cmd().as_ref().map_or_else(Vec::new, |v| v.clone()); - let ports = config - .exposed_ports() - .as_ref() - .map_or_else(Vec::new, |v| v.clone()); - Ok(RunTimeConfig::new(env, args, ports)) - } else { - Err("No config found".into()) - } - } - fn check_namespace(&self, ns: &str) -> String { match ns { "" => DEFAULT_NAMESPACE.to_string(), @@ -744,31 +470,32 @@ impl Service { // Ok(()) // } } -//容器是容器,要先启动,然后才能运行任务 -//要想删除一个正在运行的Task,必须先kill掉这个task,然后才能删除。 - -#[derive(Debug)] -pub struct RunTimeConfig { - pub env: Vec, - pub args: Vec, - pub ports: Vec, -} - -impl RunTimeConfig { - pub fn new(env: Vec, args: Vec, ports: Vec) -> Self { - RunTimeConfig { env, args, ports } - } -} #[derive(Debug, Clone)] pub struct NetworkConfig { - pub netns: String, - pub ip: String, - pub ports: Vec, + netns: String, + ip: String, + ports: Vec, } impl NetworkConfig { pub fn new(netns: String, ip: String, ports: Vec) -> Self { NetworkConfig { netns, ip, ports } } + + pub fn get_netns(&self) -> String { + self.netns.clone() + } + + pub fn get_ip(&self) -> String { + self.ip.clone() + } + + pub fn get_address(&self) -> String { + format!( + "{}:{}", + self.ip.split('/').next().unwrap_or(""), + self.ports[0].split('/').next().unwrap_or("") + ) + } }