refactor: project structure and better interfaces (#100)

* refactor: basically rewrite all interface

* refactor: rename crates to make clear meaning; use tokio runtime and handle shutdown within Provider

* remove tracker in main

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* feat(provider): enhance Provider trait with list, update, and status methods; refactor existing methods to async

* fix(containerd): fetch handle from environment and initialize it.

* fix(init): BACKEND init add handle fetching

* fix: add test framework

* fix: move the snapshot logic into snapshot.rs

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* fix: change some spec setting

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* feat: add created_at field, add http status code conversion

* refactor(spec): use builder to generate spec

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* fix: clippy

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* manage reference, fix boot issue

* fix: ip parsing

* feat: add cleanup logic on fail

* fix style: clippy for return function

* feat: add response message

* fix:1.修复proxy和resolve的逻辑 2.spec内netns的路径问题以及传参顺序

* feat: add update, list and status service implementation

* fix: move some consts into consts.rs

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* fix: fmt & clippy

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* fix: update dependency

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* feat: add function with_vm_network

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>

* feat: integrate cni into containerd crate

* fix:修复proxy的路径正则匹配并添加单元测试

* fix:fix proxy_path and add default namespace for Query::from

* fix: integration_test

* fix: path dispatch test

* fix: more specified url dispatch in proxy handle

* feat: add persistent container record for restart service

* feat: add task error type

* fix: delete error handle logic

---------

Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>
Co-authored-by: sparkzky <sparkhhhhhhhhhh@outlook.com>
Co-authored-by: dolzhuying <1240800466@qq.com>
Co-authored-by: scutKKsix <1129332011@qq.com>
This commit is contained in:
2025-05-22 21:43:16 +08:00
committed by GitHub
parent ed6741cd8a
commit 308e9bcc5d
75 changed files with 3451 additions and 3431 deletions

View File

@ -0,0 +1,36 @@
[package]
name = "faas-containerd"
version = "0.1.0"
edition = "2024"
[dependencies]
containerd-client = "0.8"
futures = "0.3"
tokio = { version = "1", features = ["full"] }
tonic = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
scopeguard = "1.2.0"
log = "0.4"
env_logger = "0.10"
prost-types = "0.13.4"
oci-spec = "0.6"
sha2 = "0.10"
hex = "0.4"
my-workspace-hack = { version = "0.1", path = "../my-workspace-hack" }
gateway = { path = "../gateway" }
handlebars = "4.1.0"
tokio-util = { version = "0.7.15", features = ["full"] }
container_image_dist_ref = "0.3.0"
url = "2.4"
chrono = { version = "0.4", features = ["serde"] }
dotenv = "0.15.0"
derive_more = { version = "2", features = ["full"] }
cidr = "0.3.1"
async-safe-defer = "0.1.2"
actix-http = "*"
netns-rs = "0.1.0"
sled = "0.34.7"
[dev-dependencies]
actix-web = "4.11.0"

View File

@ -0,0 +1,15 @@
/// Namespace used for a function when the request does not name one.
#[allow(unused)]
pub const DEFAULT_FUNCTION_NAMESPACE: &str = "faasrs-default";
/// Snapshotter name sent to containerd in snapshot and container requests.
#[allow(unused)]
pub const DEFAULT_SNAPSHOTTER: &str = "overlayfs";
/// Containerd socket path used when the `SOCKET_PATH` environment variable is not set.
pub const DEFAULT_CTRD_SOCK: &str = "/run/containerd/containerd.sock";
/// Data directory path. NOTE(review): no use of this constant is visible here — confirm.
pub const DEFAULT_FAASDRS_DATA_DIR: &str = "/var/lib/faasdrs";
// Version constants; joined as `MAJOR.MINOR.PATCH` followed by `DEV` to form the spec version.
pub const VERSION_MAJOR: u32 = 1;
pub const VERSION_MINOR: u32 = 1;
pub const VERSION_PATCH: u32 = 0;
pub const VERSION_DEV: &str = ""; // suffix for development-branch builds

View File

@ -0,0 +1,148 @@
type Err = Box<dyn std::error::Error>;
use derive_more::{Display, Error};
use netns_rs::NetNs;
use scopeguard::{ScopeGuard, guard};
use serde_json::Value;
use std::{fmt::Error, net::IpAddr, path::Path, sync::LazyLock};
use super::{Endpoint, command as cmd, util};
/// Directory that receives the CNI configuration file.
/// Read once from the `CNI_CONF_DIR` environment variable, falling back to `/etc/cni/net.d`.
static CNI_CONF_DIR: LazyLock<String> = LazyLock::new(|| {
std::env::var("CNI_CONF_DIR").unwrap_or_else(|_| "/etc/cni/net.d".to_string())
});
/// Passed to `util::init_net_fs` as the CNI data directory (the IPAM `dataDir`).
const CNI_DATA_DIR: &str = "/var/run/cni";
/// File name of the generated CNI configuration inside `CNI_CONF_DIR`.
const DEFAULT_CNI_CONF_FILENAME: &str = "10-faasrs.conflist";
/// Network name written into the configuration and passed to the CNI tool on add/del.
const DEFAULT_NETWORK_NAME: &str = "faasrs-cni-bridge";
/// Bridge name written into the generated configuration.
const DEFAULT_BRIDGE_NAME: &str = "faasrs0";
/// Subnet (CIDR notation) written into the generated configuration.
const DEFAULT_SUBNET: &str = "10.66.0.0/16";
/// Initialises the default CNI network.
///
/// Forwards the module's default file name, network name, bridge name, subnet and data
/// directory to `util::init_net_fs`, using `CNI_CONF_DIR` as the configuration directory.
///
/// # Errors
/// Returns whatever error `util::init_net_fs` reports.
pub fn init_cni_network() -> Result<(), Err> {
    let conf_dir = Path::new(CNI_CONF_DIR.as_str());
    util::init_net_fs(
        conf_dir,
        DEFAULT_CNI_CONF_FILENAME,
        DEFAULT_NETWORK_NAME,
        DEFAULT_BRIDGE_NAME,
        DEFAULT_SUBNET,
        CNI_DATA_DIR,
    )
}
/// Error returned by the CNI network helpers in this module.
#[derive(Debug, Display, Error)]
pub struct NetworkError {
/// Human-readable description of the step that failed.
pub msg: String,
}
//TODO: 创建网络和删除网络的错误处理
pub fn create_cni_network(endpoint: &Endpoint) -> Result<(cidr::IpInet, NetNs), NetworkError> {
let net_ns = guard(
NetNs::new(endpoint.to_string()).map_err(|e| NetworkError {
msg: format!("Failed to create netns: {}", e),
})?,
|ns| ns.remove().unwrap(),
);
let output = cmd::cni_add_bridge(net_ns.path(), DEFAULT_NETWORK_NAME);
match output {
Ok(output) => {
if !output.status.success() {
return Err(NetworkError {
msg: format!(
"Failed to add CNI bridge: {}",
String::from_utf8_lossy(&output.stderr)
),
});
}
let stdout = String::from_utf8_lossy(&output.stdout);
let mut json: Value = match serde_json::from_str(&stdout) {
Ok(json) => json,
Err(e) => {
log::error!("Failed to parse JSON: {}", e);
return Err(NetworkError {
msg: format!("Failed to parse JSON: {}", e),
});
}
};
log::trace!("CNI add bridge output: {:?}", json);
if let serde_json::Value::Array(ips) = json["ips"].take() {
let mut ip_list = Vec::new();
for mut ip in ips {
if let serde_json::Value::String(ip_str) = ip["address"].take() {
let ipcidr = ip_str.parse::<cidr::IpInet>().map_err(|e| {
log::error!("Failed to parse IP address: {}", e);
NetworkError { msg: e.to_string() }
})?;
ip_list.push(ipcidr);
}
}
if ip_list.is_empty() {
return Err(NetworkError {
msg: "No IP address found in CNI output".to_string(),
});
}
if ip_list.len() > 1 {
log::warn!("Multiple IP addresses found in CNI output: {:?}", ip_list);
}
log::trace!("CNI network created with IP: {:?}", ip_list[0]);
Ok((ip_list[0], ScopeGuard::into_inner(net_ns)))
} else {
log::error!("Invalid JSON format: {:?}", json);
Err(NetworkError {
msg: "Invalid JSON format".to_string(),
})
}
}
Err(e) => {
log::error!("Failed to add CNI bridge: {}", e);
Err(NetworkError {
msg: format!("Failed to add CNI bridge: {}", e),
})
}
}
}
/// Detaches the namespace of `endpoint` from the default CNI network and removes the namespace.
///
/// Both steps are always attempted once the namespace was found; the call succeeds only
/// when the CNI tool ran and exited successfully and the namespace was removed.
///
/// # Errors
/// Returns [`NetworkError`] when the namespace cannot be opened, when the CNI tool cannot
/// be run or exits with a non-zero status, or when removing the namespace fails.
pub fn delete_cni_network(endpoint: Endpoint) -> Result<(), NetworkError> {
    let ns = NetNs::get(endpoint.to_string()).map_err(|e| {
        let msg = format!("Failed to get netns {}: {}", endpoint, e);
        log::warn!("{}", msg);
        NetworkError { msg }
    })?;

    let e1 = cmd::cni_del_bridge(ns.path(), DEFAULT_NETWORK_NAME);
    let e2 = ns.remove();

    // A tool that ran but exited non-zero has failed too, exactly as on the add path.
    let bridge_deleted = matches!(&e1, Ok(output) if output.status.success());
    if !bridge_deleted || e2.is_err() {
        let err = format!(
            "NetNS exists, but failed to delete CNI network, cni bridge: {:?}, netns: {:?}",
            e1, e2
        );
        log::error!("{}", err);
        return Err(NetworkError { msg: err });
    }
    Ok(())
}
/// Reports whether an entry named after `addr` exists in the recorded CNI data directory.
///
/// # Panics
/// Panics when `util::CNI_CONFIG_FILE` has not been initialised yet.
#[inline]
pub fn check_network_exists(addr: IpAddr) -> bool {
    let conf = util::CNI_CONFIG_FILE.get().unwrap();
    let entry = conf.data_dir.join(addr.to_string());
    entry.exists()
}
/// Returns the gateway address of `DEFAULT_SUBNET`: the subnet's address with its last
/// octet set to `1`, rendered as a string.
///
/// # Errors
/// Returns an error when the address part of `DEFAULT_SUBNET` is not a valid IP address
/// or is not an IPv4 address.
#[allow(unused)]
fn cni_gateway() -> Result<String, Err> {
    // The subnet is written in CIDR notation; only the part before `/` is an address.
    let network = DEFAULT_SUBNET
        .split_once('/')
        .map_or(DEFAULT_SUBNET, |(addr, _prefix_len)| addr);
    match network.parse::<IpAddr>()? {
        IpAddr::V4(ip) => {
            // `octets()` returns a copy, so the address is rebuilt from the edited array.
            let mut octets = ip.octets();
            octets[3] = 1;
            Ok(std::net::Ipv4Addr::from(octets).to_string())
        }
        IpAddr::V6(_) => Err(Box::new(Error)),
    }
}

View File

@ -0,0 +1,94 @@
use std::{
io::Error,
path::Path,
process::{Command, Output},
sync::LazyLock,
};
/// Value handed to the CNI tool in the `CNI_PATH` environment variable.
/// Read once from `CNI_BIN_DIR`, falling back to `/opt/cni/bin`.
static CNI_BIN_DIR: LazyLock<String> =
LazyLock::new(|| std::env::var("CNI_BIN_DIR").unwrap_or_else(|_| "/opt/cni/bin".to_string()));
/// Program executed for CNI add/del operations.
/// Read once from `CNI_TOOL`, falling back to `cni-tool`.
static CNI_TOOL: LazyLock<String> =
LazyLock::new(|| std::env::var("CNI_TOOL").unwrap_or_else(|_| "cni-tool".to_string()));
/// Runs `<CNI_TOOL> add <bridge_network_name> <netns_path>` with `CNI_PATH` set to
/// `CNI_BIN_DIR` and waits for it to finish.
///
/// Returns the captured process output; the exit status is NOT inspected here.
///
/// # Errors
/// Returns the I/O error raised when the process cannot be run.
pub(super) fn cni_add_bridge(
    netns_path: &Path,
    bridge_network_name: &str,
) -> Result<Output, Error> {
    let mut tool = Command::new(CNI_TOOL.as_str());
    tool.arg("add");
    tool.arg(bridge_network_name);
    tool.arg(netns_path);
    tool.env("CNI_PATH", CNI_BIN_DIR.as_str());
    tool.output()
}
/// Runs `<CNI_TOOL> del <bridge_network_name> <netns_path>` with `CNI_PATH` set to
/// `CNI_BIN_DIR` and waits for it to finish.
///
/// Returns the captured process output; the exit status is NOT inspected here.
///
/// # Errors
/// Returns the I/O error raised when the process cannot be run.
pub(super) fn cni_del_bridge(
    netns_path: &Path,
    bridge_network_name: &str,
) -> Result<Output, Error> {
    let mut tool = Command::new(CNI_TOOL.as_str());
    tool.arg("del");
    tool.arg(bridge_network_name);
    tool.arg(netns_path);
    tool.env("CNI_PATH", CNI_BIN_DIR.as_str());
    tool.output()
}
// /// THESE TESTS SHOULD BE RUN WITH ROOT PRIVILEGES
// #[cfg(test)]
// mod test {
// use crate::impls::cni::util;
// use std::path::Path;
// use super::*;
// const CNI_DATA_DIR: &str = "/var/run/cni";
// const TEST_CNI_CONF_FILENAME: &str = "11-faasrstest.conflist";
// const TEST_NETWORK_NAME: &str = "faasrstest-cni-bridge";
// const TEST_BRIDGE_NAME: &str = "faasrstest0";
// const TEST_SUBNET: &str = "10.99.0.0/16";
// const CNI_CONF_DIR: &str = "/etc/cni/net.d";
// fn init_test_net_fs() {
// util::init_net_fs(
// Path::new(CNI_CONF_DIR),
// TEST_CNI_CONF_FILENAME,
// TEST_NETWORK_NAME,
// TEST_BRIDGE_NAME,
// TEST_SUBNET,
// CNI_DATA_DIR,
// )
// .unwrap()
// }
// #[test]
// #[ignore]
// fn test_cni_resource() {
// dotenv::dotenv().unwrap();
// env_logger::init_from_env(env_logger::Env::new().default_filter_or("trace"));
// init_test_net_fs();
// let netns = util::netns_from_cid_and_cns("123456", "cns");
// let net_namespace = netns::create(&netns).unwrap();
// defer::defer!({
// let _ = netns::remove(&netns);
// });
// net_namespace.path()
// let result = cni_add_bridge(&netns, TEST_NETWORK_NAME);
// log::debug!("add CNI result: {:?}", result);
// assert!(
// result.is_ok_and(|output| output.status.success()),
// "Failed to add CNI"
// );
// defer::defer!({
// let result = cni_del_bridge(&netns, TEST_NETWORK_NAME);
// log::debug!("del CNI result: {:?}", result);
// assert!(
// result.is_ok_and(|output| output.status.success()),
// "Failed to delete CNI"
// );
// });
// }
// }

View File

@ -0,0 +1,55 @@
use crate::consts;
pub mod cni_impl;
mod command;
mod util;
pub use cni_impl::init_cni_network;
use gateway::types::function::Query;
/// Identifies a function by its service name and namespace.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct Endpoint {
    /// Service name of the function.
    pub service: String,
    /// Namespace the function belongs to.
    pub namespace: String,
}

