Remove the shim kernel crate

Zhang Junyang
2024-08-19 19:15:22 +08:00
committed by Tate, Hongliang Tian
parent d76c7a5b1e
commit dafd16075f
416 changed files with 231 additions and 273 deletions


@ -0,0 +1,131 @@
// SPDX-License-Identifier: MPL-2.0
#![allow(unused_variables)]
use ostd::{cpu::*, mm::VmSpace};
use crate::{
prelude::*, process::signal::signals::fault::FaultSignal,
vm::page_fault_handler::PageFaultHandler,
};
/// Most exceptions cannot be handled here; we simply send the current process a fault signal before returning to user space.
pub fn handle_exception(ctx: &Context, context: &UserContext) {
let trap_info = context.trap_information();
let exception = CpuException::to_cpu_exception(trap_info.id as u16).unwrap();
log_trap_info(exception, trap_info);
let root_vmar = ctx.process.root_vmar();
match *exception {
PAGE_FAULT => {
if handle_page_fault(root_vmar.vm_space(), trap_info).is_err() {
generate_fault_signal(trap_info);
}
}
_ => {
// We currently do nothing about other exceptions
generate_fault_signal(trap_info);
}
}
}
/// Handles a page fault that occurred in the given `VmSpace`.
pub(crate) fn handle_page_fault(
vm_space: &VmSpace,
trap_info: &CpuExceptionInfo,
) -> core::result::Result<(), ()> {
const PAGE_NOT_PRESENT_ERROR_MASK: usize = 0x1 << 0;
const WRITE_ACCESS_MASK: usize = 0x1 << 1;
let page_fault_addr = trap_info.page_fault_addr as Vaddr;
trace!(
"page fault error code: 0x{:x}, Page fault address: 0x{:x}",
trap_info.error_code,
page_fault_addr
);
let not_present = trap_info.error_code & PAGE_NOT_PRESENT_ERROR_MASK == 0;
let write = trap_info.error_code & WRITE_ACCESS_MASK != 0;
if not_present || write {
// If the page is not present, or the fault was caused by a write access, ask the VMAR to commit the page
let current = current!();
let root_vmar = current.root_vmar();
debug_assert_eq!(
Arc::as_ptr(root_vmar.vm_space()),
vm_space as *const VmSpace
);
if let Err(e) = root_vmar.handle_page_fault(page_fault_addr, not_present, write) {
warn!(
"page fault handler failed: addr: 0x{:x}, err: {:?}",
page_fault_addr, e
);
return Err(());
}
Ok(())
} else {
// Otherwise, the page fault cannot be handled
Err(())
}
}
/// Generates a fault signal for the current process.
fn generate_fault_signal(trap_info: &CpuExceptionInfo) {
let current = current!();
let signal = FaultSignal::new(trap_info);
current.enqueue_signal(signal);
}
macro_rules! log_trap_common {
($exception_name: ident, $trap_info: ident) => {
trace!(
"[Trap][{}][err = {}]",
stringify!($exception_name),
$trap_info.error_code
)
};
}
fn log_trap_info(exception: &CpuException, trap_info: &CpuExceptionInfo) {
match *exception {
DIVIDE_BY_ZERO => log_trap_common!(DIVIDE_BY_ZERO, trap_info),
DEBUG => log_trap_common!(DEBUG, trap_info),
NON_MASKABLE_INTERRUPT => log_trap_common!(NON_MASKABLE_INTERRUPT, trap_info),
BREAKPOINT => log_trap_common!(BREAKPOINT, trap_info),
OVERFLOW => log_trap_common!(OVERFLOW, trap_info),
BOUND_RANGE_EXCEEDED => log_trap_common!(BOUND_RANGE_EXCEEDED, trap_info),
INVALID_OPCODE => log_trap_common!(INVALID_OPCODE, trap_info),
DEVICE_NOT_AVAILABLE => log_trap_common!(DEVICE_NOT_AVAILABLE, trap_info),
DOUBLE_FAULT => log_trap_common!(DOUBLE_FAULT, trap_info),
COPROCESSOR_SEGMENT_OVERRUN => log_trap_common!(COPROCESSOR_SEGMENT_OVERRUN, trap_info),
INVAILD_TSS => log_trap_common!(INVAILD_TSS, trap_info),
SEGMENT_NOT_PRESENT => log_trap_common!(SEGMENT_NOT_PRESENT, trap_info),
STACK_SEGMENT_FAULT => log_trap_common!(STACK_SEGMENT_FAULT, trap_info),
GENERAL_PROTECTION_FAULT => log_trap_common!(GENERAL_PROTECTION_FAULT, trap_info),
PAGE_FAULT => {
trace!(
"[Trap][{}][page fault addr = 0x{:x}, err = {}]",
stringify!(PAGE_FAULT),
trap_info.page_fault_addr,
trap_info.error_code
);
}
// 15 reserved
X87_FLOATING_POINT_EXCEPTION => log_trap_common!(X87_FLOATING_POINT_EXCEPTION, trap_info),
ALIGNMENT_CHECK => log_trap_common!(ALIGNMENT_CHECK, trap_info),
MACHINE_CHECK => log_trap_common!(MACHINE_CHECK, trap_info),
SIMD_FLOATING_POINT_EXCEPTION => log_trap_common!(SIMD_FLOATING_POINT_EXCEPTION, trap_info),
VIRTUALIZATION_EXCEPTION => log_trap_common!(VIRTUALIZATION_EXCEPTION, trap_info),
CONTROL_PROTECTION_EXCEPTION => log_trap_common!(CONTROL_PROTECTION_EXCEPTION, trap_info),
HYPERVISOR_INJECTION_EXCEPTION => {
log_trap_common!(HYPERVISOR_INJECTION_EXCEPTION, trap_info)
}
VMM_COMMUNICATION_EXCEPTION => log_trap_common!(VMM_COMMUNICATION_EXCEPTION, trap_info),
SECURITY_EXCEPTION => log_trap_common!(SECURITY_EXCEPTION, trap_info),
_ => {
info!(
"[Trap][Unknown trap type][id = {}, err = {}]",
trap_info.id, trap_info.error_code
);
}
}
}
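The two bit masks in `handle_page_fault` follow the x86-64 page-fault error-code layout: bit 0 is set when the faulting page was present (a protection violation), so a cleared bit means the page was not present, and bit 1 is set when the fault was caused by a write. A minimal, self-contained sketch of the same decoding, with made-up error-code values for illustration:

```rust
/// Decodes bits 0 and 1 of an x86-64 page-fault error code.
fn decode_page_fault(error_code: usize) -> (bool, bool) {
    const PAGE_NOT_PRESENT_ERROR_MASK: usize = 0x1 << 0;
    const WRITE_ACCESS_MASK: usize = 0x1 << 1;
    let not_present = error_code & PAGE_NOT_PRESENT_ERROR_MASK == 0;
    let write = error_code & WRITE_ACCESS_MASK != 0;
    (not_present, write)
}

fn main() {
    // 0b10: present bit clear, write bit set -> a write to an unmapped page.
    assert_eq!(decode_page_fault(0b10), (true, true));
    // 0b01: present bit set, write bit clear -> a read protection violation.
    assert_eq!(decode_page_fault(0b01), (false, false));
}
```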


@ -0,0 +1,111 @@
// SPDX-License-Identifier: MPL-2.0
use ostd::{
cpu::CpuSet,
task::{Priority, TaskOptions},
};
use super::{allocate_tid, status::ThreadStatus, thread_table, Thread};
use crate::prelude::*;
/// The inner data of a kernel thread
pub struct KernelThread;
pub trait KernelThreadExt {
/// Gets the kernel thread structure.
fn as_kernel_thread(&self) -> Option<&KernelThread>;
/// Creates a new kernel thread structure, but does **NOT** run the thread.
fn new_kernel_thread(thread_options: ThreadOptions) -> Arc<Thread>;
/// Creates a new kernel thread structure and then runs the thread.
fn spawn_kernel_thread(thread_options: ThreadOptions) -> Arc<Thread> {
let thread = Self::new_kernel_thread(thread_options);
thread.run();
thread
}
/// Joins a kernel thread, returning once the kernel thread has exited.
fn join(&self);
}
impl KernelThreadExt for Thread {
fn as_kernel_thread(&self) -> Option<&KernelThread> {
self.data().downcast_ref::<KernelThread>()
}
fn new_kernel_thread(mut thread_options: ThreadOptions) -> Arc<Self> {
let task_fn = thread_options.take_func();
let thread_fn = move || {
task_fn();
let current_thread = current_thread!();
// Ensure the thread has exited
current_thread.exit();
};
let tid = allocate_tid();
let thread = Arc::new_cyclic(|thread_ref| {
let weak_thread = thread_ref.clone();
let task = TaskOptions::new(thread_fn)
.data(weak_thread)
.priority(thread_options.priority)
.cpu_affinity(thread_options.cpu_affinity)
.build()
.unwrap();
let status = ThreadStatus::Init;
let kernel_thread = KernelThread;
Thread::new(tid, task, kernel_thread, status)
});
thread_table::add_thread(thread.clone());
thread
}
fn join(&self) {
loop {
if self.status().is_exited() {
return;
} else {
Thread::yield_now();
}
}
}
}
/// Options to create or spawn a new thread.
pub struct ThreadOptions {
func: Option<Box<dyn Fn() + Send + Sync>>,
priority: Priority,
cpu_affinity: CpuSet,
}
impl ThreadOptions {
pub fn new<F>(func: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
let cpu_affinity = CpuSet::new_full();
Self {
func: Some(Box::new(func)),
priority: Priority::normal(),
cpu_affinity,
}
}
pub fn func<F>(mut self, func: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.func = Some(Box::new(func));
self
}
fn take_func(&mut self) -> Box<dyn Fn() + Send + Sync> {
self.func.take().unwrap()
}
pub fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}
pub fn cpu_affinity(mut self, cpu_affinity: CpuSet) -> Self {
self.cpu_affinity = cpu_affinity;
self
}
}
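Given the builder API above, spawning and joining a kernel thread would look roughly like the sketch below; the closure body and the `spawn_logger` name are placeholders, not part of the commit:

```rust
// Hypothetical usage of the API defined in this file.
use ostd::task::Priority;

use crate::thread::{
    kernel_thread::{KernelThreadExt, ThreadOptions},
    Thread,
};

fn spawn_logger() {
    // Build the thread options: the closure is the thread body.
    let options = ThreadOptions::new(|| {
        // Deferred kernel work goes here.
    })
    .priority(Priority::normal());

    // Create the thread, run it, and wait (yielding) until it exits.
    let thread = Thread::spawn_kernel_thread(options);
    thread.join();
}
```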

kernel/src/thread/mod.rs

@ -0,0 +1,113 @@
// SPDX-License-Identifier: MPL-2.0
//! POSIX thread implementation.
use core::sync::atomic::{AtomicU32, Ordering};
use ostd::task::Task;
use self::status::{AtomicThreadStatus, ThreadStatus};
use crate::prelude::*;
pub mod exception;
pub mod kernel_thread;
pub mod status;
pub mod task;
pub mod thread_table;
pub mod work_queue;
pub type Tid = u32;
static TID_ALLOCATOR: AtomicU32 = AtomicU32::new(0);
/// A thread is a wrapper on top of a task.
pub struct Thread {
// immutable part
/// Thread id
tid: Tid,
/// Low-level info
task: Arc<Task>,
/// Data: POSIX thread info or kernel thread info
data: Box<dyn Send + Sync + Any>,
// mutable part
status: AtomicThreadStatus,
}
impl Thread {
/// Never call this function directly.
pub fn new(
tid: Tid,
task: Arc<Task>,
data: impl Send + Sync + Any,
status: ThreadStatus,
) -> Self {
Thread {
tid,
task,
data: Box::new(data),
status: AtomicThreadStatus::new(status),
}
}
/// Returns the current thread.
///
/// This function returns `None` if the current task is not associated with
/// a thread, or if called within the bootstrap context.
pub fn current() -> Option<Arc<Self>> {
Task::current()?
.data()
.downcast_ref::<Weak<Thread>>()?
.upgrade()
}
pub(in crate::thread) fn task(&self) -> &Arc<Task> {
&self.task
}
/// Runs this thread at once.
pub fn run(&self) {
self.set_status(ThreadStatus::Running);
self.task.run();
}
pub(super) fn exit(&self) {
self.set_status(ThreadStatus::Exited);
}
/// Returns the reference to the atomic status.
pub fn atomic_status(&self) -> &AtomicThreadStatus {
&self.status
}
/// Returns the current status.
pub fn status(&self) -> ThreadStatus {
self.status.load(Ordering::Acquire)
}
/// Updates the status with the `new` value.
pub fn set_status(&self, new_status: ThreadStatus) {
self.status.store(new_status, Ordering::Release);
}
pub fn yield_now() {
Task::yield_now()
}
pub fn tid(&self) -> Tid {
self.tid
}
/// Returns the associated data.
///
/// The return type must be borrowed box, otherwise the `downcast_ref` will fail.
#[allow(clippy::borrowed_box)]
pub fn data(&self) -> &Box<dyn Send + Sync + Any> {
&self.data
}
}
/// Allocates a new tid for the new thread
pub fn allocate_tid() -> Tid {
TID_ALLOCATOR.fetch_add(1, Ordering::SeqCst)
}
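Because `data()` hands back a borrowed `Box<dyn Any>`, callers recover the concrete per-thread data with `downcast_ref`, exactly as `as_kernel_thread` does in `kernel_thread.rs`. A hedged sketch of how calling code might combine this with `Thread::current()` (the helper name is illustrative):

```rust
use crate::thread::{kernel_thread::KernelThread, Thread};

/// Returns whether the current task is backed by a kernel thread.
fn current_is_kernel_thread() -> bool {
    match Thread::current() {
        // `data()` is a `&Box<dyn Any>`, so `downcast_ref` checks the concrete type.
        Some(thread) => thread.data().downcast_ref::<KernelThread>().is_some(),
        // No associated thread, e.g. in the bootstrap context.
        None => false,
    }
}
```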


@ -0,0 +1,63 @@
// SPDX-License-Identifier: MPL-2.0
use core::sync::atomic::{AtomicU8, Ordering};
use int_to_c_enum::TryFromInt;
/// A `ThreadStatus` which can be safely shared between threads.
#[derive(Debug)]
pub struct AtomicThreadStatus(AtomicU8);
impl AtomicThreadStatus {
/// Creates a new atomic status.
pub fn new(status: ThreadStatus) -> Self {
Self(AtomicU8::new(status as u8))
}
/// Loads a value from the atomic status.
pub fn load(&self, order: Ordering) -> ThreadStatus {
ThreadStatus::try_from(self.0.load(order)).unwrap()
}
/// Stores a value into the atomic status.
pub fn store(&self, new_status: ThreadStatus, order: Ordering) {
self.0.store(new_status as u8, order);
}
/// Stores a value into the atomic status if the current value is the same as the `current` value.
pub fn compare_exchange(
&self,
current: ThreadStatus,
new: ThreadStatus,
success: Ordering,
failure: Ordering,
) -> Result<ThreadStatus, ThreadStatus> {
self.0
.compare_exchange(current as u8, new as u8, success, failure)
.map(|val| ThreadStatus::try_from(val).unwrap())
.map_err(|val| ThreadStatus::try_from(val).unwrap())
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, TryFromInt)]
#[repr(u8)]
pub enum ThreadStatus {
Init = 0,
Running = 1,
Exited = 2,
Stopped = 3,
}
impl ThreadStatus {
pub fn is_running(&self) -> bool {
*self == ThreadStatus::Running
}
pub fn is_exited(&self) -> bool {
*self == ThreadStatus::Exited
}
pub fn is_stopped(&self) -> bool {
*self == ThreadStatus::Stopped
}
}
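`compare_exchange` makes status transitions race-free: only one caller can move a thread from `Init` to `Running`. A small sketch under that assumption, reusing the types defined in this file (the `try_start` helper is hypothetical):

```rust
use core::sync::atomic::Ordering;

/// Tries to move the status from `Init` to `Running`.
/// Returns `true` only for the single caller that wins the race.
fn try_start(status: &AtomicThreadStatus) -> bool {
    status
        .compare_exchange(
            ThreadStatus::Init,
            ThreadStatus::Running,
            Ordering::AcqRel,
            Ordering::Acquire,
        )
        .is_ok()
}
```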

kernel/src/thread/task.rs

@ -0,0 +1,96 @@
// SPDX-License-Identifier: MPL-2.0
use ostd::{
task::{Task, TaskOptions},
user::{ReturnReason, UserContextApi, UserMode, UserSpace},
};
use super::Thread;
use crate::{
cpu::LinuxAbi,
prelude::*,
process::{posix_thread::PosixThreadExt, signal::handle_pending_signal},
syscall::handle_syscall,
thread::exception::handle_exception,
vm::vmar::is_userspace_vaddr,
};
/// Creates a new user task that runs in the given user space and is bound to the given thread.
pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Weak<Thread>) -> Arc<Task> {
fn user_task_entry() {
let current_thread = current_thread!();
let current_posix_thread = current_thread.as_posix_thread().unwrap();
let current_process = current_posix_thread.process();
let current_task = current_thread.task();
let user_space = current_task
.user_space()
.expect("user task should have user space");
let mut user_mode = UserMode::new(user_space);
debug!(
"[Task entry] rip = 0x{:x}",
user_mode.context().instruction_pointer()
);
debug!(
"[Task entry] rsp = 0x{:x}",
user_mode.context().stack_pointer()
);
debug!(
"[Task entry] rax = 0x{:x}",
user_mode.context().syscall_ret()
);
let child_tid_ptr = *current_posix_thread.set_child_tid().lock();
// The `clone` syscall may require the child process to write the thread's TID to the specified address.
// Make sure the store operation completes before the clone call returns control to user space
// in the child process.
if is_userspace_vaddr(child_tid_ptr) {
CurrentUserSpace::get()
.write_val(child_tid_ptr, &current_thread.tid())
.unwrap();
}
let has_kernel_event_fn = || current_posix_thread.has_pending();
let ctx = Context {
process: current_process.as_ref(),
posix_thread: current_posix_thread,
thread: current_thread.as_ref(),
task: current_task.as_ref(),
};
loop {
let return_reason = user_mode.execute(has_kernel_event_fn);
let user_ctx = user_mode.context_mut();
// Handle the user event
match return_reason {
ReturnReason::UserException => handle_exception(&ctx, user_ctx),
ReturnReason::UserSyscall => handle_syscall(&ctx, user_ctx),
ReturnReason::KernelEvent => {}
};
if current_thread.status().is_exited() {
break;
}
handle_pending_signal(user_ctx, &current_thread).unwrap();
// If the current thread is suspended, wait for a signal to wake it up
while current_thread.status().is_stopped() {
Thread::yield_now();
debug!("{} is suspended.", current_thread.tid());
handle_pending_signal(user_ctx, &current_thread).unwrap();
}
if current_thread.status().is_exited() {
debug!("exit due to signal");
break;
}
}
debug!("exit user loop");
}
TaskOptions::new(user_task_entry)
.data(thread_ref)
.user_space(Some(user_space))
.build()
.expect("spawn task failed")
}


@ -0,0 +1,21 @@
// SPDX-License-Identifier: MPL-2.0
use super::{Thread, Tid};
use crate::prelude::*;
lazy_static! {
static ref THREAD_TABLE: Mutex<BTreeMap<Tid, Arc<Thread>>> = Mutex::new(BTreeMap::new());
}
pub fn add_thread(thread: Arc<Thread>) {
let tid = thread.tid();
THREAD_TABLE.lock().insert(tid, thread);
}
pub fn remove_thread(tid: Tid) {
THREAD_TABLE.lock().remove(&tid);
}
pub fn get_thread(tid: Tid) -> Option<Arc<Thread>> {
THREAD_TABLE.lock().get(&tid).cloned()
}


@ -0,0 +1,201 @@
// SPDX-License-Identifier: MPL-2.0
#![allow(dead_code)]
use ostd::cpu::CpuSet;
use spin::Once;
use work_item::WorkItem;
use worker_pool::WorkerPool;
use crate::prelude::*;
mod simple_scheduler;
pub mod work_item;
pub mod worker;
pub mod worker_pool;
static WORKERPOOL_NORMAL: Once<Arc<WorkerPool>> = Once::new();
static WORKERPOOL_HIGH_PRI: Once<Arc<WorkerPool>> = Once::new();
static WORKQUEUE_GLOBAL_NORMAL: Once<Arc<WorkQueue>> = Once::new();
static WORKQUEUE_GLOBAL_HIGH_PRI: Once<Arc<WorkQueue>> = Once::new();
/// Work queue mechanism.
///
/// # Overview
///
/// A `workqueue` is a kernel-level mechanism used to schedule and execute deferred work.
/// Deferred work refers to tasks that need to be executed at some point in the future,
/// but not necessarily immediately.
///
/// The workqueue mechanism is implemented using a combination of kernel threads and data
/// structures such as `WorkItem`, `WorkQueue`, `Worker` and `WorkerPool`. The `WorkItem`
/// represents a task to be processed, while the `WorkQueue` maintains the queue of submitted
/// `WorkItems`. The `Worker` is responsible for processing these submitted tasks,
/// and the `WorkerPool` manages and schedules these workers.
///
/// # Examples
///
/// The system has a default work queue and worker pool,
/// and it also provides high-level APIs for users to use.
/// Here is a basic example of how to use these APIs.
///
/// ```rust
/// use crate::thread::work_queue::{submit_work_func, submit_work_item, WorkItem, WorkPriority};
///
/// // Submit to the high priority queue.
/// submit_work_func(|| {}, WorkPriority::High);
///
/// // Submit to the normal priority queue.
/// submit_work_func(|| {}, WorkPriority::Normal);
///
/// fn deferred_task(){
/// // ...
/// }
///
/// // Create a work item.
/// let work_item = Arc::new(WorkItem::new(Box::new(deferred_task)));
///
/// // Submit to the high priority queue.
/// submit_work_item(work_item.clone(), WorkPriority::High);
///
/// // Submit to the normal priority queue.
/// submit_work_item(work_item, WorkPriority::Normal);
/// ```
///
/// Users can also create a dedicated `WorkQueue` and `WorkerPool`.
///
/// ```rust
/// use ostd::cpu::CpuSet;
/// use crate::thread::work_queue::{WorkQueue, WorkerPool, WorkItem, WorkPriority};
///
/// fn deferred_task(){
/// // ...
/// }
///
/// let cpu_set = CpuSet::new_full();
/// let high_pri_pool = WorkerPool::new(WorkPriority::High, cpu_set);
/// let my_queue = WorkQueue::new(Arc::downgrade(&high_pri_pool));
///
/// let work_item = Arc::new(WorkItem::new(Box::new(deferred_task)));
/// my_queue.enqueue(work_item);
///
/// ```
/// Submit a function to a global work queue.
pub fn submit_work_func<F>(work_func: F, work_priority: WorkPriority)
where
F: Fn() + Send + Sync + 'static,
{
let work_item = Arc::new(WorkItem::new(Box::new(work_func)));
submit_work_item(work_item, work_priority);
}
/// Submit a work item to a global work queue.
pub fn submit_work_item(work_item: Arc<WorkItem>, work_priority: WorkPriority) -> bool {
match work_priority {
WorkPriority::High => WORKQUEUE_GLOBAL_HIGH_PRI
.get()
.unwrap()
.enqueue(work_item.clone()),
WorkPriority::Normal => WORKQUEUE_GLOBAL_NORMAL
.get()
.unwrap()
.enqueue(work_item.clone()),
}
}
/// A work queue maintains a series of work items to be handled
/// asynchronously in a process context.
pub struct WorkQueue {
worker_pool: Weak<WorkerPool>,
inner: SpinLock<WorkQueueInner>,
}
struct WorkQueueInner {
pending_work_items: Vec<Arc<WorkItem>>,
}
impl WorkQueue {
/// Create a `WorkQueue` and specify a `WorkerPool` to
/// process the submitted `WorkItems`.
pub fn new(worker_pool: Weak<WorkerPool>) -> Arc<Self> {
let queue = Arc::new(WorkQueue {
worker_pool: worker_pool.clone(),
inner: SpinLock::new(WorkQueueInner {
pending_work_items: Vec::new(),
}),
});
worker_pool
.upgrade()
.unwrap()
.assign_work_queue(queue.clone());
queue
}
/// Submits a work item. Returns `false` if the work item is already pending.
pub fn enqueue(&self, work_item: Arc<WorkItem>) -> bool {
if !work_item.try_pending() {
return false;
}
self.inner
.disable_irq()
.lock()
.pending_work_items
.push(work_item);
if let Some(worker_pool) = self.worker_pool.upgrade() {
worker_pool.schedule()
}
true
}
/// Request a pending work item. The `request_cpu` indicates the CPU where
/// the calling worker is located.
fn dequeue(&self, request_cpu: u32) -> Option<Arc<WorkItem>> {
let mut inner = self.inner.disable_irq().lock();
let index = inner
.pending_work_items
.iter()
.position(|item| item.is_valid_cpu(request_cpu))?;
let item = inner.pending_work_items.remove(index);
Some(item)
}
fn has_pending_work_items(&self, request_cpu: u32) -> bool {
self.inner
.disable_irq()
.lock()
.pending_work_items
.iter()
.any(|item| item.is_valid_cpu(request_cpu))
}
}
/// Initialize global worker pools and work queues.
pub fn init() {
WORKERPOOL_NORMAL.call_once(|| {
let cpu_set = CpuSet::new_full();
WorkerPool::new(WorkPriority::Normal, cpu_set)
});
WORKERPOOL_NORMAL.get().unwrap().run();
WORKERPOOL_HIGH_PRI.call_once(|| {
let cpu_set = CpuSet::new_full();
WorkerPool::new(WorkPriority::High, cpu_set)
});
WORKERPOOL_HIGH_PRI.get().unwrap().run();
WORKQUEUE_GLOBAL_NORMAL
.call_once(|| WorkQueue::new(Arc::downgrade(WORKERPOOL_NORMAL.get().unwrap())));
WORKQUEUE_GLOBAL_HIGH_PRI
.call_once(|| WorkQueue::new(Arc::downgrade(WORKERPOOL_HIGH_PRI.get().unwrap())));
}
impl Drop for WorkQueue {
fn drop(&mut self) {
// TODO: Handle non-empty queues.
}
}
#[derive(PartialEq)]
pub enum WorkPriority {
High,
Normal,
}
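Because `enqueue` first calls `try_pending` on the work item, submitting the same `WorkItem` twice before a worker picks it up is a no-op: the second call returns `false`. A hedged sketch of that behavior using the global helpers above (it assumes `init()` has already set up the global pools; `submit_twice` is a made-up name):

```rust
use alloc::{boxed::Box, sync::Arc};

fn submit_twice() {
    let item = Arc::new(WorkItem::new(Box::new(|| {
        // Deferred work goes here.
    })));

    // The first submission marks the item as pending and succeeds.
    assert!(submit_work_item(item.clone(), WorkPriority::Normal));
    // The second submission is rejected while the item is still pending.
    assert!(!submit_work_item(item, WorkPriority::Normal));
}
```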


@ -0,0 +1,36 @@
// SPDX-License-Identifier: MPL-2.0
use alloc::sync::Weak;
use super::worker_pool::{WorkerPool, WorkerScheduler};
/// `SimpleScheduler` is the simplest scheduling implementation.
/// It adds workers only when the worker pool has a liveness problem, caps the
/// number of workers at an upper limit, and never actively removes workers.
/// It adds at most one worker per CPU on each scheduling pass.
pub struct SimpleScheduler {
worker_pool: Weak<WorkerPool>,
}
impl SimpleScheduler {
pub fn new(worker_pool: Weak<WorkerPool>) -> Self {
Self { worker_pool }
}
}
const WORKER_LIMIT: u16 = 16;
impl WorkerScheduler for SimpleScheduler {
fn schedule(&self) {
let worker_pool = self.worker_pool.upgrade().unwrap();
for cpu_id in worker_pool.cpu_set().iter() {
if !worker_pool.heartbeat(cpu_id as u32)
&& worker_pool.has_pending_work_items(cpu_id as u32)
&& !worker_pool.wake_worker(cpu_id as u32)
&& worker_pool.num_workers(cpu_id as u32) < WORKER_LIMIT
{
worker_pool.add_worker(cpu_id as u32);
}
}
}
}


@ -0,0 +1,61 @@
// SPDX-License-Identifier: MPL-2.0
#![allow(dead_code)]
use core::sync::atomic::{AtomicBool, Ordering};
use ostd::cpu::CpuSet;
use crate::prelude::*;
/// A task to be executed by a worker thread.
pub struct WorkItem {
work_func: Box<dyn Fn() + Send + Sync>,
cpu_affinity: CpuSet,
was_pending: AtomicBool,
}
impl WorkItem {
pub fn new(work_func: Box<dyn Fn() + Send + Sync>) -> WorkItem {
let cpu_affinity = CpuSet::new_full();
WorkItem {
work_func,
cpu_affinity,
was_pending: AtomicBool::new(false),
}
}
pub fn cpu_affinity(&self) -> &CpuSet {
&self.cpu_affinity
}
pub fn cpu_affinity_mut(&mut self) -> &mut CpuSet {
&mut self.cpu_affinity
}
pub(super) fn is_valid_cpu(&self, cpu_id: u32) -> bool {
self.cpu_affinity.contains(cpu_id)
}
pub(super) fn set_processing(&self) {
self.was_pending.store(false, Ordering::Release);
}
pub(super) fn set_pending(&self) {
self.was_pending.store(true, Ordering::Release);
}
pub(super) fn is_pending(&self) -> bool {
self.was_pending.load(Ordering::Acquire)
}
pub(super) fn try_pending(&self) -> bool {
self.was_pending
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
pub(super) fn call_work_func(&self) {
self.work_func.call(())
}
}
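The `was_pending` flag is the single source of truth for whether an item is queued: `try_pending` is a compare-and-swap from `false` to `true`, so exactly one caller can move the item into the pending state, and `set_processing` re-arms it once a worker starts executing it. A standalone sketch of the same claim-once idiom:

```rust
use core::sync::atomic::{AtomicBool, Ordering};

/// Claims the flag exactly once; later callers get `false` until it is reset.
fn claim_once(flag: &AtomicBool) -> bool {
    flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .is_ok()
}

fn main() {
    let pending = AtomicBool::new(false);
    assert!(claim_once(&pending)); // The first claim wins.
    assert!(!claim_once(&pending)); // Already pending.
    pending.store(false, Ordering::Release); // The `set_processing` analogue.
    assert!(claim_once(&pending)); // The item can be queued again.
}
```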


@ -0,0 +1,123 @@
// SPDX-License-Identifier: MPL-2.0
#![allow(dead_code)]
use ostd::{cpu::CpuSet, task::Priority};
use super::worker_pool::WorkerPool;
use crate::{
prelude::*,
thread::kernel_thread::{KernelThreadExt, ThreadOptions},
Thread,
};
/// A worker thread. A `Worker` will attempt to retrieve unfinished
/// work items from its corresponding `WorkerPool`. If there are none,
/// it will go to sleep and be rescheduled when a new work item is
/// added to the `WorkerPool`.
pub(super) struct Worker {
worker_pool: Weak<WorkerPool>,
bound_thread: Arc<Thread>,
bound_cpu: u32,
inner: SpinLock<WorkerInner>,
}
struct WorkerInner {
worker_status: WorkerStatus,
}
#[derive(PartialEq)]
enum WorkerStatus {
Idle,
Running,
Exited,
/// This state only occurs when destructing the `WorkerPool`,
/// where workers will exit after processing the remaining work items.
Destroying,
}
impl Worker {
/// Creates a new `Worker` bound to the given `worker_pool`.
pub(super) fn new(worker_pool: Weak<WorkerPool>, bound_cpu: u32) -> Arc<Self> {
Arc::new_cyclic(|worker_ref| {
let weak_worker = worker_ref.clone();
let task_fn = Box::new(move || {
let current_worker: Arc<Worker> = weak_worker.upgrade().unwrap();
current_worker.run_worker_loop();
});
let mut cpu_affinity = CpuSet::new_empty();
cpu_affinity.add(bound_cpu);
let mut priority = Priority::normal();
if worker_pool.upgrade().unwrap().is_high_priority() {
priority = Priority::high();
}
let bound_thread = Thread::new_kernel_thread(
ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)
.priority(priority),
);
Self {
worker_pool,
bound_thread,
bound_cpu,
inner: SpinLock::new(WorkerInner {
worker_status: WorkerStatus::Running,
}),
}
})
}
pub(super) fn run(&self) {
self.bound_thread.run();
}
/// The thread function bound to normal workers.
/// It pulls work items from the work queue and sleeps if there are no more pending items.
fn run_worker_loop(self: &Arc<Self>) {
loop {
let worker_pool = self.worker_pool.upgrade();
let Some(worker_pool) = worker_pool else {
break;
};
if let Some(work_item) = worker_pool.fetch_pending_work_item(self.bound_cpu) {
work_item.set_processing();
work_item.call_work_func();
worker_pool.set_heartbeat(self.bound_cpu, true);
} else {
if self.is_destroying() {
break;
}
self.inner.disable_irq().lock().worker_status = WorkerStatus::Idle;
worker_pool.idle_current_worker(self.bound_cpu, self.clone());
if !self.is_destroying() {
self.inner.disable_irq().lock().worker_status = WorkerStatus::Running;
}
}
}
self.exit();
}
pub(super) fn bound_thread(&self) -> &Arc<Thread> {
&self.bound_thread
}
pub(super) fn is_idle(&self) -> bool {
self.inner.disable_irq().lock().worker_status == WorkerStatus::Idle
}
pub(super) fn is_destroying(&self) -> bool {
self.inner.disable_irq().lock().worker_status == WorkerStatus::Destroying
}
pub(super) fn destroy(&self) {
self.inner.disable_irq().lock().worker_status = WorkerStatus::Destroying;
}
fn exit(&self) {
self.inner.disable_irq().lock().worker_status = WorkerStatus::Exited;
}
pub(super) fn is_exit(&self) -> bool {
self.inner.disable_irq().lock().worker_status == WorkerStatus::Exited
}
}


@ -0,0 +1,273 @@
// SPDX-License-Identifier: MPL-2.0
#![allow(dead_code)]
use core::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use ostd::{cpu::CpuSet, sync::WaitQueue, task::Priority};
use super::{simple_scheduler::SimpleScheduler, worker::Worker, WorkItem, WorkPriority, WorkQueue};
use crate::{
prelude::*,
thread::kernel_thread::{KernelThreadExt, ThreadOptions},
Thread,
};
/// A pool of workers.
///
/// The `WorkerPool` maintains workers created from different CPUs, while clustering workers
/// from the same CPU into a `LocalWorkerPool` for better management.
pub struct WorkerPool {
local_pools: Vec<Arc<LocalWorkerPool>>,
/// The monitor invokes `schedule()` on the `WorkerScheduler` to determine whether
/// workers need to be added or removed.
monitor: Arc<Monitor>,
priority: WorkPriority,
cpu_set: CpuSet,
scheduler: Arc<dyn WorkerScheduler>,
work_queues: SpinLock<Vec<Arc<WorkQueue>>>,
}
/// A set of workers for a specific CPU.
pub struct LocalWorkerPool {
cpu_id: u32,
idle_wait_queue: WaitQueue,
parent: Weak<WorkerPool>,
/// A liveness check for LocalWorkerPool. The monitor periodically clears heartbeat,
/// and when a worker completes an item, it will be set to indicate that there is still
/// an active worker. If there are no heartbeats but there are still pending work items,
/// it suggests that more workers are needed.
heartbeat: AtomicBool,
workers: SpinLock<VecDeque<Arc<Worker>>>,
}
/// Schedule `Workers` for a `WorkerPool`.
///
/// Having an excessive number of workers in a `WorkerPool` may waste system resources,
/// while a shortage of workers may lead to longer response times for work items.
/// A well-designed WorkerScheduler must strike a balance between resource utilization and response time.
pub trait WorkerScheduler: Sync + Send {
/// Schedule workers in a worker pool. This needs to solve two problems: when to increase or decrease
/// workers, and how to add or remove workers to keep the number of workers in a reasonable range.
fn schedule(&self);
}
/// The `Monitor` is responsible for monitoring the `WorkerPool` for scheduling needs.
/// Currently, it only performs a liveness check, and attempts to schedule when no workers
/// are found processing in the pool.
pub struct Monitor {
worker_pool: Weak<WorkerPool>,
bound_thread: Arc<Thread>,
}
impl LocalWorkerPool {
fn new(worker_pool: Weak<WorkerPool>, cpu_id: u32) -> Self {
LocalWorkerPool {
cpu_id,
idle_wait_queue: WaitQueue::new(),
parent: worker_pool,
heartbeat: AtomicBool::new(false),
workers: SpinLock::new(VecDeque::new()),
}
}
fn add_worker(&self) {
let worker = Worker::new(self.parent.clone(), self.cpu_id);
self.workers.disable_irq().lock().push_back(worker.clone());
worker.bound_thread().run();
}
fn remove_worker(&self) {
let mut workers = self.workers.disable_irq().lock();
for (index, worker) in workers.iter().enumerate() {
if worker.is_idle() {
worker.destroy();
workers.remove(index);
break;
}
}
}
fn wake_worker(&self) -> bool {
self.idle_wait_queue.wake_one()
}
fn has_pending_work_items(&self) -> bool {
self.parent
.upgrade()
.unwrap()
.has_pending_work_items(self.cpu_id)
}
fn heartbeat(&self) -> bool {
self.heartbeat.load(Ordering::Acquire)
}
fn set_heartbeat(&self, heartbeat: bool) {
self.heartbeat.store(heartbeat, Ordering::Release);
}
fn idle_current_worker(&self, worker: Arc<Worker>) {
self.idle_wait_queue
.wait_until(|| (worker.is_destroying() || self.has_pending_work_items()).then_some(0));
}
fn destroy_all_workers(&self) {
for worker in self.workers.disable_irq().lock().iter() {
worker.destroy();
}
self.idle_wait_queue.wake_all();
}
}
impl WorkerPool {
pub fn new(priority: WorkPriority, cpu_set: CpuSet) -> Arc<Self> {
Arc::new_cyclic(|pool_ref| {
let mut local_pools = Vec::new();
for cpu_id in cpu_set.iter() {
local_pools.push(Arc::new(LocalWorkerPool::new(
pool_ref.clone(),
cpu_id as u32,
)));
}
WorkerPool {
local_pools,
monitor: Monitor::new(pool_ref.clone(), &priority),
priority,
cpu_set,
scheduler: Arc::new(SimpleScheduler::new(pool_ref.clone())),
work_queues: SpinLock::new(Vec::new()),
}
})
}
pub fn run(&self) {
self.monitor.run();
}
pub fn assign_work_queue(&self, work_queue: Arc<WorkQueue>) {
self.work_queues.disable_irq().lock().push(work_queue);
}
pub fn has_pending_work_items(&self, request_cpu: u32) -> bool {
self.work_queues
.disable_irq()
.lock()
.iter()
.any(|work_queue| work_queue.has_pending_work_items(request_cpu))
}
pub fn schedule(&self) {
self.scheduler.schedule();
}
pub fn num_workers(&self, cpu_id: u32) -> u16 {
self.local_pool(cpu_id).workers.disable_irq().lock().len() as u16
}
pub fn cpu_set(&self) -> &CpuSet {
&self.cpu_set
}
pub(super) fn fetch_pending_work_item(&self, request_cpu: u32) -> Option<Arc<WorkItem>> {
for work_queue in self.work_queues.disable_irq().lock().iter() {
let item = work_queue.dequeue(request_cpu);
if item.is_some() {
return item;
}
}
None
}
fn local_pool(&self, cpu_id: u32) -> &Arc<LocalWorkerPool> {
self.local_pools
.iter()
.find(|local_pool: &&Arc<LocalWorkerPool>| local_pool.cpu_id == cpu_id)
.unwrap()
}
pub(super) fn wake_worker(&self, cpu_id: u32) -> bool {
self.local_pool(cpu_id).wake_worker()
}
pub(super) fn add_worker(&self, cpu_id: u32) {
self.local_pool(cpu_id).add_worker();
}
pub(super) fn remove_worker(&self, cpu_id: u32) {
self.local_pool(cpu_id).remove_worker();
}
pub(super) fn is_high_priority(&self) -> bool {
self.priority == WorkPriority::High
}
pub(super) fn heartbeat(&self, cpu_id: u32) -> bool {
self.local_pool(cpu_id).heartbeat()
}
pub(super) fn set_heartbeat(&self, cpu_id: u32, heartbeat: bool) {
self.local_pool(cpu_id).set_heartbeat(heartbeat)
}
pub(super) fn idle_current_worker(&self, cpu_id: u32, worker: Arc<Worker>) {
self.local_pool(cpu_id).idle_current_worker(worker);
}
}
impl Drop for WorkerPool {
fn drop(&mut self) {
for local_pool in self.local_pools.iter() {
local_pool.destroy_all_workers();
}
}
}
impl Monitor {
pub fn new(worker_pool: Weak<WorkerPool>, priority: &WorkPriority) -> Arc<Self> {
Arc::new_cyclic(|monitor_ref| {
let weak_monitor = monitor_ref.clone();
let task_fn = Box::new(move || {
let current_monitor: Arc<Monitor> = weak_monitor.upgrade().unwrap();
current_monitor.run_monitor_loop();
});
let cpu_affinity = CpuSet::new_full();
let priority = match priority {
WorkPriority::High => Priority::high(),
WorkPriority::Normal => Priority::normal(),
};
let bound_thread = Thread::new_kernel_thread(
ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)
.priority(priority),
);
Self {
worker_pool,
bound_thread,
}
})
}
pub fn run(&self) {
self.bound_thread.run();
}
fn run_monitor_loop(self: &Arc<Self>) {
let sleep_queue = WaitQueue::new();
let sleep_duration = Duration::from_millis(100);
loop {
let worker_pool = self.worker_pool.upgrade();
let Some(worker_pool) = worker_pool else {
break;
};
worker_pool.schedule();
for local_pool in worker_pool.local_pools.iter() {
local_pool.set_heartbeat(false);
}
sleep_queue.wait_until_or_timeout(|| -> Option<()> { None }, &sleep_duration);
}
}
}
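Putting the pieces together, the heartbeat protocol works as follows: every 100 ms the monitor asks the scheduler to react and then clears each local pool's heartbeat, while a worker sets the heartbeat again whenever it finishes a work item. If a whole interval passes with pending items but no heartbeat, the scheduler concludes the pool is starved and adds a worker (up to `WORKER_LIMIT`). A minimal single-CPU sketch of that decision, using a simplified state that is not part of the commit:

```rust
use core::sync::atomic::{AtomicBool, Ordering};

/// Simplified per-CPU liveness state, for illustration only.
struct LocalState {
    heartbeat: AtomicBool,
    pending_items: usize,
    workers: u16,
}

const WORKER_LIMIT: u16 = 16;

/// Mirrors `SimpleScheduler::schedule`: add a worker only if no worker made
/// progress last interval, work is waiting, no idle worker could be woken,
/// and the per-CPU worker limit has not been reached.
fn needs_new_worker(state: &LocalState, woke_an_idle_worker: bool) -> bool {
    !state.heartbeat.load(Ordering::Acquire)
        && state.pending_items > 0
        && !woke_an_idle_worker
        && state.workers < WORKER_LIMIT
}
```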