This commit is contained in:
sparkzky 2025-01-25 16:51:20 +08:00
commit 02d9cc020b
9 changed files with 2509 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

2160
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

7
Cargo.toml Normal file
View File

@ -0,0 +1,7 @@
[workspace]
members = [ "app","service"]
# API 模块:基于 Actix Web 提供 RESTful 接口,调用 service 模块的功能。
# 测试:编写单元测试和集成测试,确保服务的可靠性。
# 镜像管理:实现缓存逻辑,并支持镜像解包和检查功能

11
app/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "app"
version = "0.1.0"
edition = "2021"
[dependencies]
actix-web = "4.0"
tokio = { version = "1", features = ["full"] }
service = { path = "../service" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

32
app/src/handlers.rs Normal file
View File

@ -0,0 +1,32 @@
use crate::types::*;
use actix_web::{web, HttpResponse, Responder};
use service::Service;
use std::sync::Arc;
/// 创建并启动容器
pub async fn create_container(
service: web::Data<Arc<Service>>,
info: web::Json<CreateContainerInfo>,
) -> impl Responder {
let cid = info.container_id.clone();
let image = info.image.clone();
service.create_container(image, cid).await;
HttpResponse::Ok().json("Container created successfully!")
}
/// 删除容器
pub async fn remove_container(
service: web::Data<Arc<Service>>,
info: web::Json<RemoveContainerInfo>,
) -> impl Responder {
let container_id = info.container_id.clone();
service.remove_container(container_id).await;
HttpResponse::Ok().json("Container removed successfully!")
}
pub async fn get_container_list(service: web::Data<Arc<Service>>) -> impl Responder {
let container_list = service.get_container_list().await.unwrap();
HttpResponse::Ok().json(container_list)
}
// 添加更多的路由处理函数...

32
app/src/main.rs Normal file
View File

@ -0,0 +1,32 @@
use std::sync::Arc;
use actix_web::{web, App, HttpServer};
use service::Service;
pub mod handlers;
pub mod types;
use handlers::*;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let service = Arc::new(
Service::new("/run/containerd/containerd.sock".to_string())
.await
.unwrap(),
);
println!("I'm running!");
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(service.clone()))
.route("/create_container", web::post().to(create_container))
.route("/remove_container", web::post().to(remove_container))
.route("/containers", web::get().to(get_container_list))
// 更多路由配置...
})
.bind("0.0.0.0:18080")?
.run()
.await
}

17
app/src/types.rs Normal file
View File

@ -0,0 +1,17 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct CreateContainerInfo {
pub container_id: String,
pub image: String,
}
#[derive(Serialize, Deserialize)]
pub struct RemoveContainerInfo {
pub container_id: String,
}
#[derive(Deserialize)]
pub struct GetContainerListQuery {
pub status: Option<String>,
}

13
service/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "service"
version = "0.1.0"
edition = "2021"
[dependencies]
containerd-client = "0.6"
tokio = { version = "1", features = ["full"] }
tonic = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
log = "0.4"
env_logger = "0.10"

236
service/src/lib.rs Normal file
View File