impl Endpoint {
    /// Builds an endpoint from borrowed service and namespace names.
    pub fn new(service: &str, namespace: &str) -> Self {
        let service = service.to_owned();
        let namespace = namespace.to_owned();
        Self { service, namespace }
    }
}

/// Formats as `<namespace>-<service>`, which is used as the netns name and as the
/// identifier of each function.
impl std::fmt::Display for Endpoint {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { service, namespace } = self;
        write!(f, "{namespace}-{service}")
    }
}
/// Converts a gateway query into an endpoint, substituting
/// `consts::DEFAULT_FUNCTION_NAMESPACE` when the query carries no namespace.
impl From<Query> for Endpoint {
    fn from(query: Query) -> Self {
        // Build the default lazily so that no string is allocated when a namespace is given.
        let namespace = query
            .namespace
            .unwrap_or_else(|| consts::DEFAULT_FUNCTION_NAMESPACE.to_string());
        Self {
            service: query.service,
            namespace,
        }
    }
}
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    /// An address with a prefix length parses as `cidr::IpInet` and keeps its host address.
    #[test]
    fn test_ip_parsing() {
        let parsed: cidr::IpInet = "10.42.0.48/16".parse().unwrap();
        let expected = IpAddr::V4(Ipv4Addr::new(10, 42, 0, 48));
        assert_eq!(parsed.address(), expected);
    }
}

View File

@ -0,0 +1,101 @@
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
/// Process-wide record of the generated CNI configuration; set at most once by `init_net_fs`.
pub static CNI_CONFIG_FILE: OnceLock<CniConfFile> = OnceLock::new();
// /// Generate "cns-cid"
// #[inline(always)]
// pub fn netns_name_from_cid_ns(cid: &str, cns: &str) -> String {
// format!("{}-{}", cns, cid)
// }
/// Writes the CNI configuration file and records it in `CNI_CONFIG_FILE`.
///
/// * `conf_dir` / `conf_filename` — where the configuration file is written.
/// * `net_name`, `bridge`, `subnet`, `data_dir` — values rendered into the configuration.
///
/// # Errors
/// Fails when `CNI_CONFIG_FILE` is already initialised, or when creating the directory or
/// writing the file fails.
pub fn init_net_fs(
    conf_dir: &Path,
    conf_filename: &str,
    net_name: &str,
    bridge: &str,
    subnet: &str,
    data_dir: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Refuse re-initialisation before touching the file system: a rejected `CniConfFile`
    // would be dropped, and its `Drop` impl deletes the configuration file — possibly the
    // very file the first initialisation is still relying on.
    if CNI_CONFIG_FILE.get().is_some() {
        return Err("Failed to set CNI_CONFIG_FILE".into());
    }
    let conf_file = CniConfFile::new(conf_dir, conf_filename, net_name, bridge, subnet, data_dir)?;
    if let Err(rejected) = CNI_CONFIG_FILE.set(conf_file) {
        // Lost a race against a concurrent initialiser; skip `Drop` so its file survives.
        std::mem::forget(rejected);
        return Err("Failed to set CNI_CONFIG_FILE".into());
    }
    Ok(())
}
/// Renders the CNI configuration list (JSON text) for a bridge network.
///
/// `name` becomes the network name, `bridge` the bridge plugin's bridge, and `subnet` /
/// `data_dir` the `host-local` IPAM subnet and `dataDir`. The values are inserted verbatim,
/// without JSON escaping.
fn cni_conf(name: &str, bridge: &str, subnet: &str, data_dir: &str) -> String {
    let rendered = format!(
        r#"
{{
"cniVersion": "0.4.0",
"name": "{name}",
"plugins": [
{{
"type": "bridge",
"bridge": "{bridge}",
"isGateway": true,
"ipMasq": true,
"ipam": {{
"type": "host-local",
"subnet": "{subnet}",
"dataDir": "{data_dir}",
"routes": [
{{ "dst": "0.0.0.0/0" }}
]
}}
}},
{{
"type": "firewall"
}}
]
}}
"#
    );
    rendered
}
/// Handle to a CNI configuration file written by this module.
/// The file is removed again when the handle is dropped.
pub(super) struct CniConfFile {
/// Directory containing the configuration file.
pub conf_dir: PathBuf,
/// File name of the configuration file inside `conf_dir`.
pub conf_filename: String,
/// The data directory joined with the network name.
pub data_dir: PathBuf,
}
impl CniConfFile {
    /// Writes the rendered CNI configuration to `conf_dir/conf_filename`, creating
    /// `conf_dir` first when it does not exist.
    ///
    /// The returned handle stores `data_dir` joined with `net_name`.
    ///
    /// # Errors
    /// Fails when `conf_dir` cannot be created, exists but is not a directory, or when the
    /// file cannot be created or written.
    fn new(
        conf_dir: &Path,
        conf_filename: &str,
        net_name: &str,
        bridge: &str,
        subnet: &str,
        data_dir: &str,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        if !conf_dir.exists() {
            std::fs::create_dir_all(conf_dir)?;
        }
        if !conf_dir.is_dir() {
            // Report through the `Result` the signature already promises, not a panic.
            log::error!("CNI_CONF_DIR is not a directory");
            return Err("CNI_CONF_DIR is not a directory".into());
        }

        let net_config = conf_dir.join(conf_filename);
        File::create(&net_config)?
            .write_all(cni_conf(net_name, bridge, subnet, data_dir).as_bytes())?;

        Ok(Self {
            conf_dir: conf_dir.to_path_buf(),
            conf_filename: conf_filename.to_string(),
            data_dir: PathBuf::from(data_dir).join(net_name),
        })
    }
}
impl Drop for CniConfFile {
fn drop(&mut self) {
let net_config = self.conf_dir.join(&self.conf_filename);
if net_config.exists() {
std::fs::remove_file(&net_config).unwrap();
}
}
}

View File

