Feat(image) 移动image成为单独的逻辑 (#61)

* 更改cni的位置

* 提取关于Image的逻辑

* 更改全局ImageMap的名字

* runtime_config获取部分改为expect

* 修改save_img_config的位置

* 完善一下错误处理

* 遗漏的错误处理

* 增加deploy处的错误处理

* fmt
This commit is contained in:
火花 2025-04-14 19:36:56 +08:00 committed by GitHub
parent f5f223db6b
commit 7258f4f6ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 677 additions and 524 deletions

View File

@ -1,183 +0,0 @@
use crate::Err;
use lazy_static::lazy_static;
use serde_json::Value;
use std::{
fmt::Error,
fs::{self, File},
io::Write,
net::IpAddr,
path::Path,
};
lazy_static! {
static ref CNI_BIN_DIR: String =
std::env::var("CNI_BIN_DIR").expect("Environment variable CNI_BIN_DIR is not set");
static ref CNI_CONF_DIR: String =
std::env::var("CNI_CONF_DIR").expect("Environment variable CNI_CONF_DIR is not set");
static ref CNI_TOOL: String =
std::env::var("CNI_TOOL").expect("Environment variable CNI_TOOL is not set");
}
// const NET_NS_PATH_FMT: &str = "/proc/{}/ns/net";
const CNI_DATA_DIR: &str = "/var/run/cni";
const DEFAULT_CNI_CONF_FILENAME: &str = "10-faasrs.conflist";
const DEFAULT_NETWORK_NAME: &str = "faasrs-cni-bridge";
const DEFAULT_BRIDGE_NAME: &str = "faasrs0";
const DEFAULT_SUBNET: &str = "10.66.0.0/16";
// const DEFAULT_IF_PREFIX: &str = "eth";
fn default_cni_conf() -> String {
format!(
r#"
{{
"cniVersion": "0.4.0",
"name": "{}",
"plugins": [
{{
"type": "bridge",
"bridge": "{}",
"isGateway": true,
"ipMasq": true,
"ipam": {{
"type": "host-local",
"subnet": "{}",
"dataDir": "{}",
"routes": [
{{ "dst": "0.0.0.0/0" }}
]
}}
}},
{{
"type": "firewall"
}}
]
}}
"#,
DEFAULT_NETWORK_NAME, DEFAULT_BRIDGE_NAME, DEFAULT_SUBNET, CNI_DATA_DIR
)
}
pub fn init_net_work() -> Result<(), Err> {
let cni_conf_dir = CNI_CONF_DIR.as_str();
if !dir_exists(Path::new(cni_conf_dir)) {
fs::create_dir_all(cni_conf_dir)?;
}
let net_config = Path::new(cni_conf_dir).join(DEFAULT_CNI_CONF_FILENAME);
let mut file = File::create(&net_config)?;
file.write_all(default_cni_conf().as_bytes())?;
Ok(())
}
fn get_netns(ns: &str, cid: &str) -> String {
format!("{}-{}", ns, cid)
}
fn get_path(netns: &str) -> String {
format!("/var/run/netns/{}", netns)
}
//TODO: 创建网络和删除网络的错误处理
pub fn create_cni_network(cid: String, ns: String) -> Result<(String, String), Err> {
// let netid = format!("{}-{}", cid, pid);
let netns = get_netns(ns.as_str(), cid.as_str());
let path = get_path(netns.as_str());
let mut ip = String::new();
let output = std::process::Command::new("ip")
.arg("netns")
.arg("add")
.arg(&netns)
.output()?;
if !output.status.success() {
return Err(Box::new(Error));
}
let bin = CNI_BIN_DIR.as_str();
let cnitool = CNI_TOOL.as_str();
let output = std::process::Command::new(cnitool)
.arg("add")
.arg("faasrs-cni-bridge")
.arg(&path)
.env("CNI_PATH", bin)
.output();
match output {
Ok(output) => {
if !output.status.success() {
return Err(Box::new(Error));
}
let stdout = String::from_utf8_lossy(&output.stdout);
let json: Value = match serde_json::from_str(&stdout) {
Ok(json) => json,
Err(e) => {
return Err(Box::new(e));
}
};
if let Some(ips) = json.get("ips").and_then(|ips| ips.as_array()) {
if let Some(first_ip) = ips
.first()
.and_then(|ip| ip.get("address"))
.and_then(|addr| addr.as_str())
{
ip = first_ip.to_string();
}
}
}
Err(e) => {
return Err(Box::new(e));
}
}
Ok((ip, path))
}
pub fn delete_cni_network(ns: &str, cid: &str) {
let netns = get_netns(ns, cid);
let path = get_path(&netns);
let bin = CNI_BIN_DIR.as_str();
let cnitool = CNI_TOOL.as_str();
let _output_del = std::process::Command::new(cnitool)
.arg("del")
.arg("faasrs-cni-bridge")
.arg(&path)
.env("CNI_PATH", bin)
.output();
let _output = std::process::Command::new("ip")
.arg("netns")
.arg("delete")
.arg(&netns)
.output();
}
fn dir_exists(dirname: &Path) -> bool {
path_exists(dirname).is_some_and(|info| info.is_dir())
}
fn path_exists(path: &Path) -> Option<fs::Metadata> {
fs::metadata(path).ok()
}
#[allow(unused)]
fn cni_gateway() -> Result<String, Err> {
let ip: IpAddr = DEFAULT_SUBNET.parse().unwrap();
if let IpAddr::V4(ip) = ip {
let octets = &mut ip.octets();
octets[3] = 1;
return Ok(ip.to_string());
}
Err(Box::new(Error))
}
#[allow(unused)]
fn dir_empty(dirname: &Path) -> bool {
if !dir_exists(dirname) {
return false;
}
match fs::read_dir(dirname) {
Ok(mut entries) => entries.next().is_none(),
Err(_) => false,
}
}

View File

@ -1,3 +1,184 @@
pub mod cni_network;
type Err = Box<dyn std::error::Error>; type Err = Box<dyn std::error::Error>;
use lazy_static::lazy_static;
use serde_json::Value;
use std::{
fmt::Error,
fs::{self, File},
io::Write,
net::IpAddr,
path::Path,
};
lazy_static! {
static ref CNI_BIN_DIR: String =
std::env::var("CNI_BIN_DIR").expect("Environment variable CNI_BIN_DIR is not set");
static ref CNI_CONF_DIR: String =
std::env::var("CNI_CONF_DIR").expect("Environment variable CNI_CONF_DIR is not set");
static ref CNI_TOOL: String =
std::env::var("CNI_TOOL").expect("Environment variable CNI_TOOL is not set");
}
// const NET_NS_PATH_FMT: &str = "/proc/{}/ns/net";
const CNI_DATA_DIR: &str = "/var/run/cni";
const DEFAULT_CNI_CONF_FILENAME: &str = "10-faasrs.conflist";
const DEFAULT_NETWORK_NAME: &str = "faasrs-cni-bridge";
const DEFAULT_BRIDGE_NAME: &str = "faasrs0";
const DEFAULT_SUBNET: &str = "10.66.0.0/16";
// const DEFAULT_IF_PREFIX: &str = "eth";
fn default_cni_conf() -> String {
format!(
r#"
{{
"cniVersion": "0.4.0",
"name": "{}",
"plugins": [
{{
"type": "bridge",
"bridge": "{}",
"isGateway": true,
"ipMasq": true,
"ipam": {{
"type": "host-local",
"subnet": "{}",
"dataDir": "{}",
"routes": [
{{ "dst": "0.0.0.0/0" }}
]
}}
}},
{{
"type": "firewall"
}}
]
}}
"#,
DEFAULT_NETWORK_NAME, DEFAULT_BRIDGE_NAME, DEFAULT_SUBNET, CNI_DATA_DIR
)
}
pub fn init_net_work() -> Result<(), Err> {
let cni_conf_dir = CNI_CONF_DIR.as_str();
if !dir_exists(Path::new(cni_conf_dir)) {
fs::create_dir_all(cni_conf_dir)?;
}
let net_config = Path::new(cni_conf_dir).join(DEFAULT_CNI_CONF_FILENAME);
let mut file = File::create(&net_config)?;
file.write_all(default_cni_conf().as_bytes())?;
Ok(())
}
fn get_netns(ns: &str, cid: &str) -> String {
format!("{}-{}", ns, cid)
}
fn get_path(netns: &str) -> String {
format!("/var/run/netns/{}", netns)
}
//TODO: 创建网络和删除网络的错误处理
pub fn create_cni_network(cid: String, ns: String) -> Result<(String, String), Err> {
// let netid = format!("{}-{}", cid, pid);
let netns = get_netns(ns.as_str(), cid.as_str());
let path = get_path(netns.as_str());
let mut ip = String::new();
let output = std::process::Command::new("ip")
.arg("netns")
.arg("add")
.arg(&netns)
.output()?;
if !output.status.success() {
return Err(Box::new(Error));
}
let bin = CNI_BIN_DIR.as_str();
let cnitool = CNI_TOOL.as_str();
let output = std::process::Command::new(cnitool)
.arg("add")
.arg("faasrs-cni-bridge")
.arg(&path)
.env("CNI_PATH", bin)
.output();
match output {
Ok(output) => {
if !output.status.success() {
return Err(Box::new(Error));
}
let stdout = String::from_utf8_lossy(&output.stdout);
let json: Value = match serde_json::from_str(&stdout) {
Ok(json) => json,
Err(e) => {
return Err(Box::new(e));
}
};
if let Some(ips) = json.get("ips").and_then(|ips| ips.as_array()) {
if let Some(first_ip) = ips
.first()
.and_then(|ip| ip.get("address"))
.and_then(|addr| addr.as_str())
{
ip = first_ip.to_string();
}
}
}
Err(e) => {
return Err(Box::new(e));
}
}
Ok((ip, path))
}
pub fn delete_cni_network(ns: &str, cid: &str) {
let netns = get_netns(ns, cid);
let path = get_path(&netns);
let bin = CNI_BIN_DIR.as_str();
let cnitool = CNI_TOOL.as_str();
let _output_del = std::process::Command::new(cnitool)
.arg("del")
.arg("faasrs-cni-bridge")
.arg(&path)
.env("CNI_PATH", bin)
.output();
let _output = std::process::Command::new("ip")
.arg("netns")
.arg("delete")
.arg(&netns)
.output();
}
fn dir_exists(dirname: &Path) -> bool {
path_exists(dirname).is_some_and(|info| info.is_dir())
}
fn path_exists(path: &Path) -> Option<fs::Metadata> {
fs::metadata(path).ok()
}
#[allow(unused)]
fn cni_gateway() -> Result<String, Err> {
let ip: IpAddr = DEFAULT_SUBNET.parse().unwrap();
if let IpAddr::V4(ip) = ip {
let octets = &mut ip.octets();
octets[3] = 1;
return Ok(ip.to_string());
}
Err(Box::new(Error))
}
#[allow(unused)]
fn dir_empty(dirname: &Path) -> bool {
if !dir_exists(dirname) {
return false;
}
match fs::read_dir(dirname) {
Ok(mut entries) => entries.next().is_none(),
Err(_) => false,
}
}

View File

@ -46,7 +46,7 @@ async fn delete(
let function = get_function(service, function_name, namespace).await?; let function = get_function(service, function_name, namespace).await?;
if function.replicas != 0 { if function.replicas != 0 {
println!(" delete_cni_network ing {:?}", function.replicas); println!(" delete_cni_network ing {:?}", function.replicas);
cni::cni_network::delete_cni_network(namespace, function_name); cni::delete_cni_network(namespace, function_name);
} else { } else {
println!(" function.replicas {:?}", function.replicas); println!(" function.replicas {:?}", function.replicas);
} }

View File

@ -1,11 +1,11 @@
use crate::{ use crate::{
consts, consts,
handlers::utils::{CustomError, map_service_error}, handlers::utils::CustomError,
types::function_deployment::{DeployFunctionInfo, FunctionDeployment}, types::function_deployment::{DeployFunctionInfo, FunctionDeployment},
}; };
use actix_web::{HttpResponse, Responder, web}; use actix_web::{HttpResponse, Responder, web};
use service::Service; use service::{Service, image_manager::ImageManager};
use std::sync::Arc; use std::sync::Arc;
pub async fn deploy_handler( pub async fn deploy_handler(
@ -66,10 +66,11 @@ async fn deploy(service: &Arc<Service>, config: &FunctionDeployment) -> Result<(
)); ));
} }
service //todo 这里暂时将client设为pub
.prepare_image(&config.image, &namespace, true) let client = service.client.as_ref();
ImageManager::prepare_image(client, &config.image, &namespace, true)
.await .await
.map_err(map_service_error)?; .map_err(CustomError::from)?;
println!("Image '{}' validated", &config.image); println!("Image '{}' validated", &config.image);
service service
@ -83,7 +84,7 @@ async fn deploy(service: &Arc<Service>, config: &FunctionDeployment) -> Result<(
); );
service service
.create_and_start_task(&config.service, &namespace) .create_and_start_task(&config.service, &namespace, &config.image)
.await .await
.map_err(|e| { .map_err(|e| {
CustomError::OtherError(format!( CustomError::OtherError(format!(

View File

@ -10,6 +10,8 @@ const ANNOTATION_LABEL_PREFIX: &str = "com.openfaas.annotations.";
pub enum FunctionError { pub enum FunctionError {
#[error("Function not found: {0}")] #[error("Function not found: {0}")]
FunctionNotFound(String), FunctionNotFound(String),
#[error("Runtime Config not found: {0}")]
RuntimeConfigNotFound(String),
} }
impl From<Box<dyn std::error::Error>> for FunctionError { impl From<Box<dyn std::error::Error>> for FunctionError {
@ -39,7 +41,9 @@ pub async fn get_function(
let all_labels = container.labels; let all_labels = container.labels;
let (labels, _) = build_labels_and_annotations(all_labels); let (labels, _) = build_labels_and_annotations(all_labels);
let env = client.get_runtime_config(&image, namespace).await?.env; let env = service::image_manager::ImageManager::get_runtime_config(&image)
.map_err(|e| FunctionError::RuntimeConfigNotFound(e.to_string()))?
.env;
let (env_vars, env_process) = read_env_from_process_env(env); let (env_vars, env_process) = read_env_from_process_env(env);
// let secrets = read_secrets_from_mounts(&spec.mounts); // let secrets = read_secrets_from_mounts(&spec.mounts);
// let memory_limit = read_memory_limit_from_spec(&spec); // let memory_limit = read_memory_limit_from_spec(&spec);

View File

@ -1,6 +1,7 @@
use crate::handlers::function_get::FunctionError; use crate::handlers::function_get::FunctionError;
use actix_web::{Error, HttpResponse, ResponseError}; use actix_web::{Error, HttpResponse, ResponseError};
use derive_more::Display; use derive_more::Display;
use service::image_manager::ImageError;
pub fn map_service_error(e: Box<dyn std::error::Error>) -> Error { pub fn map_service_error(e: Box<dyn std::error::Error>) -> Error {
eprintln!("Service error: {}", e); eprintln!("Service error: {}", e);
@ -18,6 +19,8 @@ pub enum CustomError {
ActixError(actix_web::Error), ActixError(actix_web::Error),
#[display("FunctionError: {}", _0)] #[display("FunctionError: {}", _0)]
FunctionError(FunctionError), FunctionError(FunctionError),
#[display("ImageError: {}", _0)]
ImageError(ImageError),
} }
impl ResponseError for CustomError { impl ResponseError for CustomError {
@ -42,6 +45,9 @@ impl ResponseError for CustomError {
CustomError::FunctionError(err) => { CustomError::FunctionError(err) => {
HttpResponse::InternalServerError().body(err.to_string()) HttpResponse::InternalServerError().body(err.to_string())
} }
CustomError::ImageError(err) => {
HttpResponse::InternalServerError().body(err.to_string())
}
} }
} }
} }
@ -63,3 +69,9 @@ impl From<FunctionError> for CustomError {
CustomError::FunctionError(err) CustomError::FunctionError(err)
} }
} }
impl From<ImageError> for CustomError {
fn from(err: ImageError) -> Self {
CustomError::ImageError(err)
}
}

View File

@ -0,0 +1,411 @@
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use containerd_client::{
Client,
services::v1::{GetImageRequest, ReadContentRequest, TransferOptions, TransferRequest},
to_any,
tonic::Request,
types::{
Platform,
transfer::{ImageStore, OciRegistry, UnpackConfiguration},
},
with_namespace,
};
use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest, MediaType, Os};
use crate::spec::DEFAULT_NAMESPACE;
type ImagesMap = Arc<RwLock<HashMap<String, ImageConfiguration>>>;
lazy_static::lazy_static! {
static ref GLOBAL_IMAGE_MAP: ImagesMap = Arc::new(RwLock::new(HashMap::new()));
}
#[derive(Debug, Clone)]
pub struct ImageRuntimeConfig {
pub env: Vec<String>,
pub args: Vec<String>,
pub ports: Vec<String>,
}
impl ImageRuntimeConfig {
pub fn new(env: Vec<String>, args: Vec<String>, ports: Vec<String>) -> Self {
ImageRuntimeConfig { env, args, ports }
}
}
impl Drop for ImageManager {
fn drop(&mut self) {
let mut map = GLOBAL_IMAGE_MAP.write().unwrap();
map.clear();
}
}
#[derive(Debug)]
pub enum ImageError {
ImageNotFound(String),
ImagePullFailed(String),
ImageConfigurationNotFound(String),
ReadContentFailed(String),
UnexpectedMediaType,
DeserializationFailed(String),
#[allow(dead_code)]
OtherError,
}
impl std::fmt::Display for ImageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ImageError::ImageNotFound(msg) => write!(f, "Image not found: {}", msg),
ImageError::ImagePullFailed(msg) => write!(f, "Image pull failed: {}", msg),
ImageError::ImageConfigurationNotFound(msg) => {
write!(f, "Image configuration not found: {}", msg)
}
ImageError::ReadContentFailed(msg) => write!(f, "Read content failed: {}", msg),
ImageError::UnexpectedMediaType => {
write!(f, "Unexpected media type")
}
ImageError::DeserializationFailed(msg) => {
write!(f, "Deserialization failed: {}", msg)
}
ImageError::OtherError => write!(f, "Other error happened"),
}
}
}
impl std::error::Error for ImageError {}
#[derive(Debug)]
pub struct ImageManager;
impl ImageManager {
pub async fn prepare_image(
client: &Client,
image_name: &str,
ns: &str,
always_pull: bool,
) -> Result<(), ImageError> {
if always_pull {
Self::pull_image(client, image_name, ns).await?;
} else {
let namespace = check_namespace(ns);
let namespace = namespace.as_str();
let mut c = client.images();
let req = GetImageRequest {
name: image_name.to_string(),
};
let resp = match c.get(with_namespace!(req, namespace)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ImageNotFound(format!(
"Failed to get image {}: {}",
image_name, e
)));
}
};
if resp.image.is_none() {
Self::pull_image(client, image_name, ns).await?;
}
}
Self::save_img_config(client, image_name, ns).await
}
pub async fn pull_image(client: &Client, image_name: &str, ns: &str) -> Result<(), ImageError> {
let ns = check_namespace(ns);
let namespace = ns.as_str();
let mut c: containerd_client::services::v1::transfer_client::TransferClient<
tonic::transport::Channel,
> = client.transfer();
let source = OciRegistry {
reference: image_name.to_string(),
resolver: Default::default(),
};
// 这里先写死linux amd64
let platform = Platform {
os: "linux".to_string(),
architecture: "amd64".to_string(),
..Default::default()
};
let dest = ImageStore {
name: image_name.to_string(),
platforms: vec![platform.clone()],
unpacks: vec![UnpackConfiguration {
platform: Some(platform),
..Default::default()
}],
..Default::default()
};
let anys = to_any(&source);
let anyd = to_any(&dest);
let req = TransferRequest {
source: Some(anys),
destination: Some(anyd),
options: Some(TransferOptions {
..Default::default()
}),
};
if let Err(e) = c.transfer(with_namespace!(req, namespace)).await {
return Err(ImageError::ImagePullFailed(format!(
"Failed to pull image {}: {}",
image_name, e
)));
}
Ok(())
// Self::save_img_config(client, image_name, ns.as_str()).await
}
pub async fn save_img_config(
client: &Client,
img_name: &str,
ns: &str,
) -> Result<(), ImageError> {
let mut c = client.images();
let req = GetImageRequest {
name: img_name.to_string(),
};
let resp = match c.get(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ImageNotFound(format!(
"Failed to get image {}: {}",
img_name, e
)));
}
};
let img_dscr = resp.image.unwrap().target.unwrap();
let media_type = MediaType::from(img_dscr.media_type.as_str());
let req = ReadContentRequest {
digest: img_dscr.digest,
..Default::default()
};
let mut c = client.content();
let mut inner = match c.read(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to read content of image {}: {}",
img_name, e
)));
}
};
let resp = match inner.message().await {
Ok(response) => response.unwrap().data,
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to get the inner content of image {}: {}",
img_name, e
)));
}
};
drop(c);
let img_config = match media_type {
MediaType::ImageIndex => Self::handle_index(client, &resp, ns).await.unwrap(),
MediaType::ImageManifest => Self::handle_manifest(client, &resp, ns).await.unwrap(),
MediaType::Other(media_type) => match media_type.as_str() {
"application/vnd.docker.distribution.manifest.list.v2+json" => {
Self::handle_index(client, &resp, ns).await.unwrap()
}
"application/vnd.docker.distribution.manifest.v2+json" => {
Self::handle_manifest(client, &resp, ns).await.unwrap()
}
_ => {
return Err(ImageError::UnexpectedMediaType);
}
},
_ => {
return Err(ImageError::UnexpectedMediaType);
}
};
if img_config.is_none() {
return Err(ImageError::ImageConfigurationNotFound(format!(
"save_img_config: Image configuration not found for image {}",
img_name
)));
}
let img_config = img_config.unwrap();
Self::insert_image_config(img_name, img_config)
}
async fn handle_index(
client: &Client,
data: &[u8],
ns: &str,
) -> Result<Option<ImageConfiguration>, ImageError> {
let image_index: ImageIndex = ::serde_json::from_slice(data).map_err(|e| {
ImageError::DeserializationFailed(format!("Failed to parse JSON: {}", e))
})?;
let img_manifest_dscr = image_index
.manifests()
.iter()
.find(|manifest_entry| match manifest_entry.platform() {
Some(p) => {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
matches!(p.architecture(), &Arch::Amd64) && matches!(p.os(), &Os::Linux)
}
#[cfg(target_arch = "aarch64")]
{
matches!(p.architecture(), &Arch::ARM64) && matches!(p.os(), Os::Linux)
//&& matches!(p.variant().as_ref().map(|s| s.as_str()), Some("v8"))
}
}
None => false,
})
.unwrap();
let req = ReadContentRequest {
digest: img_manifest_dscr.digest().to_owned(),
offset: 0,
size: 0,
};
let mut c = client.content();
let mut inner = match c.read(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handler index : {}",
e
)));
}
};
let resp = match inner.message().await {
Ok(response) => response.unwrap().data,
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handle index inner : {}",
e
)));
}
};
drop(c);
Self::handle_manifest(client, &resp, ns).await
}
async fn handle_manifest(
client: &Client,
data: &[u8],
ns: &str,
) -> Result<Option<ImageConfiguration>, ImageError> {
let img_manifest: ImageManifest = match ::serde_json::from_slice(data) {
Ok(manifest) => manifest,
Err(e) => {
return Err(ImageError::DeserializationFailed(format!(
"Failed to deserialize image manifest: {}",
e
)));
}
};
let img_manifest_dscr = img_manifest.config();
let req = ReadContentRequest {
digest: img_manifest_dscr.digest().to_owned(),
offset: 0,
size: 0,
};
let mut c = client.content();
let mut inner = match c.read(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handler index : {}",
e
)));
}
};
let resp = match inner.message().await {
Ok(response) => response.unwrap().data,
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handle index inner : {}",
e
)));
}
};
Ok(::serde_json::from_slice(&resp).unwrap())
}
fn insert_image_config(image_name: &str, config: ImageConfiguration) -> Result<(), ImageError> {
let mut map = GLOBAL_IMAGE_MAP.write().unwrap();
map.insert(image_name.to_string(), config);
Ok(())
}
pub fn get_image_config(image_name: &str) -> Result<ImageConfiguration, ImageError> {
let map = GLOBAL_IMAGE_MAP.read().unwrap();
if let Some(config) = map.get(image_name) {
Ok(config.clone())
} else {
Err(ImageError::ImageConfigurationNotFound(format!(
"get_image_config: Image configuration not found for image {}",
image_name
)))
}
}
pub fn get_runtime_config(image_name: &str) -> Result<ImageRuntimeConfig, ImageError> {
let map = GLOBAL_IMAGE_MAP.read().unwrap();
if let Some(config) = map.get(image_name) {
if let Some(config) = config.config() {
let env = config
.env()
.clone()
.expect("Failed to get environment variables");
let args = config
.cmd()
.clone()
.expect("Failed to get command arguments");
let ports = config
.exposed_ports()
.clone()
.expect("Failed to get exposed ports");
Ok(ImageRuntimeConfig::new(env, args, ports))
} else {
Err(ImageError::ImageConfigurationNotFound(format!(
"Image configuration is empty for image {}",
image_name
)))
}
} else {
Err(ImageError::ImageConfigurationNotFound(format!(
"get_runtime_config: Image configuration not found for image {}",
image_name
)))
}
}
// 不用这个也能拉取镜像?
pub fn get_resolver() {
todo!()
}
}
fn check_namespace(ns: &str) -> String {
match ns {
"" => DEFAULT_NAMESPACE.to_string(),
_ => ns.to_string(),
}
}