@ -0,0 +1,236 @@
use containerd_client::{
services::v1::{
container::Runtime, Container, CreateContainerRequest, CreateTaskRequest,
DeleteContainerRequest, DeleteTaskRequest, KillRequest, ListContainersRequest,
ListTasksRequest, StartRequest, WaitRequest,
},
tonic::Request,
with_namespace, Client,
};
use std::{
fs::{self, File},
sync::{Arc, Mutex},
time::Duration,
};
use tokio::time::timeout;
// config.json,dockerhub密钥
const DOCKER_CONFIG_DIR: &str = "/var/lib/faasd/.docker/";
// 命名空间(容器的)
const NAMESPACE: &str = "default";
type Err = Box<dyn std::error::Error>;
pub struct Service {
client: Arc<Mutex<Client>>,
}
impl Service {
pub async fn new(endpoint: String) -> Result<Self, Err> {
let client = Client::from_path(endpoint).await.unwrap();
Ok(Service {
client: Arc::new(Mutex::new(client)),
})
}
pub async fn create_container(&self, image: String, cid: String) {
let mut containers_client = self.client.lock().unwrap().containers();
let container = Container {
id: cid.to_string(),
image,
runtime: Some(Runtime {
name: "io.containerd.runc.v2".to_string(),
options: None,
}),
spec: None,
..Default::default()
};
let req = CreateContainerRequest {
container: Some(container),
};
let req = with_namespace!(req, NAMESPACE);
let _resp = containers_client
.create(req)
.await
.expect("Failed to create container");
println!("Container: {:?} created", cid);
self.create_and_start_task(cid).await;
}
pub async fn remove_container(&self, container_id: String) {
let c = self.client.lock().unwrap();
let mut containers_client = c.containers();
let request = Request::new(ListContainersRequest {
..Default::default()
});
let responce = containers_client.list(request).await.unwrap().into_inner();
let container = responce
.containers
.iter()
.find(|container| container.id == container_id);
if let Some(container) = container {
let mut tasks_client = c.tasks();
let request = Request::new(ListTasksRequest {
filter: format!("container=={}", container_id),
..Default::default()
});
let responce = tasks_client.list(request).await.unwrap().into_inner();
drop(tasks_client);
if let Some(task) = responce
.tasks
.iter()
.find(|task| task.container_id == container.id)
{
println!("Task found: {}, Status: {}", task.id, task.status);
// TASK_UNKNOWN (0) — 未知状态
// TASK_CREATED (1) — 任务已创建
// TASK_RUNNING (2) — 任务正在运行
// TASK_STOPPED (3) — 任务已停止
// TASK_EXITED (4) — 任务已退出
// TASK_PAUSED (5) — 任务已暂停
// TASK_FAILED (6) — 任务失败
self.delete_task(&task.container_id).await;
}
let delete_request = DeleteContainerRequest {
id: container.id.clone(),
..Default::default()
};
let delete_request = with_namespace!(delete_request, NAMESPACE);
let _ = containers_client
.delete(delete_request)
.await
.expect("Failed to delete container");
println!("Container: {:?} deleted", containers_client);
} else {
todo!("Container not found");
}
drop(containers_client);
}
pub async fn create_and_start_task(&self, container_id: String) {
let tmp = std::env::temp_dir().join("containerd-client-test");
println!("Temp dir: {:?}", tmp);
fs::create_dir_all(&tmp).expect("Failed to create temp directory");
let stdin = tmp.join("stdin");
let stdout = tmp.join("stdout");
let stderr = tmp.join("stderr");
File::create(&stdin).expect("Failed to create stdin");
File::create(&stdout).expect("Failed to create stdout");
File::create(&stderr).expect("Failed to create stderr");
let mut tasks_client = self.client.lock().unwrap().tasks();
let req = CreateTaskRequest {
container_id: container_id.clone(),
stdin: stdin.to_str().unwrap().to_string(),
stdout: stdout.to_str().unwrap().to_string(),
stderr: stderr.to_str().unwrap().to_string(),
..Default::default()
};
let req = with_namespace!(req, NAMESPACE);
let _resp = tasks_client
.create(req)
.await
.expect("Failed to create task");
println!("Task: {:?} created", container_id);
let req = StartRequest {
container_id: container_id.to_string(),
..Default::default()
};
let req = with_namespace!(req, NAMESPACE);
let _resp = tasks_client.start(req).await.expect("Failed to start task");
println!("Task: {:?} started", container_id);
}
pub async fn delete_task(&self, container_id: &str) {
let time_out = Duration::from_secs(30);
let mut tc = self.client.lock().unwrap().tasks();
let wait_result = timeout(time_out, async {
let wait_request = Request::new(WaitRequest {
container_id: container_id.to_string(),
..Default::default()
});
let _ = tc.wait(wait_request).await?;
Ok::<(), Err>(())
})
.await;
let kill_request = Request::new(KillRequest {
container_id: container_id.to_string(),
signal: 15,
all: true,
..Default::default()
});
tc.kill(kill_request).await.expect("Failed to kill task");
match wait_result {
Ok(Ok(_)) => {
let req = DeleteTaskRequest {
container_id: container_id.to_string(),
};
let req = with_namespace!(req, NAMESPACE);
let _resp = tc.delete(req).await.expect("Failed to delete task");
println!("Task: {:?} deleted", container_id);
}
_ => {
let kill_request = Request::new(KillRequest {
container_id: container_id.to_string(),
signal: 9,
all: true,
..Default::default()
});
tc.kill(kill_request)
.await
.expect("Failed to FORCE kill task");
}
}
}
pub async fn get_container_list(&self) -> Result<Vec<String>, tonic::Status> {
let mut cc = self.client.lock().unwrap().containers();
let request = ListContainersRequest {
..Default::default()
};
let request = with_namespace!(request, NAMESPACE);
let response = cc.list(request).await?;
Ok(response
.into_inner()
.containers
.into_iter()
.map(|container| container.id)
.collect())
}
pub fn prepare_image(&self) {
todo!()
}
pub fn pull_image(&self) {
todo!()
}
/// 获取resolver验证用后面拉取镜像可能会用到
pub fn get_resolver(&self) {
todo!()
}
}