@ -0,0 +1,125 @@
use containerd_client::{
services::v1::{Container, DeleteContainerRequest, GetContainerRequest, ListContainersRequest},
with_namespace,
};
use derive_more::Display;
use containerd_client::services::v1::container::Runtime;
use super::{ContainerdService, backend, cni::Endpoint, function::ContainerStaticMetadata};
use tonic::Request;
/// Errors returned by the container operations of `ContainerdService`.
#[derive(Debug, Display)]
pub enum ContainerError {
/// The requested container was not present in the response.
NotFound,
/// NOTE(review): not produced by any code visible in this file — confirm where it is used.
AlreadyExists,
/// A containerd request failed, or a response lacked expected data.
Internal,
}
impl ContainerdService {
/// Creates a container record in containerd for `metadata`.
///
/// The container id and snapshot key are both the endpoint's service name; the request is
/// sent in the endpoint's namespace using the `io.containerd.runc.v2` runtime and the
/// spec produced by `get_spec`.
///
/// # Errors
/// Returns `ContainerError::Internal` when the spec cannot be generated, the create
/// request fails, or the response carries no container.
pub async fn create_container(
    &self,
    metadata: &ContainerStaticMetadata,
) -> Result<Container, ContainerError> {
    let spec = backend().get_spec(metadata).await.map_err(|_| {
        log::error!("Failed to get spec");
        ContainerError::Internal
    })?;

    let endpoint = &metadata.endpoint;
    let runtime = Runtime {
        name: "io.containerd.runc.v2".to_string(),
        options: None,
    };
    let container = Container {
        id: endpoint.service.clone(),
        image: metadata.image.clone(),
        runtime: Some(runtime),
        spec: Some(spec),
        snapshotter: crate::consts::DEFAULT_SNAPSHOTTER.to_string(),
        snapshot_key: endpoint.service.clone(),
        ..Default::default()
    };

    let req = containerd_client::services::v1::CreateContainerRequest {
        container: Some(container),
    };
    let mut cc = backend().client.containers();
    let created = match cc.create(with_namespace!(req, endpoint.namespace)).await {
        Ok(resp) => resp.into_inner().container,
        Err(e) => {
            log::error!("Failed to create container: {}", e);
            return Err(ContainerError::Internal);
        }
    };
    created.ok_or(ContainerError::Internal)
}
/// Deletes the container whose id is the endpoint's service name, in the endpoint's namespace.
///
/// # Errors
/// Returns `ContainerError::Internal` when the delete request fails.
pub async fn delete_container(&self, endpoint: &Endpoint) -> Result<(), ContainerError> {
    let delete_request = DeleteContainerRequest {
        id: endpoint.service.clone(),
    };
    let mut containers = self.client.containers();
    match containers
        .delete(with_namespace!(delete_request, endpoint.namespace))
        .await
    {
        Ok(_) => Ok(()),
        Err(e) => {
            log::error!("Failed to delete container: {}", e);
            Err(ContainerError::Internal)
        }
    }
}
/// 根据查询条件加载容器参数
pub async fn load_container(&self, endpoint: &Endpoint) -> Result<Container, ContainerError> {
let mut cc = self.client.containers();
let request = GetContainerRequest {
id: endpoint.service.clone(),
};
let resp = cc
.get(with_namespace!(request, endpoint.namespace))
.await
.map_err(|e| {
log::error!("Failed to list containers: {}", e);
ContainerError::Internal
})?;
resp.into_inner().container.ok_or(ContainerError::NotFound)
}
/// 获取容器列表
pub async fn list_container(&self, namespace: &str) -> Result<Vec<Container>, ContainerError> {
let mut cc = self.client.containers();
let request = ListContainersRequest {
..Default::default()
};
let resp = cc
.list(with_namespace!(request, namespace))
.await
.map_err(|e| {
log::error!("Failed to list containers: {}", e);
ContainerError::Internal
})?;
Ok(resp.into_inner().containers)
}
/// Lists the ids of the containers in namespace `ns`.
///
/// Thin convenience wrapper around `list_container` (the original author questioned
/// whether it deserves a function of its own).
///
/// # Errors
/// Propagates the error of `list_container`.
#[deprecated]
pub async fn list_container_into_string(
    &self,
    ns: &str,
) -> Result<Vec<String>, ContainerError> {
    let containers = self.list_container(ns).await?;
    Ok(containers.into_iter().map(|ctr| ctr.id).collect())
}
}

View File

@ -0,0 +1,17 @@
use derive_more::derive::Display;
/// Errors raised by containerd operations; each tuple variant carries a description of
/// the underlying failure.
#[derive(Debug, Display)]
pub enum ContainerdError {
CreateContainerError(String),
/// Preparing a snapshot failed.
CreateSnapshotError(String),
/// The parent snapshot of an image could not be determined.
GetParentSnapshotError(String),
GenerateSpecError(String),
/// NOTE(review): also returned when removing a snapshot fails — confirm this is intended.
DeleteContainerError(String),
GetContainerListError(String),
KillTaskError(String),
DeleteTaskError(String),
WaitTaskError(String),
/// NOTE(review): also returned when querying snapshot mounts fails — confirm this is intended.
CreateTaskError(String),
StartTaskError(String),
/// Catch-all variant without a description.
#[allow(dead_code)]
OtherError,
}

View File

@ -0,0 +1,97 @@
use gateway::types::function;
use crate::consts;
use super::cni::Endpoint;
/// Static description of a function container: the image it runs and the endpoint
/// (service name and namespace) that identifies it.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct ContainerStaticMetadata {
/// Image reference of the container.
pub image: String,
/// Service name and namespace of the function.
pub endpoint: Endpoint,
}
impl From<function::Deployment> for ContainerStaticMetadata {
fn from(info: function::Deployment) -> Self {
ContainerStaticMetadata {
image: info.image,
endpoint: Endpoint::new(
&info.service,
&info
.namespace
.unwrap_or(consts::DEFAULT_FUNCTION_NAMESPACE.to_string()),
),
}
}
}
// impl From<ContainerStaticMetadata> for function::Query {
// fn from(metadata: ContainerStaticMetadata) -> Self {
// function::Query {
// service: metadata.container_id,
// namespace: Some(metadata.namespace),
// }
// }
// }
// /// A function is a container instance with correct cni connected
// #[derive(Debug)]
// pub struct FunctionInstance {
// container: containerd_client::services::v1::Container,
// namespace: String,
// // ip addr inside cni
// // network: CNIEndpoint,
// // port: Vec<u16>, default use 8080
// // manager: Weak<crate::provider::ContainerdProvider>,
// }
// impl FunctionInstance {
// pub async fn new(metadata: ContainerStaticMetadata) -> Result<Self, ContainerdError> {
// Ok(Self {
// container,
// namespace: metadata.namespace,
// // network,
// })
// }
// pub async fn delete(&self) -> Result<(), ContainerdError> {
// let container_id = self.container.id.clone();
// let namespace = self.namespace.clone();
// let kill_err = backend()
// .kill_task_with_timeout(&container_id, &namespace)
// .await
// .map_err(|e| {
// log::error!("Failed to kill task: {:?}", e);
// e
// });
// let del_ctr_err = backend()
// .delete_container(&container_id, &namespace)
// .await
// .map_err(|e| {
// log::error!("Failed to delete container: {:?}", e);
// e
// });
// let rm_snap_err = backend()
// .remove_snapshot(&container_id, &namespace)
// .await
// .map_err(|e| {
// log::error!("Failed to remove snapshot: {:?}", e);
// e
// });
// if kill_err.is_ok() && del_ctr_err.is_ok() && rm_snap_err.is_ok() {
// Ok(())
// } else {
// Err(ContainerdError::DeleteContainerError(format!(
// "{:?}, {:?}, {:?}",
// kill_err, del_ctr_err, rm_snap_err
// )))
// }
// }
// pub fn address(&self) -> IpAddr {
// self.network.address()
// }
// }

View File

@ -0,0 +1,30 @@
pub mod cni;
pub mod container;
pub mod error;
pub mod function;
pub mod oci_image;
pub mod snapshot;
pub mod spec;
pub mod task;
use std::sync::OnceLock;
/// Process-wide containerd service; set by `init_backend` and read through `backend()`.
pub static __BACKEND: OnceLock<ContainerdService> = OnceLock::new();
/// Returns the process-wide containerd service.
///
/// # Panics
/// Panics when `init_backend` has not completed yet.
pub(crate) fn backend() -> &'static ContainerdService {
    __BACKEND
        .get()
        .expect("containerd backend is not initialised; call init_backend() first")
}
/// Connects to containerd, stores the client in `__BACKEND` and initialises the CNI network.
///
/// The socket path is taken from the `SOCKET_PATH` environment variable, falling back to
/// `consts::DEFAULT_CTRD_SOCK`.
///
/// TODO: Panic on failure, should be handled in a better way
///
/// # Panics
/// Panics when the connection fails, when the backend was already initialised, or when
/// the CNI network cannot be initialised.
pub async fn init_backend() {
    let socket = std::env::var("SOCKET_PATH")
        .unwrap_or_else(|_| crate::consts::DEFAULT_CTRD_SOCK.to_string());
    let client = containerd_client::Client::from_path(&socket)
        .await
        .unwrap_or_else(|e| panic!("failed to connect to containerd at {socket}: {e:?}"));
    if __BACKEND.set(ContainerdService { client }).is_err() {
        panic!("containerd backend is already initialised");
    }
    cni::init_cni_network().expect("failed to initialise the CNI network");
}
/// Wrapper around a containerd client; the container, image and snapshot operations of
/// this crate are implemented as methods on it.
pub struct ContainerdService {
/// Client used to create the per-service gRPC clients.
pub client: containerd_client::Client,
}

View File