View File

@ -1,27 +1,21 @@
pub mod image_manager;
pub mod spec; pub mod spec;
pub mod systemd; pub mod systemd;
use cni::cni_network::init_net_work;
use containerd_client::{ use containerd_client::{
Client, Client,
services::v1::{ services::v1::{
Container, CreateContainerRequest, CreateTaskRequest, DeleteContainerRequest, Container, CreateContainerRequest, CreateTaskRequest, DeleteContainerRequest,
DeleteTaskRequest, GetImageRequest, KillRequest, ListContainersRequest, DeleteTaskRequest, KillRequest, ListContainersRequest, ListNamespacesRequest,
ListNamespacesRequest, ListTasksRequest, ReadContentRequest, StartRequest, TransferOptions, ListTasksRequest, StartRequest, WaitRequest,
TransferRequest, WaitRequest,
container::Runtime, container::Runtime,
snapshots::{MountsRequest, PrepareSnapshotRequest}, snapshots::{MountsRequest, PrepareSnapshotRequest},
}, },
to_any,
tonic::Request, tonic::Request,
types::{ types::v1::Process,
Mount, Platform,
transfer::{ImageStore, OciRegistry, UnpackConfiguration},
v1::Process,
},
with_namespace, with_namespace,
}; };
use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest, MediaType, Os}; use image_manager::ImageManager;
use prost_types::Any; use prost_types::Any;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use spec::{DEFAULT_NAMESPACE, generate_spec}; use spec::{DEFAULT_NAMESPACE, generate_spec};
@ -30,7 +24,6 @@ use std::{
fs, fs,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
time::Duration, time::Duration,
vec,
}; };
use tokio::time::timeout; use tokio::time::timeout;
@ -45,7 +38,7 @@ lazy_static::lazy_static! {
type Err = Box<dyn std::error::Error>; type Err = Box<dyn std::error::Error>;
pub struct Service { pub struct Service {
client: Arc<Client>, pub client: Arc<Client>,
netns_map: NetnsMap, netns_map: NetnsMap,
} }
@ -70,13 +63,12 @@ impl Service {
pub async fn get_ip(&self, cid: &str) -> Option<String> { pub async fn get_ip(&self, cid: &str) -> Option<String> {
let map = self.netns_map.read().unwrap(); let map = self.netns_map.read().unwrap();
map.get(cid).map(|net_conf| net_conf.ip.clone()) map.get(cid).map(|net_conf| net_conf.get_ip())
} }
pub async fn get_address(&self, cid: &str) -> Option<String> { pub async fn get_address(&self, cid: &str) -> Option<String> {
let map = self.netns_map.read().unwrap(); let map = self.netns_map.read().unwrap();
map.get(cid) map.get(cid).map(|net_conf| net_conf.get_address())
.map(|net_conf| format!("{}:{}", net_conf.ip, net_conf.ports[0]))
} }
pub async fn remove_netns_ip(&self, cid: &str) { pub async fn remove_netns_ip(&self, cid: &str) {
@ -84,40 +76,33 @@ impl Service {
map.remove(cid); map.remove(cid);
} }
async fn prepare_snapshot( async fn prepare_snapshot(&self, cid: &str, ns: &str, img_name: &str) -> Result<(), Err> {
&self, let parent_snapshot = self.get_parent_snapshot(img_name).await?;
cid: &str,
ns: &str,
img_name: &str,
) -> Result<Vec<Mount>, Err> {
let parent_snapshot = self.get_parent_snapshot(img_name, ns).await?;
let req = PrepareSnapshotRequest { let req = PrepareSnapshotRequest {
snapshotter: "overlayfs".to_string(), snapshotter: "overlayfs".to_string(),
key: cid.to_string(), key: cid.to_string(),
parent: parent_snapshot, parent: parent_snapshot,
..Default::default() ..Default::default()
}; };
let resp = self let _resp = self
.client .client
.snapshots() .snapshots()
.prepare(with_namespace!(req, ns)) .prepare(with_namespace!(req, ns))
.await? .await?;
.into_inner()
.mounts;
Ok(resp) Ok(())
} }
pub async fn create_container(&self, image_name: &str, cid: &str, ns: &str) -> Result<(), Err> { pub async fn create_container(&self, image_name: &str, cid: &str, ns: &str) -> Result<(), Err> {
let namespace = match ns { let namespace = self.check_namespace(ns);
"" => spec::DEFAULT_NAMESPACE, let namespace = namespace.as_str();
_ => ns,
}; self.prepare_snapshot(cid, ns, image_name).await?;
let config = ImageManager::get_runtime_config(image_name).unwrap();
let _mount = self.prepare_snapshot(cid, ns, image_name).await?;
let config = self.get_runtime_config(image_name, ns).await?;
let env = config.env; let env = config.env;
let args = config.args; let args = config.args;
let spec_path = generate_spec(cid, ns, args, env).unwrap(); let spec_path = generate_spec(cid, ns, args, env).unwrap();
let spec = fs::read_to_string(spec_path).unwrap(); let spec = fs::read_to_string(spec_path).unwrap();
@ -144,14 +129,11 @@ impl Service {
container: Some(container), container: Some(container),
}; };
// let req = with_namespace!(req, namespace);
let _resp = containers_client let _resp = containers_client
.create(with_namespace!(req, namespace)) .create(with_namespace!(req, namespace))
.await .await
.expect("Failed to create container"); .expect("Failed to create container");
// println!("Container: {:?} created", cid);
Ok(()) Ok(())
} }
@ -206,6 +188,7 @@ impl Service {
.delete(with_namespace!(delete_request, namespace)) .delete(with_namespace!(delete_request, namespace))
.await .await
.expect("Failed to delete container"); .expect("Failed to delete container");
//todo 这里删除cni?
self.remove_netns_ip(cid).await; self.remove_netns_ip(cid).await;
println!("Container: {:?} deleted", cc); println!("Container: {:?} deleted", cc);
@ -216,49 +199,44 @@ impl Service {
Ok(()) Ok(())
} }
pub async fn create_and_start_task(&self, cid: &str, ns: &str) -> Result<(), Err> { pub async fn create_and_start_task(
// let tmp = std::env::temp_dir().join("containerd-client-test"); &self,
// println!("Temp dir: {:?}", tmp); cid: &str,
// fs::create_dir_all(&tmp).expect("Failed to create temp directory"); ns: &str,
// let stdin = tmp.join("stdin"); img_name: &str,
// let stdout = tmp.join("stdout"); ) -> Result<(), Err> {
// let stderr = tmp.join("stderr"); let namespace = self.check_namespace(ns);
// File::create(&stdin).expect("Failed to create stdin"); let namespace = namespace.as_str();
// File::create(&stdout).expect("Failed to create stdout"); self.create_task(cid, namespace, img_name).await?;
// File::create(&stderr).expect("Failed to create stderr");
let namespace = match ns {
"" => spec::DEFAULT_NAMESPACE,
_ => ns,
};
self.create_task(cid, namespace).await?;
self.start_task(cid, namespace).await?; self.start_task(cid, namespace).await?;
Ok(()) Ok(())
} }
/// 返回任务的pid /// 返回任务的pid
async fn create_task(&self, cid: &str, ns: &str) -> Result<u32, Err> { async fn create_task(&self, cid: &str, ns: &str, img_name: &str) -> Result<u32, Err> {
let mut sc = self.client.snapshots(); let mut sc = self.client.snapshots();
let req = MountsRequest { let req = MountsRequest {
snapshotter: "overlayfs".to_string(), snapshotter: "overlayfs".to_string(),
key: cid.to_string(), key: cid.to_string(),
}; };
let mounts = sc let mounts = sc
.mounts(with_namespace!(req, ns)) .mounts(with_namespace!(req, ns))
.await? .await?
.into_inner() .into_inner()
.mounts; .mounts;
println!("mounts ok"); println!("mounts ok");
drop(sc); drop(sc);
println!("drop sc ok"); println!("drop sc ok");
let _ = init_net_work(); let _ = cni::init_net_work();
println!("init_net_work ok"); println!("init_net_work ok");
let (ip, path) = cni::cni_network::create_cni_network(cid.to_string(), ns.to_string())?; let (ip, path) = cni::create_cni_network(cid.to_string(), ns.to_string())?;
let ports = self.get_runtime_config(cid, ns).await?.ports; let ports = ImageManager::get_runtime_config(img_name).unwrap().ports;
let network_config = NetworkConfig::new(path, ip, ports); let network_config = NetworkConfig::new(path, ip, ports);
println!("create_cni_network ok"); println!("create_cni_network ok");
self.save_network_config(cid, network_config).await; self.save_network_config(cid, network_config.clone()).await;
println!("save_netns_ip ok"); println!("save_netns_ip ok, netconfig: {:?}", network_config);
let mut tc = self.client.tasks(); let mut tc = self.client.tasks();
let req = CreateTaskRequest { let req = CreateTaskRequest {
container_id: cid.to_string(), container_id: cid.to_string(),
@ -442,245 +420,8 @@ impl Service {
todo!() todo!()
} }
pub async fn prepare_image( async fn get_parent_snapshot(&self, img_name: &str) -> Result<String, Err> {
&self, let img_config = image_manager::ImageManager::get_image_config(img_name)?;
image_name: &str,
ns: &str,
always_pull: bool,
) -> Result<(), Err> {
if always_pull {
self.pull_image(image_name, ns).await?;
} else {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let mut c = self.client.images();
let req = GetImageRequest {
name: image_name.to_string(),
};
let resp = c
.get(with_namespace!(req, namespace))
.await
.map_err(|e| {
eprintln!(
"Failed to get the config of {} in namespace {}: {}",
image_name, namespace, e
);
e
})
.ok()
.unwrap()
.into_inner();
if resp.image.is_none() {
self.pull_image(image_name, ns).await?;
}
}
Ok(())
}
pub async fn pull_image(&self, image_name: &str, ns: &str) -> Result<(), Err> {
let namespace = self.check_namespace(ns);
let namespace = namespace.as_str();
let mut c = self.client.transfer();
let source = OciRegistry {
reference: image_name.to_string(),
resolver: Default::default(),
};
// 这里先写死linux amd64
let platform = Platform {
os: "linux".to_string(),
architecture: "amd64".to_string(),
..Default::default()
};
let dest = ImageStore {
name: image_name.to_string(),
platforms: vec![platform.clone()],
unpacks: vec![UnpackConfiguration {
platform: Some(platform),
..Default::default()
}],
..Default::default()
};
let anys = to_any(&source);
let anyd = to_any(&dest);
let req = TransferRequest {
source: Some(anys),
destination: Some(anyd),
options: Some(TransferOptions {
..Default::default()
}),
};
c.transfer(with_namespace!(req, namespace))
.await
.unwrap_or_else(|_| {
panic!(
"Unable to transfer image {} to namespace {}",
image_name, namespace
)
});
Ok(())
}
// 不用这个也能拉取镜像?
pub fn get_resolver(&self) {
todo!()
}
async fn handle_index(&self, data: &[u8], ns: &str) -> Option<ImageConfiguration> {
let image_index: ImageIndex = ::serde_json::from_slice(data).unwrap();
let img_manifest_dscr = image_index
.manifests()
.iter()
.find(|manifest_entry| match manifest_entry.platform() {
Some(p) => {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
matches!(p.architecture(), &Arch::Amd64) && matches!(p.os(), &Os::Linux)
}
#[cfg(target_arch = "aarch64")]
{
matches!(p.architecture(), &Arch::ARM64) && matches!(p.os(), Os::Linux)
//&& matches!(p.variant().as_ref().map(|s| s.as_str()), Some("v8"))
}
}
None => false,
})
.unwrap();
let req = ReadContentRequest {
digest: img_manifest_dscr.digest().to_owned(),
offset: 0,
size: 0,
};
let mut c = self.client.content();
let resp = c
.read(with_namespace!(req, ns))
.await
.expect("Failed to read content")
.into_inner()
.message()
.await
.expect("Failed to read content message")
.unwrap()
.data;
self.handle_manifest(&resp, ns).await
}
async fn handle_manifest(&self, data: &[u8], ns: &str) -> Option<ImageConfiguration> {
let img_manifest: ImageManifest = ::serde_json::from_slice(data).unwrap();
let img_manifest_dscr = img_manifest.config();
let req = ReadContentRequest {
digest: img_manifest_dscr.digest().to_owned(),
offset: 0,
size: 0,
};
let mut c = self.client.content();
let resp = c
.read(with_namespace!(req, ns))
.await
.unwrap()
.into_inner()
.message()
.await
.unwrap()
.unwrap()
.data;
::serde_json::from_slice(&resp).unwrap()
}
pub async fn get_img_config(&self, img_name: &str, ns: &str) -> Option<ImageConfiguration> {
let mut c = self.client.images();
let req = GetImageRequest {
name: img_name.to_string(),
};
let resp = c
.get(with_namespace!(req, ns))
.await
.map_err(|e| {
eprintln!(
"Failed to get the config of {} in namespace {}: {}",
img_name, ns, e
);
e
})
.ok()?
.into_inner();
let img_dscr = resp.image?.target?;
let media_type = MediaType::from(img_dscr.media_type.as_str());
let req = ReadContentRequest {
digest: img_dscr.digest,
..Default::default()
};
let mut c = self.client.content();
let resp = c
.read(with_namespace!(req, ns))
.await
.map_err(|e| {
eprintln!(
"Failed to read content for {} in namespace {}: {}",
img_name, ns, e
);
e
})
.ok()?
.into_inner()
.message()
.await
.map_err(|e| {
eprintln!(
"Failed to read message for {} in namespace {}: {}",
img_name, ns, e
);
e
})
.ok()?
.ok_or_else(|| {
eprintln!("No data found for {} in namespace {}", img_name, ns);
std::io::Error::new(std::io::ErrorKind::NotFound, "No data found")
})
.ok()?
.data;
let img_config = match media_type {
MediaType::ImageIndex => self.handle_index(&resp, ns).await.unwrap(),
MediaType::ImageManifest => self.handle_manifest(&resp, ns).await.unwrap(),
MediaType::Other(media_type) => match media_type.as_str() {
"application/vnd.docker.distribution.manifest.list.v2+json" => {
self.handle_index(&resp, ns).await.unwrap()
}
"application/vnd.docker.distribution.manifest.v2+json" => {
self.handle_manifest(&resp, ns).await.unwrap()
}
_ => {
eprintln!("Unexpected media type '{}'", media_type);
return None;
}
},
_ => {
eprintln!("Unexpected media type '{}'", media_type);
return None;
}
};
Some(img_config)
}
async fn get_parent_snapshot(&self, img_name: &str, ns: &str) -> Result<String, Err> {
let img_config = match self.get_img_config(img_name, ns).await {
Some(config) => config,
None => return Err("Failed to get image configuration".into()),
};
let mut iter = img_config.rootfs().diff_ids().iter(); let mut iter = img_config.rootfs().diff_ids().iter();
let mut ret = iter let mut ret = iter
@ -699,21 +440,6 @@ impl Service {
Ok(ret) Ok(ret)
} }
pub async fn get_runtime_config(&self, name: &str, ns: &str) -> Result<RunTimeConfig, Err> {
let img_config = self.get_img_config(name, ns).await.unwrap();
if let Some(config) = img_config.config() {
let env = config.env().as_ref().map_or_else(Vec::new, |v| v.clone());
let args = config.cmd().as_ref().map_or_else(Vec::new, |v| v.clone());
let ports = config
.exposed_ports()
.as_ref()
.map_or_else(Vec::new, |v| v.clone());
Ok(RunTimeConfig::new(env, args, ports))
} else {
Err("No config found".into())
}
}
fn check_namespace(&self, ns: &str) -> String { fn check_namespace(&self, ns: &str) -> String {
match ns { match ns {
"" => DEFAULT_NAMESPACE.to_string(), "" => DEFAULT_NAMESPACE.to_string(),
@ -744,31 +470,32 @@ impl Service {
// Ok(()) // Ok(())
// } // }
} }
//容器是容器,要先启动,然后才能运行任务
//要想删除一个正在运行的Task必须先kill掉这个task然后才能删除。
#[derive(Debug)]
pub struct RunTimeConfig {
pub env: Vec<String>,
pub args: Vec<String>,
pub ports: Vec<String>,
}
impl RunTimeConfig {
pub fn new(env: Vec<String>, args: Vec<String>, ports: Vec<String>) -> Self {
RunTimeConfig { env, args, ports }
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NetworkConfig { pub struct NetworkConfig {
pub netns: String, netns: String,
pub ip: String, ip: String,
pub ports: Vec<String>, ports: Vec<String>,
} }
impl NetworkConfig { impl NetworkConfig {
pub fn new(netns: String, ip: String, ports: Vec<String>) -> Self { pub fn new(netns: String, ip: String, ports: Vec<String>) -> Self {
NetworkConfig { netns, ip, ports } NetworkConfig { netns, ip, ports }
} }
pub fn get_netns(&self) -> String {
self.netns.clone()
}
pub fn get_ip(&self) -> String {
self.ip.clone()
}
pub fn get_address(&self) -> String {
format!(
"{}:{}",
self.ip.split('/').next().unwrap_or(""),
self.ports[0].split('/').next().unwrap_or("")
)
}
} }