@ -0,0 +1,320 @@
use super::ContainerdService;
use container_image_dist_ref::ImgRef;
use containerd_client::{
services::v1::{GetImageRequest, ReadContentRequest, TransferOptions, TransferRequest},
to_any,
tonic::Request,
types::{
Platform,
transfer::{ImageStore, OciRegistry, UnpackConfiguration},
},
with_namespace,
};
use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest, MediaType, Os};
impl ContainerdService {
async fn get_image(&self, image_name: &str, ns: &str) -> Result<(), ImageError> {
let mut c = self.client.images();
let req = GetImageRequest {
name: image_name.to_string(),
};
let resp = match c.get(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ImageNotFound(format!(
"Failed to get image {}: {}",
image_name, e
)));
}
};
if resp.image.is_none() {
self.pull_image(image_name, ns).await?;
}
Ok(())
}
pub async fn pull_image(&self, image_name: &str, ns: &str) -> Result<(), ImageError> {
let ns = check_namespace(ns);
let namespace = ns.as_str();
let mut trans_cli = self.client.transfer();
let source = OciRegistry {
reference: image_name.to_string(),
resolver: Default::default(),
};
// 这里先写死linux amd64
let platform = Platform {
os: "linux".to_string(),
architecture: "amd64".to_string(),
..Default::default()
};
let dest = ImageStore {
name: image_name.to_string(),
platforms: vec![platform.clone()],
unpacks: vec![UnpackConfiguration {
platform: Some(platform),
..Default::default()
}],
..Default::default()
};
let anys = to_any(&source);
let anyd = to_any(&dest);
let req = TransferRequest {
source: Some(anys),
destination: Some(anyd),
options: Some(TransferOptions {
..Default::default()
}),
};
trans_cli
.transfer(with_namespace!(req, namespace))
.await
.map_err(|e| {
log::error!("Failed to pull image: {}", e);
ImageError::ImagePullFailed(format!("Failed to pull image {}: {}", image_name, e))
})
.map(|resp| {
log::trace!("Pull image response: {:?}", resp);
})
}
pub async fn prepare_image(
&self,
image_name: &str,
ns: &str,
always_pull: bool,
) -> Result<(), ImageError> {
let _ = ImgRef::new(image_name).map_err(|e| {
ImageError::ImageNotFound(format!("Invalid image name: {:?}", e.kind()))
})?;
if always_pull {
self.pull_image(image_name, ns).await
} else {
let namespace = check_namespace(ns);
let namespace = namespace.as_str();
self.get_image(image_name, namespace).await
}
}
pub async fn image_config(
&self,
img_name: &str,
ns: &str,
) -> Result<ImageConfiguration, ImageError> {
let mut img_cli = self.client.images();
let req = GetImageRequest {
name: img_name.to_string(),
};
let resp = match img_cli.get(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ImageNotFound(format!(
"Failed to get image {}: {}",
img_name, e
)));
}
};
let img_dscr = resp.image.unwrap().target.unwrap();
let media_type = MediaType::from(img_dscr.media_type.as_str());
let req = ReadContentRequest {
digest: img_dscr.digest,
..Default::default()
};
let mut content_cli = self.client.content();
let mut inner = match content_cli.read(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to read content of image {}: {}",
img_name, e
)));
}
};
let resp = match inner.message().await {
Ok(response) => response.unwrap().data,
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to get the inner content of image {}: {}",
img_name, e
)));
}
};
drop(content_cli);
match media_type {
MediaType::ImageIndex => self.handle_index(&resp, ns).await,
MediaType::ImageManifest => self.handle_manifest(&resp, ns).await,
MediaType::Other(val)
if val == "application/vnd.docker.distribution.manifest.list.v2+json" =>
{
self.handle_index(&resp, ns).await
}
MediaType::Other(val)
if val == "application/vnd.docker.distribution.manifest.v2+json" =>
{
self.handle_manifest(&resp, ns).await
}
_ => Err(ImageError::UnexpectedMediaType),
}
}
async fn handle_index(&self, data: &[u8], ns: &str) -> Result<ImageConfiguration, ImageError> {
let image_index: ImageIndex = ::serde_json::from_slice(data).map_err(|e| {
ImageError::DeserializationFailed(format!("Failed to parse JSON: {}", e))
})?;
let img_manifest_dscr = image_index
.manifests()
.iter()
.find(|manifest_entry| match manifest_entry.platform() {
Some(p) => {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
matches!(p.architecture(), &Arch::Amd64) && matches!(p.os(), &Os::Linux)
}
#[cfg(target_arch = "aarch64")]
{
matches!(p.architecture(), &Arch::ARM64) && matches!(p.os(), Os::Linux)
//&& matches!(p.variant().as_ref().map(|s| s.as_str()), Some("v8"))
}
}
None => false,
})
.unwrap();
let req = ReadContentRequest {
digest: img_manifest_dscr.digest().to_owned(),
offset: 0,
size: 0,
};
let mut c = self.client.content();
let mut inner = match c.read(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handler index : {}",
e
)));
}
};
let resp = match inner.message().await {
Ok(response) => response.unwrap().data,
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handle index inner : {}",
e
)));
}
};
drop(c);
self.handle_manifest(&resp, ns).await
}
async fn handle_manifest(
&self,
data: &[u8],
ns: &str,
) -> Result<ImageConfiguration, ImageError> {
let img_manifest: ImageManifest = match serde_json::from_slice(data) {
Ok(manifest) => manifest,
Err(e) => {
return Err(ImageError::DeserializationFailed(format!(
"Failed to deserialize image manifest: {}",
e
)));
}
};
let img_manifest_dscr = img_manifest.config();
let req = ReadContentRequest {
digest: img_manifest_dscr.digest().to_owned(),
offset: 0,
size: 0,
};
let mut c = self.client.content();
let mut inner = match c.read(with_namespace!(req, ns)).await {
Ok(response) => response.into_inner(),
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handler index : {}",
e
)));
}
};
let resp = match inner.message().await {
Ok(response) => response.unwrap().data,
Err(e) => {
return Err(ImageError::ReadContentFailed(format!(
"Failed to handle index inner : {}",
e
)));
}
};
serde_json::from_slice(&resp)
.map_err(|e| ImageError::DeserializationFailed(format!("Failed to parse JSON: {}", e)))
}
}
/// Errors raised while resolving, pulling or inspecting images.
///
/// Variants with a `String` carry a description of the underlying failure.
#[derive(Debug)]
pub enum ImageError {
    /// The image lookup failed, or the image name was invalid.
    ImageNotFound(String),
    /// Pulling the image failed.
    ImagePullFailed(String),
    /// No image configuration could be located.
    ImageConfigurationNotFound(String),
    /// Reading from the content store failed.
    ReadContentFailed(String),
    /// The image target has a media type that is neither an index nor a manifest.
    UnexpectedMediaType,
    /// Stored JSON could not be deserialized.
    DeserializationFailed(String),
    /// Catch-all variant without a description.
    #[allow(dead_code)]
    OtherError,
}

impl std::fmt::Display for ImageError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ImageError::ImageNotFound(msg) => write!(f, "Image not found: {}", msg),
            ImageError::ImagePullFailed(msg) => write!(f, "Image pull failed: {}", msg),
            ImageError::ImageConfigurationNotFound(msg) => {
                write!(f, "Image configuration not found: {}", msg)
            }
            ImageError::ReadContentFailed(msg) => write!(f, "Read content failed: {}", msg),
            ImageError::UnexpectedMediaType => f.write_str("Unexpected media type"),
            ImageError::DeserializationFailed(msg) => {
                write!(f, "Deserialization failed: {}", msg)
            }
            ImageError::OtherError => f.write_str("Other error happened"),
        }
    }
}

// Lets callers use `?` into `Box<dyn Error>` and other generic error plumbing.
impl std::error::Error for ImageError {}
// Images can apparently be pulled without this?
/// Unimplemented placeholder; calling it panics via `todo!()`.
pub fn get_resolver() {
todo!()
}
/// Returns `ns` as an owned string, or the default function namespace when `ns` is empty.
fn check_namespace(ns: &str) -> String {
    if ns.is_empty() {
        crate::consts::DEFAULT_FUNCTION_NAMESPACE.to_string()
    } else {
        ns.to_string()
    }
}

View File

@ -0,0 +1,131 @@
use containerd_client::{
services::v1::snapshots::{MountsRequest, PrepareSnapshotRequest, RemoveSnapshotRequest},
types::Mount,
with_namespace,
};
use tonic::Request;
use crate::impls::error::ContainerdError;
use super::{ContainerdService, cni::Endpoint, function::ContainerStaticMetadata};
impl ContainerdService {
/// Fetches the mounts of the snapshot keyed by `cid` in namespace `ns`, using the default
/// snapshotter.
///
/// # Errors
/// Returns `ContainerdError::CreateTaskError` when the mounts request fails.
#[allow(unused)]
pub(super) async fn get_mounts(
    &self,
    cid: &str,
    ns: &str,
) -> Result<Vec<Mount>, ContainerdError> {
    let req = MountsRequest {
        snapshotter: crate::consts::DEFAULT_SNAPSHOTTER.to_string(),
        key: cid.to_string(),
    };
    let mut sc = self.client.snapshots();
    match sc.mounts(with_namespace!(req, ns)).await {
        Ok(resp) => Ok(resp.into_inner().mounts),
        Err(e) => {
            log::error!("Failed to get mounts: {}", e);
            Err(ContainerdError::CreateTaskError(e.to_string()))
        }
    }
}
pub async fn prepare_snapshot(
&self,
container: &ContainerStaticMetadata,
) -> Result<Vec<Mount>, ContainerdError> {
let parent_snapshot = self
.get_parent_snapshot(&container.image, &container.endpoint.namespace)
.await?;
self.do_prepare_snapshot(
&container.endpoint.service,
&container.endpoint.namespace,
parent_snapshot,
)
.await
}
/// Sends the prepare-snapshot request: key `cid`, parent `parent_snapshot`, default
/// snapshotter, namespace `ns`. Returns the mounts from the response.
///
/// # Errors
/// Returns `ContainerdError::CreateSnapshotError` when the request fails.
async fn do_prepare_snapshot(
    &self,
    cid: &str,
    ns: &str,
    parent_snapshot: String,
) -> Result<Vec<Mount>, ContainerdError> {
    let mut client = self.client.snapshots();
    let req = PrepareSnapshotRequest {
        snapshotter: crate::consts::DEFAULT_SNAPSHOTTER.to_string(),
        key: cid.to_string(),
        parent: parent_snapshot,
        ..Default::default()
    };
    match client.prepare(with_namespace!(req, ns)).await {
        Ok(resp) => {
            log::trace!("Prepare snapshot response: {:?}", resp);
            Ok(resp.into_inner().mounts)
        }
        Err(e) => {
            log::error!("Failed to prepare snapshot: {}", e);
            Err(ContainerdError::CreateSnapshotError(e.to_string()))
        }
    }
}
/// Computes the parent snapshot key for `image_name` from the `diff_ids` of its image
/// configuration.
///
/// The first diff id is used as is; every further layer replaces the running value with
/// `sha256:` followed by the hex SHA-256 of `"<running value> <diff id>"`.
///
/// # Errors
/// Returns `ContainerdError::GetParentSnapshotError` when the image configuration cannot
/// be loaded or lists no diff ids.
async fn get_parent_snapshot(
    &self,
    image_name: &str,
    namespace: &str,
) -> Result<String, ContainerdError> {
    use sha2::Digest;

    let config = self
        .image_config(image_name, namespace)
        .await
        .map_err(|e| {
            log::error!("Failed to get image config: {}", e);
            ContainerdError::GetParentSnapshotError(e.to_string())
        })?;

    let mut diff_ids = config.rootfs().diff_ids().iter();
    let Some(base_layer) = diff_ids.next() else {
        log::error!("Image config has no diff_ids for image: {}", image_name);
        return Err(ContainerdError::GetParentSnapshotError(
            "No diff_ids found in image config".to_string(),
        ));
    };

    let chain_id = diff_ids.fold(base_layer.clone(), |parent, diff_id| {
        let mut hasher = sha2::Sha256::new();
        hasher.update(parent.as_bytes());
        hasher.update(" ");
        hasher.update(diff_id);
        format!("sha256:{}", ::hex::encode(hasher.finalize()))
    });
    Ok(chain_id)
}
/// Removes the snapshot keyed by the endpoint's service name from the default
/// snapshotter, in the endpoint's namespace.
///
/// # Errors
/// Returns `ContainerdError::DeleteContainerError` when the remove request fails.
pub async fn remove_snapshot(&self, endpoint: &Endpoint) -> Result<(), ContainerdError> {
    let req = RemoveSnapshotRequest {
        snapshotter: crate::consts::DEFAULT_SNAPSHOTTER.to_string(),
        key: endpoint.service.clone(),
    };
    let mut sc = self.client.snapshots();
    match sc.remove(with_namespace!(req, endpoint.namespace)).await {
        Ok(_) => Ok(()),
        Err(e) => {
            log::error!("Failed to delete snapshot: {}", e);
            Err(ContainerdError::DeleteContainerError(e.to_string()))
        }
    }
}
}

View File

@ -0,0 +1,329 @@
use super::{
ContainerdService, cni::Endpoint, error::ContainerdError, function::ContainerStaticMetadata,
};
use crate::consts::{VERSION_DEV, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use oci_spec::{
image::ImageConfiguration,
runtime::{
Capability, LinuxBuilder, LinuxCapabilitiesBuilder, LinuxDeviceCgroupBuilder,
LinuxNamespaceBuilder, LinuxNamespaceType, LinuxResourcesBuilder, MountBuilder,
PosixRlimitBuilder, PosixRlimitType, ProcessBuilder, RootBuilder, Spec, SpecBuilder,
UserBuilder,
},
};
use std::path::Path;
/// Builds the version string `MAJOR.MINOR.PATCH` followed directly by the dev suffix,
/// from the `VERSION_*` constants.
fn oci_version() -> String {
    format!("{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}{VERSION_DEV}")
}
/// Builds the default OCI runtime spec for container `cid` in namespace `ns`.
///
/// The process section (cwd, args, env) comes from `runtime_config`; everything
/// else (capabilities, rlimits, masked/read-only paths, cgroup path, namespaces
/// and mounts) is fixed by this function.
///
/// # Errors
/// Returns `ContainerdError::GenerateSpecError` when the final `SpecBuilder::build` fails.
///
/// # Panics
/// Every nested builder result is `unwrap()`ed, so a failing sub-builder panics
/// instead of being reported through the `Result`.
pub(super) fn generate_default_unix_spec(
ns: &str,
cid: &str,
runtime_config: &RuntimeConfig,
) -> Result<oci_spec::runtime::Spec, ContainerdError> {
// Capability set applied identically to the bounding, permitted and effective sets below.
let caps = [
Capability::Chown,
Capability::DacOverride,
Capability::Fsetid,
Capability::Fowner,
Capability::Mknod,
Capability::NetRaw,
Capability::Setgid,
Capability::Setuid,
Capability::Setfcap,
Capability::Setpcap,
Capability::NetBindService,
Capability::SysChroot,
Capability::Kill,
Capability::AuditWrite,
];
let spec = SpecBuilder::default()
.version(oci_version())
// Root filesystem: the relative path "rootfs", marked read-only.
.root(
RootBuilder::default()
.path("rootfs")
.readonly(true)
.build()
.unwrap(),
)
// Process: runs as uid/gid 0 with no_new_privileges set and a NOFILE rlimit of 1024 (soft and hard).
.process(
ProcessBuilder::default()
.cwd(runtime_config.cwd.clone())
.no_new_privileges(true)
.user(UserBuilder::default().uid(0u32).gid(0u32).build().unwrap())
.capabilities(
LinuxCapabilitiesBuilder::default()
.bounding(caps)
.permitted(caps)
.effective(caps)
.build()
.unwrap(),
)
.rlimits([PosixRlimitBuilder::default()
.typ(PosixRlimitType::RlimitNofile)
.hard(1024u64)
.soft(1024u64)
.build()
.unwrap()])
.args(runtime_config.args.clone())
.env(runtime_config.env.clone())
.build()
.unwrap(),
)
.linux(
LinuxBuilder::default()
// Paths masked inside the container.
.masked_paths([
"/proc/acpi".into(),
"/proc/asound".into(),
"/proc/kcore".into(),
"/proc/keys".into(),
"/proc/latency_stats".into(),
"/proc/timer_list".into(),
"/proc/timer_stats".into(),
"/proc/sched_debug".into(),
"/sys/firmware".into(),
"/proc/scsi".into(),
"/sys/devices/virtual/powercap".into(),
])
// Paths exposed read-only inside the container.
.readonly_paths([
"/proc/bus".into(),
"/proc/fs".into(),
"/proc/irq".into(),
"/proc/sys".into(),
"/proc/sysrq-trigger".into(),
])
// Cgroup path is "/<ns>/<cid>".
.cgroups_path(Path::new("/").join(ns).join(cid))
// Single device-cgroup rule: allow = false for "rwm" access.
.resources(
LinuxResourcesBuilder::default()
.devices([LinuxDeviceCgroupBuilder::default()
.allow(false)
.access("rwm")
.build()
.unwrap()])
.build()
.unwrap(),
)
// Pid, ipc, uts and mount namespaces are listed without a path; the network
// namespace points at a path under /var/run/netns.
.namespaces([
LinuxNamespaceBuilder::default()
.typ(LinuxNamespaceType::Pid)
.build()
.unwrap(),
LinuxNamespaceBuilder::default()
.typ(LinuxNamespaceType::Ipc)
.build()
.unwrap(),
LinuxNamespaceBuilder::default()
.typ(LinuxNamespaceType::Uts)
.build()
.unwrap(),
LinuxNamespaceBuilder::default()
.typ(LinuxNamespaceType::Mount)
.build()
.unwrap(),
LinuxNamespaceBuilder::default()
.typ(LinuxNamespaceType::Network)
// The netns file name is the `Display` form of the endpoint.
// NOTE(review): presumably this must match the name the CNI code uses
// when it creates the namespace — confirm against the cni module.
.path(format!("/var/run/netns/{}", Endpoint::new(cid, ns)))
.build()
.unwrap(),
])
.build()
.unwrap(),
)
// Mounts: /proc, /dev, /dev/pts, /dev/shm, /dev/mqueue, /sys (read-only) and /run.
.mounts([
MountBuilder::default()
.destination("/proc")
.typ("proc")
.source("proc")
.options(["nosuid".into(), "noexec".into(), "nodev".into()])
.build()
.unwrap(),
MountBuilder::default()
.destination("/dev")
.typ("tmpfs")
.source("tmpfs")
.options([
"nosuid".into(),
"strictatime".into(),
"mode=755".into(),
"size=65536k".into(),
])
.build()
.unwrap(),
MountBuilder::default()
.destination("/dev/pts")
.typ("devpts")
.source("devpts")
.options([
"nosuid".into(),
"noexec".into(),
"newinstance".into(),
"ptmxmode=0666".into(),
"mode=0620".into(),
"gid=5".into(),
])
.build()
.unwrap(),
MountBuilder::default()
.destination("/dev/shm")
.typ("tmpfs")
.source("shm")
.options([
"nosuid".into(),
"noexec".into(),
"nodev".into(),
"mode=1777".into(),
"size=65536k".into(),
])
.build()
.unwrap(),
MountBuilder::default()
.destination("/dev/mqueue")
.typ("mqueue")
.source("mqueue")
.options(["nosuid".into(), "noexec".into(), "nodev".into()])
.build()
.unwrap(),
MountBuilder::default()
.destination("/sys")
.typ("sysfs")
.source("sysfs")
.options([
"nosuid".into(),
"noexec".into(),
"nodev".into(),
"ro".into(),
])
.build()
.unwrap(),
MountBuilder::default()
.destination("/run")
.typ("tmpfs")
.source("tmpfs")
.options([
"nosuid".into(),
"strictatime".into(),
"mode=755".into(),
"size=65536k".into(),
])
.build()
.unwrap(),
])
.build()
// Only this outermost build failure is reported through the Result.
.map_err(|e| {
log::error!("Failed to generate spec: {}", e);
ContainerdError::GenerateSpecError(e.to_string())
})?;
Ok(spec)
}
#[allow(unused)]
pub(super) fn with_vm_network(spec: &mut Spec) -> Result<(), ContainerdError> {
let mounts = spec
.mounts()
.as_ref()
.expect("Spec's 'Mounts' field should not be None");
let mut new_mounts = mounts.clone();
new_mounts.extend([
MountBuilder::default()
.destination("/etc/resolv.conf")
.typ("bind")
.source("/etc/resolv.conf")
.options(["rbind".into(), "ro".into()])
.build()
.map_err(|e| {
log::error!("Failed to build OCI (resolv.conf) Mount: {}", e);
ContainerdError::GenerateSpecError(e.to_string())
})?,
MountBuilder::default()
.destination("/etc/hosts")
.typ("bind")
.source("/etc/hosts")
.options(["rbind".into(), "ro".into()])
.build()
.map_err(|e| {
log::error!("Failed to build OCI (hosts) Mount: {}", e);
ContainerdError::GenerateSpecError(e.to_string())
})?,
]);
let _ = spec.set_mounts(Some(new_mounts));
Ok(())
}
/// Runtime settings derived from an image configuration and consumed when the
/// container's OCI spec is generated.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
/// Environment of the container process (`process.env` in the generated spec).
pub env: Vec<String>,
/// Argument vector of the container process (`process.args` in the generated spec).
pub args: Vec<String>,
/// Ports exposed by the image; `8080/tcp` when the image declares none.
pub ports: Vec<String>,
/// Working directory of the container process (`process.cwd` in the generated spec).
pub cwd: String,
}
impl TryFrom<ImageConfiguration> for RuntimeConfig {
    type Error = ContainerdError;

    /// Extracts the runtime settings from an image configuration.
    ///
    /// * `args` is the image `Entrypoint` followed by its `Cmd`, so images that
    ///   only define an entrypoint (or both) start the intended process.
    /// * `env` falls back to an empty environment when the image declares none.
    /// * `ports` falls back to `8080/tcp`.
    /// * `cwd` falls back to `/` when the working directory is missing or empty.
    ///
    /// # Errors
    /// Returns `ContainerdError::GenerateSpecError` when the image has no
    /// `config` section or defines neither an entrypoint nor a command.
    fn try_from(value: ImageConfiguration) -> Result<Self, Self::Error> {
        let config = value.config().as_ref().ok_or_else(|| {
            ContainerdError::GenerateSpecError("Image configuration not found".to_string())
        })?;

        let env = config.env().clone().unwrap_or_else(|| {
            log::warn!("Environment variables not found, using an empty environment");
            Vec::new()
        });

        // Entrypoint and Cmd together form the argument vector of the process.
        let args: Vec<String> = config
            .entrypoint()
            .iter()
            .flatten()
            .chain(config.cmd().iter().flatten())
            .cloned()
            .collect();
        if args.is_empty() {
            return Err(ContainerdError::GenerateSpecError(
                "Command arguments not found".to_string(),
            ));
        }

        let ports = config.exposed_ports().clone().unwrap_or_else(|| {
            log::warn!("Exposed ports not found, using default port 8080/tcp");
            vec!["8080/tcp".to_string()]
        });

        // An empty working directory is not a usable cwd; treat it as missing.
        let cwd = config
            .working_dir()
            .clone()
            .filter(|dir| !dir.is_empty())
            .unwrap_or_else(|| {
                log::warn!("Working directory not found, using default /");
                "/".to_string()
            });

        Ok(RuntimeConfig {
            env,
            args,
            ports,
            cwd,
        })
    }
}
impl ContainerdService {
    /// Produces the OCI runtime spec for `metadata`, JSON-encoded and wrapped in a
    /// protobuf `Any` tagged with the containerd runtime-spec type URL.
    ///
    /// # Errors
    /// Returns `ContainerdError::GenerateSpecError` when the image config cannot
    /// be fetched, converted, turned into a spec, or serialized.
    pub async fn get_spec(
        &self,
        metadata: &ContainerStaticMetadata,
    ) -> Result<prost_types::Any, ContainerdError> {
        let endpoint = &metadata.endpoint;

        let image_conf = match self
            .image_config(&metadata.image, &endpoint.namespace)
            .await
        {
            Ok(conf) => conf,
            Err(e) => {
                log::error!("Failed to get image config: {}", e);
                return Err(ContainerdError::GenerateSpecError(e.to_string()));
            }
        };

        let rt_conf = RuntimeConfig::try_from(image_conf)?;
        let spec = generate_default_unix_spec(&endpoint.namespace, &endpoint.service, &rt_conf)?;

        let value = match serde_json::to_string(&spec) {
            Ok(json) => json.into_bytes(),
            Err(e) => {
                log::error!("Failed to serialize spec to JSON: {}", e);
                return Err(ContainerdError::GenerateSpecError(e.to_string()));
            }
        };

        Ok(prost_types::Any {
            type_url: "types.containerd.io/opencontainers/runtime-spec/1/Spec".to_string(),
            value,
        })
    }
}

View File

@ -0,0 +1,210 @@
use std::time::Duration;
use containerd_client::{
services::v1::{
CreateTaskRequest, DeleteTaskRequest, GetRequest, KillRequest, ListTasksRequest,
ListTasksResponse, StartRequest, WaitRequest, WaitResponse,
},
types::{Mount, v1::Process},
with_namespace,
};
use derive_more::Display;
use gateway::handlers::function::{DeleteError, DeployError};
use tonic::Request;
use super::{ContainerdService, cni::Endpoint};
/// Errors returned by the task operations of `ContainerdService`.
///
/// `Display` is derived through `derive_more`.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Display)]
pub enum TaskError {
/// The task (or its process) does not exist.
NotFound,
/// The task already exists.
AlreadyExists,
/// The request was rejected as invalid.
InvalidArgument,
// PermissionDenied,
/// Any other failure; carries the gRPC status message.
Internal(String),
}
impl From<tonic::Status> for TaskError {
fn from(status: tonic::Status) -> Self {
use tonic::Code::*;
match status.code() {
NotFound => TaskError::NotFound,
AlreadyExists => TaskError::AlreadyExists,
InvalidArgument => TaskError::InvalidArgument,
// PermissionDenied => TaskError::PermissionDenied,
_ => TaskError::Internal(status.message().to_string()),
}
}
}
impl From<TaskError> for DeployError {
    /// `InvalidArgument` becomes `DeployError::Invalid`; every other task error
    /// becomes `DeployError::InternalError`. The message is the task error's
    /// `Display` text in both cases.
    fn from(e: TaskError) -> DeployError {
        let message = e.to_string();
        if matches!(e, TaskError::InvalidArgument) {
            DeployError::Invalid(message)
        } else {
            DeployError::InternalError(message)
        }
    }
}
impl From<TaskError> for DeleteError {
    /// Converts a task error into the matching delete error (`NotFound`,
    /// `Invalid`, otherwise `Internal`), carrying the task error's `Display` text.
    fn from(e: TaskError) -> DeleteError {
        log::trace!("DeleteTaskError: {:?}", e);
        let message = e.to_string();
        match e {
            TaskError::NotFound => DeleteError::NotFound(message),
            TaskError::InvalidArgument => DeleteError::Invalid(message),
            _ => DeleteError::Internal(message),
        }
    }
}
impl ContainerdService {
    /// Creates a task for the endpoint's container using `mounts` as its rootfs
    /// and then starts it.
    ///
    /// # Errors
    /// Propagates the `TaskError` of the failing create or start request.
    pub async fn new_task(&self, mounts: Vec<Mount>, endpoint: &Endpoint) -> Result<(), TaskError> {
        let Endpoint {
            service: cid,
            namespace: ns,
        } = endpoint;
        self.do_create_task(cid, ns, mounts).await?;
        self.do_start_task(cid, ns).await?;
        Ok(())
    }

    /// Sends a start request for the task of container `cid` in namespace `ns`.
    async fn do_start_task(&self, cid: &str, ns: &str) -> Result<(), TaskError> {
        let mut c = self.client.tasks();
        let req = StartRequest {
            container_id: cid.to_string(),
            ..Default::default()
        };
        let resp = c.start(with_namespace!(req, ns)).await?;
        log::debug!("Task: {:?} started", cid);
        log::trace!("Task start response: {:?}", resp);
        Ok(())
    }

    /// Sends a create request for the task of container `cid` with `rootfs` mounts.
    async fn do_create_task(
        &self,
        cid: &str,
        ns: &str,
        rootfs: Vec<Mount>,
    ) -> Result<(), TaskError> {
        let mut tc = self.client.tasks();
        let create_request = CreateTaskRequest {
            container_id: cid.to_string(),
            rootfs,
            ..Default::default()
        };
        let _resp = tc.create(with_namespace!(create_request, ns)).await?;
        Ok(())
    }

    /// Fetches the process information of the endpoint's task.
    ///
    /// # Errors
    /// Returns `TaskError::NotFound` when the response carries no process, or the
    /// mapped gRPC error when the request fails.
    pub async fn get_task(&self, endpoint: &Endpoint) -> Result<Process, TaskError> {
        let Endpoint {
            service: cid,
            namespace: ns,
        } = endpoint;
        let mut tc = self.client.tasks();
        let req = GetRequest {
            container_id: cid.clone(),
            ..Default::default()
        };
        let resp = tc.get(with_namespace!(req, ns)).await?;
        let task = resp.into_inner().process.ok_or(TaskError::NotFound)?;
        Ok(task)
    }

    /// Lists tasks in `ns` using the filter `container==<cid>`.
    #[allow(dead_code)]
    async fn list_task_by_cid(&self, cid: &str, ns: &str) -> Result<ListTasksResponse, TaskError> {
        let mut c = self.client.tasks();
        let request = ListTasksRequest {
            filter: format!("container=={}", cid),
        };
        let response = c.list(with_namespace!(request, ns)).await?.into_inner();
        Ok(response)
    }

    /// Sends `signal` to all processes of the task of container `cid`.
    async fn do_signal_task(&self, cid: &str, ns: &str, signal: u32) -> Result<(), TaskError> {
        let mut c = self.client.tasks();
        let kill_request = KillRequest {
            container_id: cid.to_string(),
            signal,
            all: true,
            ..Default::default()
        };
        c.kill(with_namespace!(kill_request, ns)).await?;
        Ok(())
    }

    /// Asks the task to terminate by sending signal 15 (SIGTERM).
    async fn do_kill_task(&self, cid: &str, ns: &str) -> Result<(), TaskError> {
        self.do_signal_task(cid, ns, 15).await
    }

    /// Forcibly terminates the task by sending signal 9 (SIGKILL).
    async fn do_kill_task_force(&self, cid: &str, ns: &str) -> Result<(), TaskError> {
        self.do_signal_task(cid, ns, 9).await
    }

    /// Sends a delete request for the task of container `cid`.
    async fn do_delete_task(&self, cid: &str, ns: &str) -> Result<(), TaskError> {
        let mut c = self.client.tasks();
        let delete_request = DeleteTaskRequest {
            container_id: cid.to_string(),
        };
        c.delete(with_namespace!(delete_request, ns)).await?;
        Ok(())
    }

    /// Waits for the task of container `cid` to exit and returns the wait response.
    async fn do_wait_task(&self, cid: &str, ns: &str) -> Result<WaitResponse, TaskError> {
        let mut c = self.client.tasks();
        let wait_request = WaitRequest {
            container_id: cid.to_string(),
            ..Default::default()
        };
        let resp = c
            .wait(with_namespace!(wait_request, ns))
            .await?
            .into_inner();
        Ok(resp)
    }

    /// Kills the endpoint's task and deletes it.
    ///
    /// Sends SIGTERM and waits up to five seconds for the task to exit. If it
    /// exits in time the task is deleted. Otherwise SIGKILL is sent, the exit is
    /// awaited for up to another five seconds, and the task is then deleted on a
    /// best-effort basis (a failed delete is only logged).
    ///
    /// # Errors
    /// Returns the `TaskError` of a failing kill request, of a failing wait after
    /// SIGTERM, or of a failing delete after a graceful exit.
    pub async fn kill_task_with_timeout(&self, endpoint: &Endpoint) -> Result<(), TaskError> {
        let Endpoint {
            service: cid,
            namespace: ns,
        } = endpoint;
        let kill_timeout = Duration::from_secs(5);

        self.do_kill_task(cid, ns).await?;
        // Futures are lazy: the wait request is only sent once it is polled here,
        // i.e. after the kill request above has completed.
        match tokio::time::timeout(kill_timeout, self.do_wait_task(cid, ns)).await {
            Ok(Ok(_)) => {
                // Exited gracefully: delete the task.
                self.do_delete_task(cid, ns).await?;
            }
            Ok(Err(e)) => {
                // The wait request itself failed.
                log::error!("Error while waiting for task {}: {:?}", cid, e);
                return Err(e);
            }
            Err(_) => {
                // Timed out: escalate to SIGKILL.
                log::warn!("Task {} did not exit in time, sending SIGKILL", cid);
                self.do_kill_task_force(cid, ns).await?;
                // A task has to be stopped before it can be deleted, so give the
                // SIGKILL a bounded amount of time to take effect before deleting.
                match tokio::time::timeout(kill_timeout, self.do_wait_task(cid, ns)).await {
                    Ok(Ok(_)) => {}
                    Ok(Err(e)) => {
                        log::warn!(
                            "Error while waiting for task {} after SIGKILL: {:?}",
                            cid,
                            e
                        );
                    }
                    Err(_) => {
                        log::warn!("Task {} did not exit in time after SIGKILL", cid);
                    }
                }
                // Best effort: a failed delete is logged, not returned.
                if let Err(e) = self.do_delete_task(cid, ns).await {
                    log::error!("Failed to delete task {} after SIGKILL: {:?}", cid, e);
                }
            }
        }
        Ok(())
    }
}

View File

@ -0,0 +1,8 @@
#![feature(ip_from)]
#![feature(slice_as_array)]
pub mod consts;
pub mod impls;
pub mod provider;
pub mod systemd;
pub use impls::init_backend;

View File

@ -0,0 +1,36 @@
use faas_containerd::consts::DEFAULT_FAASDRS_DATA_DIR;
use tokio::signal::unix::{SignalKind, signal};
/// Entry point: loads `.env`, initialises logging and the containerd backend,
/// spawns a task that logs termination signals, and serves the gateway with a
/// `ContainerdProvider` until the server future completes.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    dotenv::dotenv().ok();
    let log_env = env_logger::Env::new().default_filter_or("info");
    env_logger::init_from_env(log_env);
    faas_containerd::init_backend().await;

    let provider = faas_containerd::provider::ContainerdProvider::new(DEFAULT_FAASDRS_DATA_DIR);
    // leave for shutdown containers (stop tasks)
    let _handle = provider.clone();
    tokio::spawn(log_shutdown_signal());

    let server = gateway::bootstrap::serve(provider).unwrap_or_else(|e| {
        log::error!("Failed to start server: {}", e);
        std::process::exit(1);
    });
    server.await
}

/// Installs SIGINT/SIGTERM/SIGQUIT listeners, waits for the first of them and
/// logs it. No containers are stopped here yet; only the log lines are emitted.
async fn log_shutdown_signal() {
    log::info!("Setting up signal handlers for graceful shutdown");
    let mut sigint = signal(SignalKind::interrupt()).unwrap();
    let mut sigterm = signal(SignalKind::terminate()).unwrap();
    let mut sigquit = signal(SignalKind::quit()).unwrap();

    tokio::select! {
        _ = sigint.recv() => log::info!("SIGINT received, starting graceful shutdown..."),
        _ = sigterm.recv() => log::info!("SIGTERM received, starting graceful shutdown..."),
        _ = sigquit.recv() => log::info!("SIGQUIT received, starting graceful shutdown..."),
    }
    // for (_q, ctr) in handle.ctr_instance_map.lock().await.drain() {
    // let _ = ctr.delete().await;
    // }
    log::info!("Successfully shutdown all containers");
}

View File

@ -0,0 +1,35 @@
use crate::impls::cni::Endpoint;
use crate::impls::{backend, cni};
use crate::provider::ContainerdProvider;
use gateway::handlers::function::DeleteError;
use gateway::types::function::Query;
impl ContainerdProvider {
pub(crate) async fn _delete(&self, function: Query) -> Result<(), DeleteError> {
let endpoint: Endpoint = function.into();
log::trace!("Deleting function: {:?}", endpoint);
backend().kill_task_with_timeout(&endpoint).await?;
let del_ctr_err = backend().delete_container(&endpoint).await.map_err(|e| {
log::error!("Failed to delete container: {:?}", e);
e
});
let rm_snap_err = backend().remove_snapshot(&endpoint).await.map_err(|e| {
log::error!("Failed to remove snapshot: {:?}", e);
e
});
let del_net_err = cni::cni_impl::delete_cni_network(endpoint);
if del_ctr_err.is_ok() && rm_snap_err.is_ok() && del_net_err.is_ok() {
Ok(())
} else {
Err(DeleteError::Internal(format!(
"{:?}, {:?}, {:?}",
del_ctr_err, rm_snap_err, del_net_err
)))
}
}
}

View File

@ -0,0 +1,96 @@
use crate::impls::cni;
use crate::impls::{self, backend, function::ContainerStaticMetadata};
use crate::provider::ContainerdProvider;
use gateway::handlers::function::DeployError;
use gateway::types::function::Deployment;
use scopeguard::{ScopeGuard, guard};
impl ContainerdProvider {
pub(crate) async fn _deploy(&self, config: Deployment) -> Result<(), DeployError> {
let metadata = ContainerStaticMetadata::from(config);
log::trace!("Deploying function: {:?}", metadata);
// not going to check the conflict of namespace, should be handled by containerd backend
backend()
.prepare_image(&metadata.image, &metadata.endpoint.namespace, true)
.await
.map_err(|img_err| {
use impls::oci_image::ImageError;
log::error!("Image '{}' fetch failed: {}", &metadata.image, img_err);
match img_err {
ImageError::ImageNotFound(e) => DeployError::Invalid(e.to_string()),
_ => DeployError::InternalError(img_err.to_string()),
}
})?;
log::trace!("Image '{}' fetch ok", &metadata.image);
let mounts = backend().prepare_snapshot(&metadata).await.map_err(|e| {
log::error!("Failed to prepare snapshot: {:?}", e);
DeployError::InternalError(e.to_string())
})?;
let snapshot_defer = scopeguard::guard((), |()| {
log::trace!("Cleaning up snapshot");
let endpoint = metadata.endpoint.clone();
tokio::spawn(async move { backend().remove_snapshot(&endpoint).await });
});
// let network = CNIEndpoint::new(&metadata.container_id, &metadata.namespace)?;
let (ip, netns) = cni::cni_impl::create_cni_network(&metadata.endpoint).map_err(|e| {
log::error!("Failed to create CNI network: {}", e);
DeployError::InternalError(e.to_string())
})?;
let netns_defer = guard(netns, |ns| ns.remove().unwrap());
let _ = backend().create_container(&metadata).await.map_err(|e| {
log::error!("Failed to create container: {:?}", e);
DeployError::InternalError(e.to_string())
})?;
let container_defer = scopeguard::guard((), |()| {
let endpoint = metadata.endpoint.clone();
tokio::spawn(async move { backend().delete_container(&endpoint).await });
});
// TODO: Use ostree-ext
// let img_conf = BACKEND.get().unwrap().get_runtime_config(&metadata.image).unwrap();
backend().new_task(mounts, &metadata.endpoint).await?;
let task_defer = scopeguard::guard((), |()| {
let endpoint = metadata.endpoint.clone();
tokio::spawn(async move { backend().kill_task_with_timeout(&endpoint).await });
});
use std::net::IpAddr::*;
match ip.address() {
V4(addr) => {
if let Err(err) = self
.database
.insert(metadata.endpoint.to_string(), &addr.octets())
{
log::error!("Failed to insert into database: {:?}", err);
return Err(DeployError::InternalError(err.to_string()));
}
}
V6(addr) => {
if let Err(err) = self
.database
.insert(metadata.endpoint.to_string(), &addr.octets())
{
log::error!("Failed to insert into database: {:?}", err);
return Err(DeployError::InternalError(err.to_string()));
}
}
}
log::info!("container was created successfully: {}", metadata.endpoint);
ScopeGuard::into_inner(snapshot_defer);
ScopeGuard::into_inner(netns_defer);
ScopeGuard::into_inner(container_defer);
ScopeGuard::into_inner(task_defer);
Ok(())
}
}

View File

@ -0,0 +1,19 @@
use crate::provider::ContainerdProvider;
/// Failure kinds for looking up a function.
// NOTE(review): nothing in this file constructs or returns this type yet.
pub enum GetError {
/// The requested function does not exist.
NotFound,
/// The lookup failed for another reason.
InternalError,
}
// Placeholder impl: it currently defines no methods. The commented-out sketch
// below refers to a `ctr_instance_map` field that the provider no longer has.
impl ContainerdProvider {
// pub async fn getfn(
// &self,
// query: function::Query,
// ) -> Option<FunctionInstance> {
// let instance = self.ctr_instance_map
// .lock()
// .await
// .get(&query)
// .cloned();
// }
}

View File

@ -0,0 +1,69 @@
use gateway::{handlers::function::ListError, types::function::Status};
use crate::{
impls::{backend, cni::Endpoint, task::TaskError},
provider::ContainerdProvider,
};
impl ContainerdProvider {
    /// Lists the status of every container in `namespace` that has a task.
    ///
    /// Containers whose task lookup returns `TaskError::NotFound` are skipped.
    /// Any other task lookup failure is logged and the function is reported
    /// with zero replicas. A container without a creation timestamp is reported
    /// with `created_at: None` instead of panicking.
    ///
    /// # Errors
    /// Returns `ListError::Internal` when the container list cannot be fetched.
    pub(crate) async fn _list(&self, namespace: String) -> Result<Vec<Status>, ListError> {
        let containers = backend().list_container(&namespace).await.map_err(|e| {
            log::error!(
                "failed to get container list for namespace {} because {:?}",
                namespace,
                e
            );
            ListError::Internal(e.to_string())
        })?;

        let mut statuses: Vec<Status> = Vec::new();
        for container in containers {
            let endpoint = Endpoint {
                service: container.id.clone(),
                namespace: namespace.clone(),
            };
            let created_at = container.created_at.map(|ts| ts.to_string());

            let mut replicas = 0;
            match backend().get_task(&endpoint).await {
                Ok(task) => {
                    // NOTE(review): 2 and 3 are raw task status codes — presumably
                    // RUNNING and STOPPED; confirm against the containerd task types.
                    let status = task.status;
                    if status == 2 || status == 3 {
                        replicas = 1;
                    }
                }
                Err(TaskError::NotFound) => continue,
                Err(e) => {
                    log::warn!(
                        "failed to get task for function {:?} because {:?}",
                        &endpoint,
                        e
                    );
                }
            }

            // Most fields are not implemented yet and are filled with `None`.
            let status = Status {
                name: endpoint.service,
                namespace: Some(endpoint.namespace),
                image: container.image,
                env_process: None,
                env_vars: None,
                constraints: None,
                secrets: None,
                labels: None,
                annotations: None,
                limits: None,
                requests: None,
                read_only_root_filesystem: false,
                invocation_count: None,
                replicas: Some(replicas),
                available_replicas: Some(replicas),
                created_at,
                usage: None,
            };
            statuses.push(status);
        }
        Ok(statuses)
    }
}

View File

@ -0,0 +1,6 @@
pub mod delete;
pub mod deploy;
pub mod list;
pub mod resolve;
pub mod status;
pub mod update;

View File

@ -0,0 +1,65 @@
use std::net::{IpAddr, Ipv4Addr};
use actix_http::uri::Builder;
use gateway::handlers::function::ResolveError;
use gateway::types::function::Query;
use crate::impls::cni::{self, Endpoint};
use crate::provider::ContainerdProvider;
/// Starts a URI builder for the function's upstream: scheme `http`, host `addr`,
/// port 8080.
///
/// The authority is rendered through `SocketAddr`, which wraps IPv6 addresses
/// in brackets (`[::1]:8080`); IPv4 output is unchanged (`10.0.0.1:8080`).
fn upstream(addr: IpAddr) -> Builder {
    let authority = std::net::SocketAddr::new(addr, 8080).to_string();
    actix_http::Uri::builder()
        .scheme("http")
        .authority(authority)
}
impl ContainerdProvider {
pub(crate) async fn _resolve(
&self,
query: Query,
) -> Result<actix_http::uri::Builder, ResolveError> {
let endpoint = Endpoint::from(query);
log::trace!("Resolving function: {:?}", endpoint);
let addr_oct = self
.database
.get(endpoint.to_string())
.map_err(|e| {
log::error!("Failed to get container address: {:?}", e);
ResolveError::Internal(e.to_string())
})?
.ok_or(ResolveError::NotFound("container not found".to_string()))?;
log::trace!("Container address: {:?}", addr_oct.as_array::<4>());
// We force the address to be IPv4 here
let addr = IpAddr::V4(Ipv4Addr::from_octets(*addr_oct.as_array::<4>().unwrap()));
// Check if the coresponding netns is still alive
// We can achieve this by checking the /run/cni/faasrs-cni-bridge,
// if the ip filename is still there
if cni::cni_impl::check_network_exists(addr) {
log::trace!("CNI network exists for {}", addr);
Ok(upstream(addr))
} else {
log::error!("CNI network not exists for {}", addr);
let _ = self.database.remove(endpoint.to_string());
Err(ResolveError::Internal("CNI network not exists".to_string()))
}
}
}
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    /// The upstream builder yields `http://<addr>:8080/` for an IPv4 address.
    #[test]
    fn test_uri() {
        let addr: IpAddr = Ipv4Addr::new(10, 42, 2, 48).into();
        let uri = super::upstream(addr).path_and_query("").build().unwrap();

        assert_eq!(uri.scheme_str(), Some("http"));
        let authority = uri.authority().unwrap();
        assert_eq!(authority.host(), addr.to_string());
        assert_eq!(authority.port_u16(), Some(8080));
        assert_eq!(uri.to_string(), format!("http://{}:8080/", addr));
    }
}

View File

@ -0,0 +1,69 @@
use gateway::{
handlers::function::ResolveError,
types::function::{Query, Status},
};
use crate::{
impls::{backend, cni::Endpoint, container::ContainerError},
provider::ContainerdProvider,
};
impl ContainerdProvider {
    /// Reports the status of a single function.
    ///
    /// The container is loaded first. A failing task lookup is only logged and
    /// the function is then reported with zero replicas. A container without a
    /// creation timestamp is reported with `created_at: None` instead of panicking.
    ///
    /// # Errors
    /// Maps a container load failure to `ResolveError::NotFound`,
    /// `ResolveError::Internal`, or `ResolveError::Invalid` for any other variant.
    pub(crate) async fn _status(&self, function: Query) -> Result<Status, ResolveError> {
        let endpoint: Endpoint = function.into();
        let container = backend().load_container(&endpoint).await.map_err(|e| {
            log::error!(
                "failed to load container for function {:?} because {:?}",
                endpoint,
                e
            );
            match e {
                ContainerError::NotFound => ResolveError::NotFound(e.to_string()),
                ContainerError::Internal => ResolveError::Internal(e.to_string()),
                _ => ResolveError::Invalid(e.to_string()),
            }
        })?;
        let created_at = container.created_at.map(|ts| ts.to_string());

        let mut replicas = 0;
        match backend().get_task(&endpoint).await {
            Ok(task) => {
                // NOTE(review): 2 and 3 are raw task status codes — presumably
                // RUNNING and STOPPED; confirm against the containerd task types.
                let status = task.status;
                if status == 2 || status == 3 {
                    replicas = 1;
                }
            }
            Err(e) => {
                log::warn!(
                    "failed to get task for function {:?} because {:?}",
                    &endpoint,
                    e
                );
            }
        }

        // Most fields are not implemented yet and are filled with `None`.
        let status = Status {
            name: container.id,
            namespace: Some(endpoint.namespace),
            image: container.image,
            env_process: None,
            env_vars: None,
            constraints: None,
            secrets: None,
            labels: None,
            annotations: None,
            limits: None,
            requests: None,
            read_only_root_filesystem: false,
            invocation_count: None,
            replicas: Some(replicas),
            available_replicas: Some(replicas),
            created_at,
            usage: None,
        };
        Ok(status)
    }
}

View File

@ -0,0 +1,32 @@
use gateway::{
handlers::function::{DeleteError, DeployError, UpdateError},
types::function::{Deployment, Query},
};
use crate::provider::ContainerdProvider;
impl ContainerdProvider {
pub(crate) async fn _update(&self, param: Deployment) -> Result<(), UpdateError> {
let function = Query {
service: param.service.clone(),
namespace: param.namespace.clone(),
};
self._delete(function).await.map_err(|e| {
log::error!("failed to delete function when update because {:?}", e);
match e {
DeleteError::NotFound(e) => UpdateError::NotFound(e.to_string()),
DeleteError::Internal(e) => UpdateError::Internal(e.to_string()),
_ => UpdateError::Internal(e.to_string()),
}
})?;
self._deploy(param).await.map_err(|e| {
log::error!("failed to deploy function when update because {:?}", e);
match e {
DeployError::Invalid(e) => UpdateError::Invalid(e.to_string()),
DeployError::InternalError(e) => UpdateError::Internal(e.to_string()),
}
})?;
Ok(())
}
}

View File

@ -0,0 +1,49 @@
pub mod function;
use std::{path::Path, sync::Arc};
use gateway::{
handlers::function::{DeleteError, DeployError, ListError, ResolveError, UpdateError},
provider::Provider,
types::function::{Deployment, Query, Status},
};
/// Function provider backed by containerd; implements the gateway's `Provider` trait.
pub struct ContainerdProvider {
// pub ctr_instance_map: tokio::sync::Mutex<HashMap<Query, FunctionInstance>>,
/// Embedded sled database owned by the provider.
database: sled::Db,
}
impl ContainerdProvider {
    /// Opens (or creates) the sled database at `path` and returns the provider
    /// wrapped in an `Arc`.
    ///
    /// # Panics
    /// Panics if the database cannot be opened.
    pub fn new<P: AsRef<Path>>(path: P) -> Arc<Self> {
        let database = sled::open(path).unwrap();
        Arc::new(Self { database })
    }
}
// Each trait method forwards to the underscore-prefixed inherent method of the
// same name on `ContainerdProvider`.
impl Provider for ContainerdProvider {
/// Forwards to `_resolve`.
async fn resolve(&self, function: Query) -> Result<actix_http::uri::Builder, ResolveError> {
self._resolve(function).await
}
/// Forwards to `_deploy`.
async fn deploy(&self, param: Deployment) -> Result<(), DeployError> {
self._deploy(param).await
}
/// Forwards to `_delete`.
async fn delete(&self, function: Query) -> Result<(), DeleteError> {
self._delete(function).await
}
/// Forwards to `_list`.
async fn list(&self, namespace: String) -> Result<Vec<Status>, ListError> {
self._list(namespace).await
}
/// Forwards to `_update`.
async fn update(&self, param: Deployment) -> Result<(), UpdateError> {
self._update(param).await
}
/// Forwards to `_status`.
async fn status(&self, function: Query) -> Result<Status, ResolveError> {
self._status(function).await
}
}

View File

@ -0,0 +1,71 @@
/// Boxed, thread-safe error type returned by the `Systemd` helpers.
type Err = Box<dyn std::error::Error + Send + Sync>;
use handlebars::Handlebars;
use std::{collections::HashMap, fs::File, io::Write, path::Path};
/// Namespace for helpers that shell out to `systemctl` and install unit files.
pub struct Systemd;
impl Systemd {
    /// Runs `systemctl <args>`; on a non-zero exit status returns an I/O error
    /// whose message is `<describe()>: <stderr>`.
    fn run_systemctl(args: &[&str], describe: impl FnOnce() -> String) -> Result<(), Err> {
        let output = std::process::Command::new("systemctl")
            .args(args)
            .output()?;
        if output.status.success() {
            return Ok(());
        }
        Err(Box::new(std::io::Error::other(format!(
            "{}: {}",
            describe(),
            String::from_utf8_lossy(&output.stderr)
        ))))
    }

    /// Runs `systemctl enable <unit>`.
    pub fn enable(unit: String) -> Result<(), Err> {
        Self::run_systemctl(&["enable", unit.as_str()], || {
            format!("Failed to enable unit {}", unit)
        })
    }

    /// Runs `systemctl start <unit>`.
    pub fn start(unit: String) -> Result<(), Err> {
        Self::run_systemctl(&["start", unit.as_str()], || {
            format!("Failed to start unit {}", unit)
        })
    }

    /// Runs `systemctl daemon-reload`.
    pub fn daemon_reload() -> Result<(), Err> {
        Self::run_systemctl(&["daemon-reload"], || {
            "Failed to reload systemd daemon".to_string()
        })
    }

    /// Renders the Handlebars template `./hack/<name>.service` with `tokens`
    /// and writes the result as the unit file `<name>.service`.
    ///
    /// # Errors
    /// Fails when `tokens` has no non-empty `Cwd` entry, when the template
    /// cannot be registered or rendered, or when the unit file cannot be written.
    pub fn install_unit(name: String, tokens: HashMap<String, String>) -> Result<(), Err> {
        let has_cwd = tokens.get("Cwd").is_some_and(|v| !v.is_empty());
        if !has_cwd {
            return Err("key Cwd expected in tokens parameter".into());
        }

        let tmpl_name = format!("./hack/{}.service", name);
        let mut registry = Handlebars::new();
        registry.register_template_file("template", &tmpl_name)?;
        let rendered = registry.render("template", &tokens)?;

        let unit_name = format!("{}.service", name);
        Self::write_unit(&unit_name, rendered.as_bytes())
    }

    /// Creates (or truncates) `/lib/systemd/system/<name>` and writes `content` to it.
    pub fn write_unit(name: &str, content: &[u8]) -> Result<(), Err> {
        let unit_path = Path::new("/lib/systemd/system").join(name);
        File::create(unit_path)?.write_all(content)?;
        Ok(())
    }
}

View File

@ -0,0 +1,150 @@
use actix_web::App;
use actix_web::http::StatusCode;
use actix_web::test;
use faas_containerd::consts::DEFAULT_FAASDRS_DATA_DIR;
use gateway::bootstrap::config_app;
use serde_json::json;
/// End-to-end test of the gateway handlers backed by `ContainerdProvider`.
///
/// The steps run in a fixed order against one app instance: proxy, update and
/// delete of a missing function, then deploy, update, list, status, proxy and
/// delete of `test-function` in namespace `faasrs-test-namespace`.
///
/// Marked `#[ignore]`; it initialises the real backend and deploys a real image.
#[actix_web::test]
#[ignore]
async fn test_handlers_in_order() {
dotenv::dotenv().ok();
faas_containerd::init_backend().await;
let provider = faas_containerd::provider::ContainerdProvider::new(DEFAULT_FAASDRS_DATA_DIR);
let app = test::init_service(App::new().configure(config_app(provider))).await;
// test proxy no-found-function in namespace 'faasrs-test-namespace'
// (the request path carries no namespace suffix; the test expects 405 with
// an "Invalid function name" body)
let req = test::TestRequest::get()
.uri("/function/test-no-found-function")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
assert!(response_str.contains("Invalid function name"));
// test update no-found-function in namespace 'faasrs-test-namespace'
let req = test::TestRequest::put()
.uri("/system/functions")
.set_json(json!({
"service": "test-no-found-function",
"image": "hub.scutosc.cn/dolzhuying/echo:latest",
"namespace": "faasrs-test-namespace"
}))
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
assert!(response_str.contains("NotFound: container not found"));
// test delete no-found-function in namespace 'faasrs-test-namespace'
let req = test::TestRequest::delete()
.uri("/system/functions")
.set_json(json!({
"functionName": "test-no-found-function",
"namespace": "faasrs-test-namespace"
}))
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
// test deploy test-function in namespace 'faasrs-test-namespace'
let req = test::TestRequest::post()
.uri("/system/functions")
.set_json(json!({
"service": "test-function",
"image": "hub.scutosc.cn/dolzhuying/echo:latest",
"namespace": "faasrs-test-namespace"
}))
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(
resp.status(),
StatusCode::ACCEPTED,
"error: {:?}",
resp.response()
);
// test update test-function in namespace 'faasrs-test-namespace'
let req = test::TestRequest::put()
.uri("/system/functions")
.set_json(json!({
"service": "test-function",
"image": "hub.scutosc.cn/dolzhuying/echo:latest",
"namespace": "faasrs-test-namespace"
}))
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::ACCEPTED);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
assert!(response_str.contains("function test-function was updated successfully"));
// test list
let req = test::TestRequest::get()
.uri("/system/functions?namespace=faasrs-test-namespace")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
let response_json: serde_json::Value = serde_json::from_str(response_str).unwrap();
// The field assertions only run when the body is a JSON array; an empty
// array passes without checking anything.
if let Some(arr) = response_json.as_array() {
for item in arr {
assert_eq!(
item["name"],
serde_json::Value::String("test-function".to_string())
);
assert_eq!(
item["image"],
serde_json::Value::String("hub.scutosc.cn/dolzhuying/echo:latest".to_string())
);
assert_eq!(
item["namespace"],
serde_json::Value::String("faasrs-test-namespace".to_string())
);
}
}
// test status test-function in namespace 'faasrs-test-namespace'
let req = test::TestRequest::get()
.uri("/system/function/test-function?namespace=faasrs-test-namespace")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
let response_json: serde_json::Value = serde_json::from_str(response_str).unwrap();
// NOTE(review): `as_array()` is `None` for a JSON object, so if the status
// endpoint returns a single object these field assertions are skipped —
// confirm the response shape and assert on it directly if so.
if let Some(arr) = response_json.as_array() {
for item in arr {
assert_eq!(item["name"], "test-function");
assert_eq!(item["image"], "hub.scutosc.cn/dolzhuying/echo:latest");
assert_eq!(item["namespace"], "faasrs-test-namespace");
}
}
// test proxy test-function in namespace 'faasrs-test-namespace'
let req = test::TestRequest::get()
.uri("/function/test-function.faasrs-test-namespace")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
assert!(response_str.contains("Hello world!"));
// test delete test-function in namespace 'faasrs-test-namespace'
let req = test::TestRequest::delete()
.uri("/system/functions")
.set_json(json!({
"functionName": "test-function",
"namespace": "faasrs-test-namespace"
}))
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let response_body = test::read_body(resp).await;
let response_str = std::str::from_utf8(&response_body).unwrap();
assert!(response_str.contains("function test-function was deleted successfully"));
}