diff --git a/kernel/aster-nix/src/sched/priority_scheduler.rs b/kernel/aster-nix/src/sched/priority_scheduler.rs
index c4928b61..c769876d 100644
--- a/kernel/aster-nix/src/sched/priority_scheduler.rs
+++ b/kernel/aster-nix/src/sched/priority_scheduler.rs
@@ -1,56 +1,234 @@
 // SPDX-License-Identifier: MPL-2.0
 
-use intrusive_collections::LinkedList;
-use ostd::task::{set_scheduler, Scheduler, Task, TaskAdapter};
+use ostd::{
+    cpu::{num_cpus, this_cpu},
+    task::{
+        scheduler::{inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler, UpdateFlags},
+        AtomicCpuId, Priority, Task,
+    },
+};
 
 use crate::prelude::*;
 
 pub fn init() {
-    let preempt_scheduler = Box::new(PreemptScheduler::new());
-    let scheduler = Box::<PreemptScheduler>::leak(preempt_scheduler);
-    set_scheduler(scheduler);
+    let preempt_scheduler = Box::new(PreemptScheduler::default());
+    let scheduler = Box::<PreemptScheduler<Task>>::leak(preempt_scheduler);
+    inject_scheduler(scheduler);
 }
 
-/// The preempt scheduler
+/// The preempt scheduler.
 ///
-/// Real-time tasks are placed in the `real_time_tasks` queue and
+/// Real-time tasks are placed in the `real_time_entities` queue and
 /// are always prioritized during scheduling.
-/// Normal tasks are placed in the `normal_tasks` queue and are only
+/// Normal tasks are placed in the `normal_entities` queue and are only
 /// scheduled for execution when there are no real-time tasks.
-struct PreemptScheduler {
-    /// Tasks with a priority of less than 100 are regarded as real-time tasks.
-    real_time_tasks: SpinLock<LinkedList<TaskAdapter>>,
-    /// Tasks with a priority greater than or equal to 100 are regarded as normal tasks.
-    normal_tasks: SpinLock<LinkedList<TaskAdapter>>,
+struct PreemptScheduler<T: PreemptSchedInfo> {
+    rq: Vec<SpinLock<PreemptRunQueue<T>>>,
 }
 
-impl PreemptScheduler {
+impl<T: PreemptSchedInfo> PreemptScheduler<T> {
+    fn new(nr_cpus: u32) -> Self {
+        let mut rq = Vec::with_capacity(nr_cpus as usize);
+        for _ in 0..nr_cpus {
+            rq.push(SpinLock::new(PreemptRunQueue::new()));
+        }
+        Self { rq }
+    }
+
+    /// Selects a CPU for the task to run on.
+    fn select_cpu(&self, _runnable: &Arc<T>) -> u32 {
+        // FIXME: adopt a more reasonable policy once we fully enable SMP.
+        0
+    }
+}
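The FIXME above is the obvious extension point once SMP lands. For illustration, a least-loaded variant of `select_cpu` could look like the sketch below; note that the `len()` helper on `PreemptRunQueue` is hypothetical and not part of this patch:

```rust
// Hypothetical select_cpu for PreemptScheduler<T>: pick the CPU whose run
// queue currently holds the fewest entities. Assumes a `len()` helper on
// PreemptRunQueue that sums `real_time_entities` and `normal_entities`.
fn select_cpu(&self, _runnable: &Arc<T>) -> u32 {
    let mut target = 0;
    let mut shortest = usize::MAX;
    for (cpu, rq) in self.rq.iter().enumerate() {
        let len = rq.lock_irq_disabled().len(); // hypothetical helper
        if len < shortest {
            shortest = len;
            target = cpu as u32;
        }
    }
    target
}
```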
+
+impl<T: Sync + Send + PreemptSchedInfo> Scheduler<T> for PreemptScheduler<T> {
+    fn enqueue(&self, runnable: Arc<T>, flags: EnqueueFlags) -> Option<u32> {
+        let mut still_in_rq = false;
+        let target_cpu = {
+            let mut cpu_id = self.select_cpu(&runnable);
+            if let Err(task_cpu_id) = runnable.cpu().set_if_is_none(cpu_id) {
+                debug_assert!(flags != EnqueueFlags::Spawn);
+                still_in_rq = true;
+                cpu_id = task_cpu_id;
+            }
+
+            cpu_id
+        };
+
+        let mut rq = self.rq[target_cpu as usize].lock_irq_disabled();
+        if still_in_rq && let Err(_) = runnable.cpu().set_if_is_none(target_cpu) {
+            return None;
+        }
+        let entity = PreemptSchedEntity::new(runnable);
+        if entity.is_real_time() {
+            rq.real_time_entities.push_back(entity);
+        } else {
+            rq.normal_entities.push_back(entity);
+        }
+
+        Some(target_cpu)
+    }
+
+    fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue<T>)) {
+        let local_rq: &PreemptRunQueue<T> = &self.rq[this_cpu() as usize].lock_irq_disabled();
+        f(local_rq);
+    }
+
+    fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue<T>)) {
+        let local_rq: &mut PreemptRunQueue<T> =
+            &mut self.rq[this_cpu() as usize].lock_irq_disabled();
+        f(local_rq);
+    }
+}
+
+impl Default for PreemptScheduler<Task> {
+    fn default() -> Self {
+        Self::new(num_cpus())
+    }
+}
+
+struct PreemptRunQueue<T: PreemptSchedInfo> {
+    current: Option<PreemptSchedEntity<T>>,
+    real_time_entities: VecDeque<PreemptSchedEntity<T>>,
+    normal_entities: VecDeque<PreemptSchedEntity<T>>,
+}
+
+impl<T: PreemptSchedInfo> PreemptRunQueue<T> {
     pub fn new() -> Self {
         Self {
-            real_time_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
-            normal_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
+            current: None,
+            real_time_entities: VecDeque::new(),
+            normal_entities: VecDeque::new(),
         }
     }
 }
 
-impl Scheduler for PreemptScheduler {
-    fn enqueue(&self, task: Arc<Task>) {
-        if task.is_real_time() {
-            self.real_time_tasks.lock_irq_disabled().push_back(task);
-        } else {
-            self.normal_tasks.lock_irq_disabled().push_back(task);
+impl<T: Sync + Send + PreemptSchedInfo> LocalRunQueue<T> for PreemptRunQueue<T> {
+    fn current(&self) -> Option<&Arc<T>> {
+        self.current.as_ref().map(|entity| &entity.runnable)
+    }
+
+    fn update_current(&mut self, flags: UpdateFlags) -> bool {
+        match flags {
+            UpdateFlags::Tick => {
+                let Some(ref mut current_entity) = self.current else {
+                    return false;
+                };
+                current_entity.tick()
+                    || (!current_entity.is_real_time() && !self.real_time_entities.is_empty())
+            }
+            _ => true,
         }
     }
 
-    fn dequeue(&self) -> Option<Arc<Task>> {
-        if !self.real_time_tasks.lock_irq_disabled().is_empty() {
-            self.real_time_tasks.lock_irq_disabled().pop_front()
+    fn pick_next_current(&mut self) -> Option<&Arc<T>> {
+        let next_entity = if !self.real_time_entities.is_empty() {
+            self.real_time_entities.pop_front()
         } else {
-            self.normal_tasks.lock_irq_disabled().pop_front()
+            self.normal_entities.pop_front()
+        }?;
+        if let Some(prev_entity) = self.current.replace(next_entity) {
+            if prev_entity.is_real_time() {
+                self.real_time_entities.push_back(prev_entity);
+            } else {
+                self.normal_entities.push_back(prev_entity);
+            }
         }
+
+        Some(&self.current.as_ref().unwrap().runnable)
     }
 
-    fn should_preempt(&self, task: &Arc<Task>) -> bool {
-        !task.is_real_time() && !self.real_time_tasks.lock_irq_disabled().is_empty()
+    fn dequeue_current(&mut self) -> Option<Arc<T>> {
+        self.current.take().map(|entity| {
+            let runnable = entity.runnable;
+            runnable.cpu().set_to_none();
+
+            runnable
+        })
     }
 }
+
+struct PreemptSchedEntity<T: PreemptSchedInfo> {
+    runnable: Arc<T>,
+    time_slice: TimeSlice,
+}
+
+impl<T: PreemptSchedInfo> PreemptSchedEntity<T> {
+    fn new(runnable: Arc<T>) -> Self {
+        Self {
+            runnable,
+            time_slice: TimeSlice::default(),
+        }
+    }
+
+    fn is_real_time(&self) -> bool {
+        self.runnable.is_real_time()
+    }
+
+    fn tick(&mut self) -> bool {
+        self.time_slice.elapse()
+    }
+}
+
+impl<T: PreemptSchedInfo> Clone for PreemptSchedEntity<T> {
+    fn clone(&self) -> Self {
+        Self {
+            runnable: self.runnable.clone(),
+            time_slice: self.time_slice,
+        }
+    }
+}
+
+#[derive(Clone, Copy)]
+pub struct TimeSlice {
+    elapsed_ticks: u32,
+}
+
+impl TimeSlice {
+    const DEFAULT_TIME_SLICE: u32 = 100;
+
+    pub const fn new() -> Self {
+        TimeSlice { elapsed_ticks: 0 }
+    }
+
+    pub fn elapse(&mut self) -> bool {
+        self.elapsed_ticks = (self.elapsed_ticks + 1) % Self::DEFAULT_TIME_SLICE;
+
+        self.elapsed_ticks == 0
+    }
+}
+
+impl Default for TimeSlice {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PreemptSchedInfo for Task {
+    type PRIORITY = Priority;
+
+    const REAL_TIME_TASK_PRIORITY: Self::PRIORITY = Priority::new(100);
+
+    fn priority(&self) -> Self::PRIORITY {
+        self.priority()
+    }
+
+    fn cpu(&self) -> &AtomicCpuId {
+        self.cpu()
+    }
+}
+
+trait PreemptSchedInfo {
+    type PRIORITY: Ord + PartialOrd + Eq + PartialEq;
+
+    const REAL_TIME_TASK_PRIORITY: Self::PRIORITY;
+
+    fn priority(&self) -> Self::PRIORITY;
+
+    fn cpu(&self) -> &AtomicCpuId;
+
+    fn is_real_time(&self) -> bool {
+        self.priority() < Self::REAL_TIME_TASK_PRIORITY
+    }
+}
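A note on the `enqueue` protocol above: `AtomicCpuId` doubles as a lock-free ownership marker, so a task that still sits on some run queue cannot be enqueued twice. A user-space model of the handshake, with `std` atomics standing in for the kernel's:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in for the patch's `AtomicCpuId`; u32::MAX plays `NONE`.
struct AtomicCpuId(AtomicU32);

impl AtomicCpuId {
    const NONE: u32 = u32::MAX;

    fn set_if_is_none(&self, cpu_id: u32) -> Result<u32, u32> {
        self.0
            .compare_exchange(Self::NONE, cpu_id, Ordering::Relaxed, Ordering::Relaxed)
    }
}

fn main() {
    let cpu = AtomicCpuId(AtomicU32::new(AtomicCpuId::NONE));
    // A freshly spawned task has no owner CPU, so the CAS succeeds...
    assert_eq!(cpu.set_if_is_none(0), Ok(AtomicCpuId::NONE));
    // ...and a racing second enqueue observes the owner and backs off.
    assert_eq!(cpu.set_if_is_none(1), Err(0));
}
```

The second CAS failing with the owner's id is exactly the `still_in_rq` path that makes `enqueue` return `None`.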
diff --git a/kernel/aster-nix/src/thread/task.rs b/kernel/aster-nix/src/thread/task.rs
index 63692339..d7b6ed3c 100644
--- a/kernel/aster-nix/src/thread/task.rs
+++ b/kernel/aster-nix/src/thread/task.rs
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: MPL-2.0
 
 use ostd::{
-    task::{preempt, Task, TaskOptions},
+    task::{Task, TaskOptions},
     user::{ReturnReason, UserContextApi, UserMode, UserSpace},
 };
 
@@ -84,8 +84,6 @@ pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Weak<Thread>
             debug!("exit due to signal");
             break;
         }
-        // a preemption point after handling user event.
-        preempt(current_task);
     }
     debug!("exit user loop");
 }
diff --git a/kernel/aster-nix/src/thread/work_queue/worker_pool.rs b/kernel/aster-nix/src/thread/work_queue/worker_pool.rs
index 77ad2f5a..4dfe9aa2 100644
--- a/kernel/aster-nix/src/thread/work_queue/worker_pool.rs
+++ b/kernel/aster-nix/src/thread/work_queue/worker_pool.rs
@@ -7,11 +7,7 @@ use core::{
     time::Duration,
 };
 
-use ostd::{
-    cpu::CpuSet,
-    sync::WaitQueue,
-    task::{add_task, Priority},
-};
+use ostd::{cpu::CpuSet, sync::WaitQueue, task::Priority};
 
 use super::{simple_scheduler::SimpleScheduler, worker::Worker, WorkItem, WorkPriority, WorkQueue};
 use crate::{
@@ -81,7 +77,7 @@ impl LocalWorkerPool {
     fn add_worker(&self) {
         let worker = Worker::new(self.parent.clone(), self.cpu_id);
         self.workers.lock_irq_disabled().push_back(worker.clone());
-        add_task(worker.bound_thread().task().clone());
+        worker.bound_thread().run();
    }
 
     fn remove_worker(&self) {
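With `add_task` gone, worker threads enter the scheduler through `Task::run()`, which forwards to `scheduler::run_new_task` with `EnqueueFlags::Spawn`. A minimal sketch of the new spawn path, assuming (as in OSTD) that `build()` returns `Result<Arc<Task>>`; the logger body is a placeholder:

```rust
use alloc::sync::Arc;

use ostd::task::{Task, TaskOptions};

// Hypothetical kernel-thread spawn under the new API.
fn spawn_logger() -> Arc<Task> {
    let task = TaskOptions::new(|| {
        // ... worker body; runs on whichever CPU `enqueue` selected ...
    })
    .build()
    .expect("failed to build the task");

    // `run()` replaces the old `add_task(...)` + `schedule()` pair: it hands
    // the task to `scheduler::run_new_task`, which enqueues it and may
    // preempt the caller right away.
    task.run();
    task
}
```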
diff --git a/ostd/src/arch/x86/cpu/mod.rs b/ostd/src/arch/x86/cpu/mod.rs
index b0c2f1ab..20cacf6c 100644
--- a/ostd/src/arch/x86/cpu/mod.rs
+++ b/ostd/src/arch/x86/cpu/mod.rs
@@ -17,6 +17,7 @@ use trapframe::UserContext as RawUserContext;
 use x86_64::registers::rflags::RFlags;
 
 use crate::{
+    task::scheduler,
     trap::call_irq_callback_functions,
     user::{ReturnReason, UserContextApi, UserContextApiInternal},
 };
@@ -50,32 +51,6 @@ pub struct CpuExceptionInfo {
     pub page_fault_addr: usize,
 }
 
-/// User Preemption.
-pub struct UserPreemption {
-    count: u32,
-}
-
-impl UserPreemption {
-    const PREEMPTION_INTERVAL: u32 = 100;
-
-    /// Creates a new instance of `UserPreemption`.
-    #[allow(clippy::new_without_default)]
-    pub const fn new() -> Self {
-        UserPreemption { count: 0 }
-    }
-
-    /// Checks if preemption might occur and takes necessary actions.
-    pub fn might_preempt(&mut self) {
-        self.count = (self.count + 1) % Self::PREEMPTION_INTERVAL;
-
-        if self.count == 0 {
-            crate::arch::irq::enable_local();
-            crate::task::schedule();
-            crate::arch::irq::disable_local();
-        }
-    }
-}
-
 impl UserContext {
     /// Returns a reference to the general registers.
     pub fn general_regs(&self) -> &RawGeneralRegs {
@@ -115,9 +90,9 @@ impl UserContextApiInternal for UserContext {
         let return_reason: ReturnReason;
         const SYSCALL_TRAPNUM: u16 = 0x100;
 
-        let mut user_preemption = UserPreemption::new();
         // return when it is syscall or cpu exception type is Fault or Trap.
         loop {
+            scheduler::might_preempt();
             self.user_context.run();
             match CpuException::to_cpu_exception(self.user_context.trap_num as u16) {
                 Some(exception) => {
@@ -146,8 +121,6 @@ impl UserContextApiInternal for UserContext {
                 return_reason = ReturnReason::KernelEvent;
                 break;
             }
-
-            user_preemption.might_preempt();
         }
 
         crate::arch::irq::enable_local();
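Preemption of user tasks is no longer counted per loop iteration; it is driven by the timer. The end-to-end flow, pieced together from the other hunks in this patch:

```rust
// How a user task gets preempted after this patch:
//
// 1. The timer interrupt fires; the callback registered by
//    `inject_scheduler` calls `local_rq.update_current(UpdateFlags::Tick)`.
// 2. If the tick exhausts the entity's `TimeSlice` (or a real-time entity
//    is waiting behind a normal one), `SHOULD_PREEMPT.set()` flips the
//    flag bit in the per-CPU `PREEMPT_INFO` word.
// 3. Before the next return to user space, `scheduler::might_preempt()`
//    observes `PREEMPT_INFO == 0` and calls `yield_now()`.
```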
diff --git a/ostd/src/boot/mod.rs b/ostd/src/boot/mod.rs
index 941f92ea..56f2ede7 100644
--- a/ostd/src/boot/mod.rs
+++ b/ostd/src/boot/mod.rs
@@ -129,7 +129,7 @@ pub fn call_ostd_main() -> ! {
     unsafe {
         use alloc::boxed::Box;
 
-        use crate::task::{set_scheduler, FifoScheduler, Scheduler, TaskOptions};
+        use crate::task::TaskOptions;
 
         crate::init();
 
         // The whitelists that will be generated by OSDK runner as static consts.
         extern "Rust" {
             static KTEST_TEST_WHITELIST: Option<&'static [&'static str]>;
             static KTEST_CRATE_WHITELIST: Option<&'static [&'static str]>;
         }
-        // Set the global scheduler a FIFO scheduler.
-        let simple_scheduler = Box::new(FifoScheduler::new());
-        let static_scheduler: &'static dyn Scheduler = Box::leak(simple_scheduler);
-        set_scheduler(static_scheduler);
 
         let test_task = move || {
             run_ktests(KTEST_TEST_WHITELIST, KTEST_CRATE_WHITELIST);
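With the eager FIFO setup gone from boot, injection happens at most once through a `spin::Once`, and `run_new_task` (later in this patch) falls back to the built-in FIFO scheduler if nothing was injected before the first task runs. A tiny model of the once-only semantics, with a string standing in for the scheduler reference:

```rust
use spin::Once; // the same crate the patch uses for `SCHEDULER`

static SCHEDULER: Once<&'static str> = Once::new();

fn main() {
    // The first injection wins...
    SCHEDULER.call_once(|| "fifo");
    // ...and a later one is silently ignored, which is why a kernel must
    // call `inject_scheduler` before the first `Task::run()`.
    SCHEDULER.call_once(|| "preempt");
    assert_eq!(*SCHEDULER.get().unwrap(), "fifo");
}
```

This is why a kernel such as aster-nix calls its `sched::init` early in boot, before spawning any task.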
diff --git a/ostd/src/cpu/local/mod.rs b/ostd/src/cpu/local/mod.rs
index 6e9c7b16..04b981ad 100644
--- a/ostd/src/cpu/local/mod.rs
+++ b/ostd/src/cpu/local/mod.rs
@@ -60,19 +60,20 @@ extern "C" {
 }
 
 cpu_local_cell! {
-    /// The count of the preempt lock.
+    /// A 4-byte preemption information consisting of a should_preempt flag at
+    /// the highest bit and a preemption counter in the lower 31 bits.
     ///
-    /// We need to access the preemption count before we can copy the section
-    /// for application processors. So, the preemption count is not copied from
+    /// We need to access the preemption info before we can copy the section
+    /// for application processors. So, the preemption info is not copied from
     /// bootstrap processor's section as the initialization. Instead it is
     /// initialized to zero for application processors.
-    pub(crate) static PREEMPT_LOCK_COUNT: u32 = 0;
+    pub(crate) static PREEMPT_INFO: u32 = 0;
 }
 
 /// Sets the base address of the CPU-local storage for the bootstrap processor.
 ///
 /// It should be called early to let [`crate::task::disable_preempt`] work,
-/// which needs to update a CPU-local preempt lock count. Otherwise it may
+/// which needs to update a CPU-local preemption info. Otherwise it may
 /// panic when calling [`crate::task::disable_preempt`].
 ///
 /// # Safety
@@ -133,16 +134,16 @@ pub unsafe fn init_on_bsp() {
         (ap_pages_ptr as *mut u32).write(cpu_i);
     }
 
-    // SAFETY: the `PREEMPT_LOCK_COUNT` may be dirty on the BSP, so we need
+    // SAFETY: the `PREEMPT_INFO` may be dirty on the BSP, so we need
     // to ensure that it is initialized to zero for APs. The safety
     // requirements are met since the static is defined in the `.cpu_local`
     // section and the pointer to that static is the offset in the CPU-
     // local area. It is a `usize` so it is safe to be overwritten.
     unsafe {
-        let preempt_count_ptr = &PREEMPT_LOCK_COUNT as *const _ as usize;
-        let preempt_count_offset = preempt_count_ptr - __cpu_local_start as usize;
-        let ap_preempt_count_ptr = ap_pages_ptr.add(preempt_count_offset) as *mut usize;
-        ap_preempt_count_ptr.write(0);
+        let preempt_info_ptr = &PREEMPT_INFO as *const _ as usize;
+        let preempt_info_offset = preempt_info_ptr - __cpu_local_start as usize;
+        let ap_preempt_info_ptr = ap_pages_ptr.add(preempt_info_offset) as *mut usize;
+        ap_preempt_info_ptr.write(0);
     }
 
     // SAFETY: bytes `8:16` are reserved for storing the pointer to the
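`PREEMPT_INFO` packs the preempt-disable count and the should-preempt flag into one per-CPU word so the fast-path check is a single load compared against zero. Note the polarity: `SHOULD_PREEMPT.set()` (defined later in this patch) clears bit 31. A self-contained model of the encoding:

```rust
// Model of the PREEMPT_INFO encoding used by this patch.
const FLAG: u32 = 1 << 31; // bit 31: *cleared* when preemption is requested
const COUNT_MASK: u32 = FLAG - 1; // bits 0..=30: preempt-disable count

fn should_preempt(preempt_info: u32) -> bool {
    // Equivalent to the patch's `PREEMPT_INFO.load() == 0` fast path.
    preempt_info & COUNT_MASK == 0 && preempt_info & FLAG == 0
}

fn main() {
    assert!(should_preempt(0)); // no guards held, preemption requested
    assert!(!should_preempt(1)); // a DisablePreemptGuard is alive
    assert!(!should_preempt(FLAG)); // flag bit set: no preemption requested
}
```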
diff --git a/ostd/src/sync/wait.rs b/ostd/src/sync/wait.rs
index 81cd950e..34b4a1c2 100644
--- a/ostd/src/sync/wait.rs
+++ b/ostd/src/sync/wait.rs
@@ -4,7 +4,7 @@ use alloc::{collections::VecDeque, sync::Arc};
 use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 
 use super::SpinLock;
-use crate::task::{add_task, schedule, Task, TaskStatus};
+use crate::task::{scheduler, Task};
 
 // # Explanation on the memory orders
 //
@@ -255,36 +255,15 @@ impl Waker {
         if self.has_woken.swap(true, Ordering::Release) {
             return false;
         }
-
-        let mut task = self.task.inner_exclusive_access();
-        match task.task_status {
-            TaskStatus::Sleepy => {
-                task.task_status = TaskStatus::Runnable;
-            }
-            TaskStatus::Sleeping => {
-                task.task_status = TaskStatus::Runnable;
-
-                // Avoid holding the lock when doing `add_task`
-                drop(task);
-                add_task(self.task.clone());
-            }
-            _ => (),
-        }
+        scheduler::unpark_target(self.task.clone());
 
         true
     }
 
     fn do_wait(&self) {
-        while !self.has_woken.swap(false, Ordering::Acquire) {
-            let mut task = self.task.inner_exclusive_access();
-            // After holding the lock, check again to avoid races
-            if self.has_woken.swap(false, Ordering::Acquire) {
-                break;
-            }
-            task.task_status = TaskStatus::Sleepy;
-            drop(task);
-
-            schedule();
+        let has_woken = &self.has_woken;
+        while !has_woken.swap(false, Ordering::Acquire) {
+            scheduler::park_current(has_woken);
         }
     }
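The `Waker` now delegates blocking to `scheduler::park_current`, but `has_woken` remains the token that keeps the pair race-free: a wake landing between the check and the dequeue is still observed inside `park_current`. A `std`-only model of the flag discipline, with the actual kernel park/unpark stubbed out:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Returns true if a pending wake-up was consumed instead of sleeping; in
// the kernel this decides whether `park_current` dequeues the task at all.
fn try_consume_wake(has_woken: &AtomicBool) -> bool {
    has_woken.swap(false, Ordering::Acquire)
}

// Mirrors `Waker::wake_up`: only the first wake after a wait does work.
fn wake(has_woken: &AtomicBool) -> bool {
    !has_woken.swap(true, Ordering::Release)
}

fn main() {
    let flag = AtomicBool::new(false);
    assert!(wake(&flag)); // first wake is delivered (would unpark)
    assert!(!wake(&flag)); // a second wake is a no-op
    assert!(try_consume_wake(&flag)); // the sleeper never actually blocks
}
```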
diff --git a/ostd/src/task/mod.rs b/ostd/src/task/mod.rs
index 61b76cd9..5b97c6a7 100644
--- a/ostd/src/task/mod.rs
+++ b/ostd/src/task/mod.rs
@@ -2,15 +2,12 @@
 
 //! Tasks are the unit of code execution.
 
-mod priority;
 mod processor;
-mod scheduler;
+pub mod scheduler;
 #[allow(clippy::module_inception)]
 mod task;
 
 pub use self::{
-    priority::Priority,
-    processor::{disable_preempt, preempt, schedule, DisablePreemptGuard},
-    scheduler::{add_task, set_scheduler, FifoScheduler, Scheduler},
-    task::{Task, TaskAdapter, TaskContextApi, TaskOptions, TaskStatus},
+    processor::{disable_preempt, DisablePreemptGuard},
+    task::{AtomicCpuId, Priority, Task, TaskAdapter, TaskContextApi, TaskOptions},
 };
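For code on top of OSTD, the visible change is the import surface: `scheduler` becomes a public module, while `Priority` and `AtomicCpuId` are re-exported from `task`. After this patch, imports look like those used by `priority_scheduler.rs` above:

```rust
// Import surface after this refactor (paths taken from this patch):
use ostd::task::{
    scheduler::{inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler, UpdateFlags},
    disable_preempt, AtomicCpuId, Priority, Task, TaskOptions,
};
```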
diff --git a/ostd/src/task/processor.rs b/ostd/src/task/processor.rs
index 48471908..9412a295 100644
--- a/ostd/src/task/processor.rs
+++ b/ostd/src/task/processor.rs
@@ -2,12 +2,8 @@
 
 use alloc::sync::Arc;
 
-use super::{
-    scheduler::{fetch_task, GLOBAL_SCHEDULER},
-    task::{context_switch, TaskContext},
-    Task, TaskStatus,
-};
-use crate::{cpu::local::PREEMPT_LOCK_COUNT, cpu_local_cell};
+use super::task::{context_switch, Task, TaskContext};
+use crate::{cpu::local::PREEMPT_INFO, cpu_local_cell};
 
 cpu_local_cell! {
     /// The `Arc<Task>` (casted by [`Arc::into_raw`]) that is the current task.
@@ -37,52 +33,19 @@ pub(super) fn current_task() -> Option<Arc<Task>> {
     Some(restored)
 }
 
-/// Calls this function to switch to other task by using GLOBAL_SCHEDULER
-pub fn schedule() {
-    if let Some(task) = fetch_task() {
-        switch_to_task(task);
-    }
-}
-
-/// Preempts the `task`.
-///
-/// TODO: This interface of this method is error prone.
-/// The method takes an argument for the current task to optimize its efficiency,
-/// but the argument provided by the caller may not be the current task, really.
-/// Thus, this method should be removed or reworked in the future.
-pub fn preempt(task: &Arc<Task>) {
-    // TODO: Refactor `preempt` and `schedule`
-    // after the Atomic mode and `might_break` is enabled.
-    let mut scheduler = GLOBAL_SCHEDULER.lock_irq_disabled();
-    if !scheduler.should_preempt(task) {
-        return;
-    }
-    let Some(next_task) = scheduler.dequeue() else {
-        return;
-    };
-    drop(scheduler);
-    switch_to_task(next_task);
-}
-
 /// Calls this function to switch to other task
 ///
 /// If current task is none, then it will use the default task context and it
 /// will not return to this function again.
 ///
-/// If the current task's status not [`TaskStatus::Runnable`], it will not be
-/// added to the scheduler.
-///
 /// # Panics
 ///
 /// This function will panic if called while holding preemption locks or with
 /// local IRQ disabled.
-fn switch_to_task(next_task: Arc<Task>) {
-    let preemt_lock_count = PREEMPT_LOCK_COUNT.load();
-    if preemt_lock_count != 0 {
-        panic!(
-            "Calling schedule() while holding {} locks",
-            preemt_lock_count
-        );
+pub(super) fn switch_to_task(next_task: Arc<Task>) {
+    let preempt_count = PREEMPT_COUNT.get();
+    if preempt_count != 0 {
+        panic!("Switching task while holding {} locks", preempt_count);
     }
 
     assert!(
@@ -93,7 +56,6 @@ fn switch_to_task(next_task: Arc<Task>) {
     let irq_guard = crate::trap::disable_local();
 
     let current_task_ptr = CURRENT_TASK_PTR.load();
-
     let current_task_ctx_ptr = if current_task_ptr.is_null() {
         // SAFETY: Interrupts are disabled, so the pointer is safe to be fetched.
         unsafe { BOOTSTRAP_CONTEXT.as_ptr_mut() }
@@ -104,24 +66,12 @@ fn switch_to_task(next_task: Arc<Task>) {
             let _ = core::mem::ManuallyDrop::new(restored.clone());
             restored
         };
-        let ctx_ptr = cur_task_arc.ctx().get();
 
-        let mut task_inner = cur_task_arc.inner_exclusive_access();
-
-        debug_assert_ne!(task_inner.task_status, TaskStatus::Sleeping);
-        if task_inner.task_status == TaskStatus::Runnable {
-            drop(task_inner);
-            GLOBAL_SCHEDULER.lock().enqueue(cur_task_arc);
-        } else if task_inner.task_status == TaskStatus::Sleepy {
-            task_inner.task_status = TaskStatus::Sleeping;
-        }
-
-        ctx_ptr
+        cur_task_arc.ctx().get()
     };
 
     let next_task_ctx_ptr = next_task.ctx().get().cast_const();
-
     if let Some(next_user_space) = next_task.user_space() {
         next_user_space.vm_space().activate();
     }
@@ -144,7 +94,7 @@ fn switch_to_task(next_task: Arc<Task>) {
     drop(irq_guard);
 
     // SAFETY:
-    // 1. `ctx` is only used in `schedule()`. We have exclusive access to both the current task
+    // 1. `ctx` is only used in `reschedule()`. We have exclusive access to both the current task
     //    context and the next task context.
     // 2. The next task context is a valid task context.
     unsafe {
@@ -159,6 +109,34 @@
     // to the next task switching.
 }
 
+static PREEMPT_COUNT: PreemptCount = PreemptCount::new();
+
+struct PreemptCount {}
+
+impl PreemptCount {
+    const SHIFT: u8 = 0;
+
+    const BITS: u8 = 31;
+
+    const MASK: u32 = ((1 << Self::BITS) - 1) << Self::SHIFT;
+
+    const fn new() -> Self {
+        Self {}
+    }
+
+    fn inc(&self) {
+        PREEMPT_INFO.add_assign(1 << Self::SHIFT);
+    }
+
+    fn dec(&self) {
+        PREEMPT_INFO.sub_assign(1 << Self::SHIFT);
+    }
+
+    fn get(&self) -> u32 {
+        PREEMPT_INFO.load() & Self::MASK
+    }
+}
+
 /// A guard for disable preempt.
 #[clippy::has_significant_drop]
 #[must_use]
@@ -171,7 +149,7 @@ impl !Send for DisablePreemptGuard {}
 
 impl DisablePreemptGuard {
     fn new() -> Self {
-        PREEMPT_LOCK_COUNT.add_assign(1);
+        PREEMPT_COUNT.inc();
         Self { _private: () }
     }
 
@@ -184,7 +162,7 @@ impl DisablePreemptGuard {
 
 impl Drop for DisablePreemptGuard {
     fn drop(&mut self) {
-        PREEMPT_LOCK_COUNT.sub_assign(1);
+        PREEMPT_COUNT.dec();
     }
 }
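`DisablePreemptGuard` is now an RAII wrapper around the low 31 bits of `PREEMPT_INFO`. A usage sketch (the function body is illustrative):

```rust
// While the guard lives, bits 0..=30 of PREEMPT_INFO are nonzero, so
// `might_preempt()` refuses to switch tasks on this CPU.
fn touch_cpu_local_state() {
    let guard = ostd::task::disable_preempt(); // PREEMPT_COUNT.inc()
    // ... safely access per-CPU data here; no task switch can happen ...
    drop(guard); // PREEMPT_COUNT.dec(); preemption may resume
}
```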
diff --git a/ostd/src/task/scheduler.rs b/ostd/src/task/scheduler.rs
deleted file mode 100644
index aada4154..00000000
--- a/ostd/src/task/scheduler.rs
+++ /dev/null
@@ -1,107 +0,0 @@
-// SPDX-License-Identifier: MPL-2.0
-
-#![allow(dead_code)]
-
-use alloc::collections::VecDeque;
-
-use crate::{prelude::*, sync::SpinLock, task::Task};
-
-static DEFAULT_SCHEDULER: FifoScheduler = FifoScheduler::new();
-pub(crate) static GLOBAL_SCHEDULER: SpinLock<GlobalScheduler> = SpinLock::new(GlobalScheduler {
-    scheduler: &DEFAULT_SCHEDULER,
-});
-
-/// A scheduler for tasks.
-///
-/// An implementation of scheduler can attach scheduler-related information
-/// with the `TypeMap` returned from `task.data()`.
-pub trait Scheduler: Sync + Send {
-    /// Enqueues a task to the scheduler.
-    fn enqueue(&self, task: Arc<Task>);
-
-    /// Dequeues a task from the scheduler.
-    fn dequeue(&self) -> Option<Arc<Task>>;
-
-    /// Tells whether the given task should be preempted by other tasks in the queue.
-    fn should_preempt(&self, task: &Arc<Task>) -> bool;
-}
-
-pub struct GlobalScheduler {
-    scheduler: &'static dyn Scheduler,
-}
-
-impl GlobalScheduler {
-    pub const fn new(scheduler: &'static dyn Scheduler) -> Self {
-        Self { scheduler }
-    }
-
-    /// dequeue a task using scheduler
-    /// require the scheduler is not none
-    pub fn dequeue(&mut self) -> Option<Arc<Task>> {
-        self.scheduler.dequeue()
-    }
-    /// enqueue a task using scheduler
-    /// require the scheduler is not none
-    pub fn enqueue(&mut self, task: Arc<Task>) {
-        self.scheduler.enqueue(task)
-    }
-
-    pub fn should_preempt(&self, task: &Arc<Task>) -> bool {
-        self.scheduler.should_preempt(task)
-    }
-}
-/// Sets the global task scheduler.
-///
-/// This must be called before invoking `Task::spawn`.
-pub fn set_scheduler(scheduler: &'static dyn Scheduler) {
-    let mut global_scheduler = GLOBAL_SCHEDULER.lock_irq_disabled();
-    // When setting a new scheduler, the old scheduler should be empty
-    assert!(global_scheduler.dequeue().is_none());
-    global_scheduler.scheduler = scheduler;
-}
-
-pub fn fetch_task() -> Option<Arc<Task>> {
-    GLOBAL_SCHEDULER.lock_irq_disabled().dequeue()
-}
-
-/// Adds a task to the global scheduler.
-pub fn add_task(task: Arc<Task>) {
-    GLOBAL_SCHEDULER.lock_irq_disabled().enqueue(task);
-}
-
-/// A simple FIFO (First-In-First-Out) task scheduler.
-pub struct FifoScheduler {
-    /// A thread-safe queue to hold tasks waiting to be executed.
-    task_queue: SpinLock<VecDeque<Arc<Task>>>,
-}
-
-impl FifoScheduler {
-    /// Creates a new instance of `FifoScheduler`.
-    pub const fn new() -> Self {
-        FifoScheduler {
-            task_queue: SpinLock::new(VecDeque::new()),
-        }
-    }
-}
-
-impl Default for FifoScheduler {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Scheduler for FifoScheduler {
-    /// Enqueues a task to the end of the queue.
-    fn enqueue(&self, task: Arc<Task>) {
-        self.task_queue.lock_irq_disabled().push_back(task);
-    }
-    /// Dequeues a task from the front of the queue, if any.
-    fn dequeue(&self) -> Option<Arc<Task>> {
-        self.task_queue.lock_irq_disabled().pop_front()
-    }
-    /// In this simple implementation, task preemption is not supported.
-    /// Once a task starts running, it will continue to run until completion.
-    fn should_preempt(&self, _task: &Arc<Task>) -> bool {
-        false
-    }
-}
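The deletion above retires the global, single-queue interface. What replaces what, in summary:

```rust
// old global `Scheduler`                 new `Scheduler` / `LocalRunQueue`
// ----------------------                 ---------------------------------
// enqueue(task)                       -> enqueue(runnable, flags) -> Option<u32>
// dequeue() -> Option<Arc<Task>>      -> LocalRunQueue::pick_next_current()
// should_preempt(&task) -> bool       -> LocalRunQueue::update_current(flags) -> bool
//
// Dequeueing is no longer a cross-CPU operation: each CPU manipulates only
// its own run queue through the `local_rq_with` / `local_mut_rq_with` hooks.
```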
diff --git a/ostd/src/task/scheduler/fifo_scheduler.rs b/ostd/src/task/scheduler/fifo_scheduler.rs
new file mode 100644
index 00000000..a153e9b5
--- /dev/null
+++ b/ostd/src/task/scheduler/fifo_scheduler.rs
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
+
+use super::{inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler, UpdateFlags};
+use crate::{
+    cpu::{num_cpus, this_cpu},
+    sync::SpinLock,
+    task::{AtomicCpuId, Task},
+};
+
+pub fn init() {
+    let fifo_scheduler = Box::new(FifoScheduler::default());
+    let scheduler = Box::<FifoScheduler<Task>>::leak(fifo_scheduler);
+    inject_scheduler(scheduler);
+}
+
+/// A simple FIFO (First-In-First-Out) task scheduler.
+struct FifoScheduler<T: FifoSchedInfo> {
+    /// A thread-safe queue to hold tasks waiting to be executed.
+    rq: Vec<SpinLock<FifoRunQueue<T>>>,
+}
+
+impl<T: FifoSchedInfo> FifoScheduler<T> {
+    /// Creates a new instance of `FifoScheduler`.
+    fn new(nr_cpus: u32) -> Self {
+        let mut rq = Vec::new();
+        for _ in 0..nr_cpus {
+            rq.push(SpinLock::new(FifoRunQueue::new()));
+        }
+        Self { rq }
+    }
+
+    fn select_cpu(&self) -> u32 {
+        // FIXME: adopt a more reasonable policy once we fully enable SMP.
+        0
+    }
+}
+
+impl<T: Sync + Send + FifoSchedInfo> Scheduler<T> for FifoScheduler<T> {
+    fn enqueue(&self, runnable: Arc<T>, flags: EnqueueFlags) -> Option<u32> {
+        let mut still_in_rq = false;
+        let target_cpu = {
+            let mut cpu_id = self.select_cpu();
+            if let Err(task_cpu_id) = runnable.cpu().set_if_is_none(cpu_id) {
+                debug_assert!(flags != EnqueueFlags::Spawn);
+                still_in_rq = true;
+                cpu_id = task_cpu_id;
+            }
+
+            cpu_id
+        };
+
+        let mut rq = self.rq[target_cpu as usize].lock_irq_disabled();
+        if still_in_rq && let Err(_) = runnable.cpu().set_if_is_none(target_cpu) {
+            return None;
+        }
+        rq.queue.push_back(runnable);
+
+        Some(target_cpu)
+    }
+
+    fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue<T>)) {
+        let local_rq: &FifoRunQueue<T> = &self.rq[this_cpu() as usize].lock_irq_disabled();
+        f(local_rq);
+    }
+
+    fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue<T>)) {
+        let local_rq: &mut FifoRunQueue<T> = &mut self.rq[this_cpu() as usize].lock_irq_disabled();
+        f(local_rq);
+    }
+}
+
+struct FifoRunQueue<T: FifoSchedInfo> {
+    current: Option<Arc<T>>,
+    queue: VecDeque<Arc<T>>,
+}
+
+impl<T: FifoSchedInfo> FifoRunQueue<T> {
+    pub const fn new() -> Self {
+        Self {
+            current: None,
+            queue: VecDeque::new(),
+        }
+    }
+}
+
+impl<T: Sync + Send + FifoSchedInfo> LocalRunQueue<T> for FifoRunQueue<T> {
+    fn current(&self) -> Option<&Arc<T>> {
+        self.current.as_ref()
+    }
+
+    fn update_current(&mut self, flags: super::UpdateFlags) -> bool {
+        !matches!(flags, UpdateFlags::Tick)
+    }
+
+    fn pick_next_current(&mut self) -> Option<&Arc<T>> {
+        let next_task = self.queue.pop_front()?;
+        if let Some(prev_task) = self.current.replace(next_task) {
+            self.queue.push_back(prev_task);
+        }
+
+        self.current.as_ref()
+    }
+
+    fn dequeue_current(&mut self) -> Option<Arc<T>> {
+        self.current.take().inspect(|task| task.cpu().set_to_none())
+    }
+}
+
+impl Default for FifoScheduler<Task> {
+    fn default() -> Self {
+        Self::new(num_cpus())
+    }
+}
+
+impl FifoSchedInfo for Task {
+    fn cpu(&self) -> &AtomicCpuId {
+        self.cpu()
+    }
+}
+
+trait FifoSchedInfo {
+    fn cpu(&self) -> &AtomicCpuId;
+}
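`FifoRunQueue::pick_next_current` is a plain rotation: the previous current goes to the back and the front of the queue becomes current. A `std` model of that behavior:

```rust
use std::collections::VecDeque;

fn main() {
    let mut queue: VecDeque<&str> = VecDeque::from(["a", "b", "c"]);
    let mut current: Option<&str> = None;

    // Mirror of `pick_next_current`: pop the front, rotate the old current
    // to the back.
    for _ in 0..4 {
        let next = queue.pop_front().unwrap();
        if let Some(prev) = current.replace(next) {
            queue.push_back(prev);
        }
    }

    // After four picks the order was a, b, c, a: round-robin.
    assert_eq!(current, Some("a"));
}
```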
diff --git a/ostd/src/task/scheduler/mod.rs b/ostd/src/task/scheduler/mod.rs
new file mode 100644
index 00000000..15f81770
--- /dev/null
+++ b/ostd/src/task/scheduler/mod.rs
@@ -0,0 +1,271 @@
+// SPDX-License-Identifier: MPL-2.0
+
+//! Scheduling subsystem (in-OSTD part).
+//!
+//! This module defines what OSTD expects from a scheduling implementation
+//! and provides useful functions for controlling the execution flow.
+
+mod fifo_scheduler;
+
+use core::sync::atomic::{AtomicBool, Ordering};
+
+use spin::Once;
+
+use super::{processor, task::Task};
+use crate::{
+    arch::timer,
+    cpu::{local::PREEMPT_INFO, this_cpu},
+    prelude::*,
+};
+
+/// Injects a scheduler implementation into the framework.
+///
+/// This function can only be called once and must be called during the
+/// initialization of the kernel.
+pub fn inject_scheduler(scheduler: &'static dyn Scheduler<Task>) {
+    SCHEDULER.call_once(|| scheduler);
+
+    timer::register_callback(|| {
+        SCHEDULER.get().unwrap().local_mut_rq_with(&mut |local_rq| {
+            if local_rq.update_current(UpdateFlags::Tick) {
+                SHOULD_PREEMPT.set();
+            }
+        })
+    });
+}
+
+static SCHEDULER: Once<&'static dyn Scheduler<Task>> = Once::new();
+
+/// A per-CPU task scheduler.
+pub trait Scheduler<T = Task>: Sync + Send {
+    /// Enqueues a runnable task.
+    ///
+    /// Scheduler developers can perform load-balancing or some accounting work here.
+    ///
+    /// If the `current` of a CPU should be preempted, this method returns the id of
+    /// that CPU.
+    fn enqueue(&self, runnable: Arc<T>, flags: EnqueueFlags) -> Option<u32>;
+
+    /// Gets immutable access to the local runqueue of the current CPU core.
+    fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue<T>));
+
+    /// Gets mutable access to the local runqueue of the current CPU core.
+    fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue<T>));
+}
+
+/// The _local_ view of a per-CPU runqueue.
+///
+/// This local view provides the interface for the runqueue of a CPU core
+/// to be inspected and manipulated by the code running on this particular CPU core.
+///
+/// Conceptually, a local runqueue consists of two parts:
+/// (1) a priority queue of runnable tasks;
+/// (2) the current running task.
+/// The exact definition of "priority" is left for the concrete implementation to decide.
+pub trait LocalRunQueue<T = Task> {
+    /// Gets the current runnable task.
+    fn current(&self) -> Option<&Arc<T>>;
+
+    /// Updates the current runnable task's scheduling statistics and potentially its
+    /// position in the queue.
+    ///
+    /// If the current runnable task should be preempted, the method returns `true`.
+    fn update_current(&mut self, flags: UpdateFlags) -> bool;
+
+    /// Picks the next current runnable task.
+    ///
+    /// This method returns the chosen next current runnable task. If there is no
+    /// candidate for the next current runnable task, this method returns `None`.
+    fn pick_next_current(&mut self) -> Option<&Arc<T>>;
+
+    /// Removes the current runnable task from the runqueue.
+    ///
+    /// This method returns the current runnable task. If there is no current runnable
+    /// task, this method returns `None`.
+    fn dequeue_current(&mut self) -> Option<Arc<T>>;
+}
+
+/// Possible triggers of an `enqueue` action.
+#[derive(PartialEq, Copy, Clone)]
+pub enum EnqueueFlags {
+    /// Spawn a new task.
+    Spawn,
+    /// Wake a sleeping task.
+    Wake,
+}
+
+/// Possible triggers of an `update_current` action.
+#[derive(PartialEq, Copy, Clone)]
+pub enum UpdateFlags {
+    /// Timer interrupt.
+    Tick,
+    /// Task waiting.
+    Wait,
+    /// Task yielding.
+    Yield,
+}
+
+/// Preempts the current task.
+pub(crate) fn might_preempt() {
+    fn preempt_check() -> bool {
+        PREEMPT_INFO.load() == 0
+    }
+
+    if !preempt_check() {
+        return;
+    }
+    yield_now();
+}
+
+/// Blocks the current task unless `has_woken` is `true`.
+pub(crate) fn park_current(has_woken: &AtomicBool) {
+    let mut current = None;
+    let mut is_first_try = true;
+
+    reschedule(&mut |local_rq: &mut dyn LocalRunQueue| {
+        if is_first_try {
+            if has_woken.load(Ordering::Acquire) {
+                return ReschedAction::DoNothing;
+            }
+            current = local_rq.dequeue_current();
+            local_rq.update_current(UpdateFlags::Wait);
+        }
+        if let Some(next_task) = local_rq.pick_next_current() {
+            if Arc::ptr_eq(current.as_ref().unwrap(), next_task) {
+                return ReschedAction::DoNothing;
+            }
+            ReschedAction::SwitchTo(next_task.clone())
+        } else {
+            is_first_try = false;
+            ReschedAction::Retry
+        }
+    });
+}
+
+/// Unblocks a target task.
+pub(crate) fn unpark_target(runnable: Arc<Task>) {
+    let should_preempt_info = SCHEDULER
+        .get()
+        .unwrap()
+        .enqueue(runnable, EnqueueFlags::Wake);
+    if should_preempt_info.is_some() {
+        let cpu_id = should_preempt_info.unwrap();
+        // FIXME: send IPI to set remote CPU's `SHOULD_PREEMPT` if needed.
+        if cpu_id == this_cpu() {
+            SHOULD_PREEMPT.set();
+        }
+    }
+}
+
+/// Enqueues a newly built task.
+///
+/// Note that the new task is not guaranteed to run at once.
+pub(super) fn run_new_task(runnable: Arc<Task>) {
+    // FIXME: remove this check for `SCHEDULER`.
+    // Currently OSTD cannot know whether its user has injected a scheduler.
+    if !SCHEDULER.is_completed() {
+        fifo_scheduler::init();
+    }
+
+    let should_preempt_info = SCHEDULER
+        .get()
+        .unwrap()
+        .enqueue(runnable, EnqueueFlags::Spawn);
+    if should_preempt_info.is_some() {
+        let cpu_id = should_preempt_info.unwrap();
+        // FIXME: send IPI to set remote CPU's `SHOULD_PREEMPT` if needed.
+        if cpu_id == this_cpu() {
+            SHOULD_PREEMPT.set();
+        }
+    }
+
+    might_preempt();
+}
+
+/// Dequeues the current task from its runqueue.
+///
+/// This should only be called if the current task is to exit.
+pub(super) fn exit_current() {
+    reschedule(&mut |local_rq: &mut dyn LocalRunQueue| {
+        let _ = local_rq.dequeue_current();
+        if let Some(next_task) = local_rq.pick_next_current() {
+            ReschedAction::SwitchTo(next_task.clone())
+        } else {
+            ReschedAction::Retry
+        }
+    })
+}
+
+/// Yields execution.
+pub(super) fn yield_now() {
+    reschedule(&mut |local_rq| {
+        local_rq.update_current(UpdateFlags::Yield);
+
+        if let Some(next_task) = local_rq.pick_next_current() {
+            ReschedAction::SwitchTo(next_task.clone())
+        } else {
+            ReschedAction::DoNothing
+        }
+    })
+}
+
+/// Does rescheduling by acting on the scheduling decision (`ReschedAction`) made by a
+/// user-given closure.
+///
+/// The closure makes the scheduling decision by taking the local runqueue as its input.
+fn reschedule<F>(f: &mut F)
+where
+    F: FnMut(&mut dyn LocalRunQueue) -> ReschedAction,
+{
+    let next_task = loop {
+        let mut action = ReschedAction::DoNothing;
+        SCHEDULER.get().unwrap().local_mut_rq_with(&mut |rq| {
+            action = f(rq);
+        });
+
+        match action {
+            ReschedAction::DoNothing => {
+                return;
+            }
+            ReschedAction::Retry => {
+                continue;
+            }
+            ReschedAction::SwitchTo(next_task) => {
+                break next_task;
+            }
+        };
+    };
+
+    SHOULD_PREEMPT.clear();
+    processor::switch_to_task(next_task);
+}
+
+/// Possible actions of a rescheduling.
+enum ReschedAction {
+    /// Keep running current task and do nothing.
+    DoNothing,
+    /// Loop until finding a task to swap out the current.
+    Retry,
+    /// Switch to target task.
+    SwitchTo(Arc<Task>),
+}
+
+static SHOULD_PREEMPT: ShouldPreemptFlag = ShouldPreemptFlag::new();
+
+struct ShouldPreemptFlag {}
+
+impl ShouldPreemptFlag {
+    const SHIFT: u8 = 31;
+
+    const MASK: u32 = 1 << Self::SHIFT;
+
+    const fn new() -> Self {
+        Self {}
+    }
+
+    fn set(&self) {
+        // Note: the flag is stored inverted. Clearing bit 31 marks "should
+        // preempt", so `might_preempt` can test the whole word against zero.
+        PREEMPT_INFO.bitand_assign(!Self::MASK);
+    }
+
+    fn clear(&self) {
+        PREEMPT_INFO.bitor_assign(Self::MASK);
+    }
+}
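The two traits above are the whole contract a scheduler implementor owes OSTD. For shape, a hypothetical single-CPU skeleton; the `LocalRunQueue` impl for `FifoRq` would mirror the one in `fifo_scheduler.rs` and is omitted, so treat this as a sketch, not a tested scheduler:

```rust
// Hypothetical minimal scheduler; imports as in fifo_scheduler.rs above.
struct SingleCpuFifo {
    rq: SpinLock<FifoRq>,
}

struct FifoRq {
    current: Option<Arc<Task>>,
    queue: VecDeque<Arc<Task>>,
}

impl Scheduler for SingleCpuFifo {
    fn enqueue(&self, runnable: Arc<Task>, _flags: EnqueueFlags) -> Option<u32> {
        self.rq.lock_irq_disabled().queue.push_back(runnable);
        Some(0) // single CPU: always suggest re-checking preemption on CPU 0
    }

    fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue)) {
        f(&*self.rq.lock_irq_disabled());
    }

    fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue)) {
        f(&mut *self.rq.lock_irq_disabled());
    }
}
```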
diff --git a/ostd/src/task/task.rs b/ostd/src/task/task/mod.rs
similarity index 85%
rename from ostd/src/task/task.rs
rename to ostd/src/task/task/mod.rs
index e875678f..4d5a2390 100644
--- a/ostd/src/task/task.rs
+++ b/ostd/src/task/task/mod.rs
@@ -4,22 +4,23 @@
 // So we temporary allow missing_docs for this module.
 #![allow(missing_docs)]
 
-use alloc::{boxed::Box, sync::Arc};
-use core::{any::Any, cell::UnsafeCell};
+mod priority;
+
+use core::{
+    any::Any,
+    cell::UnsafeCell,
+    sync::atomic::{AtomicU32, Ordering},
+};
 
 use intrusive_collections::{intrusive_adapter, LinkedListAtomicLink};
+pub use priority::Priority;
 
-use super::{
-    add_task,
-    priority::Priority,
-    processor::{current_task, schedule},
-};
+use super::{processor::current_task, scheduler};
 pub(crate) use crate::arch::task::{context_switch, TaskContext};
 use crate::{
     cpu::CpuSet,
     mm::{kspace::KERNEL_PAGE_TABLE, FrameAllocOptions, Paddr, PageFlags, Segment, PAGE_SIZE},
     prelude::*,
-    sync::{SpinLock, SpinLockGuard},
     user::UserSpace,
 };
@@ -103,6 +104,41 @@ impl Drop for KernelStack {
     }
 }
 
+/// An atomic CPUID container.
+pub struct AtomicCpuId(AtomicU32);
+
+impl AtomicCpuId {
+    /// The null value of CPUID.
+    ///
+    /// An `AtomicCpuId` with `AtomicCpuId::NONE` as its inner value is empty.
+    const NONE: u32 = u32::MAX;
+
+    fn new(cpu_id: u32) -> Self {
+        Self(AtomicU32::new(cpu_id))
+    }
+
+    /// Sets the inner value of an `AtomicCpuId` if it's empty.
+    ///
+    /// The return value is a result indicating whether the new value was written
+    /// and containing the previous value.
+    pub fn set_if_is_none(&self, cpu_id: u32) -> core::result::Result<u32, u32> {
+        self.0
+            .compare_exchange(Self::NONE, cpu_id, Ordering::Relaxed, Ordering::Relaxed)
+    }
+
+    /// Sets the inner value of an `AtomicCpuId` to `AtomicCpuId::NONE`, i.e. makes
+    /// an `AtomicCpuId` empty.
+    pub fn set_to_none(&self) {
+        self.0.store(Self::NONE, Ordering::Relaxed);
+    }
+}
+
+impl Default for AtomicCpuId {
+    fn default() -> Self {
+        Self::new(Self::NONE)
+    }
+}
+
 /// A task that executes a function to the end.
 ///
 /// Each task is associated with per-task data and an optional user space.
@@ -112,11 +148,11 @@ pub struct Task {
     func: Box<dyn Fn() + Send + Sync>,
     data: Box<dyn Any + Send + Sync>,
     user_space: Option<Arc<UserSpace>>,
-    task_inner: SpinLock<TaskInner>,
     ctx: UnsafeCell<TaskContext>,
     /// kernel stack, note that the top is SyscallFrame/TrapFrame
     kstack: KernelStack,
     link: LinkedListAtomicLink,
+    cpu: AtomicCpuId,
     priority: Priority,
     // TODO: add multiprocessor support
     #[allow(dead_code)]
@@ -130,11 +166,6 @@ intrusive_adapter!(pub TaskAdapter = Arc<Task>: Task { link: LinkedListAtomicLink });
 // we have exclusive access to the field.
 unsafe impl Sync for Task {}
 
-#[derive(Debug)]
-pub(crate) struct TaskInner {
-    pub task_status: TaskStatus,
-}
-
 impl Task {
     /// Gets the current task.
     ///
@@ -143,11 +174,6 @@ impl Task {
         current_task()
     }
 
-    /// Gets inner
-    pub(crate) fn inner_exclusive_access(&self) -> SpinLockGuard<TaskInner> {
-        self.task_inner.lock_irq_disabled()
-    }
-
     pub(super) fn ctx(&self) -> &UnsafeCell<TaskContext> {
         &self.ctx
     }
@@ -157,18 +183,14 @@ impl Task {
     /// Note that this method cannot be simply named "yield" as the name is
     /// a Rust keyword.
     pub fn yield_now() {
-        schedule();
+        scheduler::yield_now()
     }
 
     /// Runs the task.
+    ///
+    /// BUG: This method highly depends on the current scheduling policy.
     pub fn run(self: &Arc<Self>) {
-        add_task(self.clone());
-        schedule();
-    }
-
-    /// Returns the task status.
-    pub fn status(&self) -> TaskStatus {
-        self.task_inner.lock_irq_disabled().task_status
+        scheduler::run_new_task(self.clone());
     }
 
     /// Returns the task data.
@@ -185,6 +207,16 @@ impl Task {
         }
     }
 
+    /// Returns the CPU of this task.
+    pub fn cpu(&self) -> &AtomicCpuId {
+        &self.cpu
+    }
+
+    /// Returns the priority.
+    pub fn priority(&self) -> Priority {
+        self.priority
+    }
+
     /// Exits the current task.
     ///
     /// The task `self` must be the task that is currently running.
@@ -192,13 +224,10 @@ impl Task {
     /// **NOTE:** If there is anything left on the stack, it will be forgotten. This behavior may
     /// lead to resource leakage.
     fn exit(self: Arc<Self>) -> ! {
-        self.inner_exclusive_access().task_status = TaskStatus::Exited;
-
         // `current_task()` still holds a strong reference, so nothing is destroyed at this point,
         // neither is the kernel stack.
         drop(self);
-
-        schedule();
+        scheduler::exit_current();
         unreachable!()
     }
 
@@ -208,19 +237,6 @@ impl Task {
     }
 }
 
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
-/// The status of a task.
-pub enum TaskStatus {
-    /// The task is runnable.
-    Runnable,
-    /// The task is running in the foreground but will sleep when it goes to the background.
-    Sleepy,
-    /// The task is sleeping in the background.
-    Sleeping,
-    /// The task has exited.
-    Exited,
-}
-
 /// Options to create or spawn a new task.
 pub struct TaskOptions {
     func: Option<Box<dyn Fn() + Send + Sync>>,
@@ -236,13 +252,12 @@ impl TaskOptions {
     where
         F: Fn() + Send + Sync + 'static,
     {
-        let cpu_affinity = CpuSet::new_full();
         Self {
             func: Some(Box::new(func)),
             data: None,
             user_space: None,
             priority: Priority::normal(),
-            cpu_affinity,
+            cpu_affinity: CpuSet::new_full(),
         }
     }
 
@@ -300,11 +315,9 @@ impl TaskOptions {
             func: self.func.unwrap(),
             data: self.data.unwrap(),
             user_space: self.user_space,
-            task_inner: SpinLock::new(TaskInner {
-                task_status: TaskStatus::Runnable,
-            }),
             ctx: UnsafeCell::new(TaskContext::default()),
             kstack: KernelStack::new_with_guard_page()?,
+            cpu: AtomicCpuId::default(),
             link: LinkedListAtomicLink::new(),
             priority: self.priority,
             cpu_affinity: self.cpu_affinity,
diff --git a/ostd/src/task/priority.rs b/ostd/src/task/task/priority.rs
similarity index 96%
rename from ostd/src/task/priority.rs
rename to ostd/src/task/task/priority.rs
index 4bbad8c5..fd5b3ebe 100644
--- a/ostd/src/task/priority.rs
+++ b/ostd/src/task/task/priority.rs
@@ -7,7 +7,7 @@ pub const REAL_TIME_TASK_PRIORITY: u16 = 100;
 /// Similar to Linux, a larger value represents a lower priority,
 /// with a range of 0 to 139. Priorities ranging from 0 to 99 are considered real-time,
 /// while those ranging from 100 to 139 are considered normal.
-#[derive(Copy, Clone, Debug)]
+#[derive(Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
 pub struct Priority(u16);
 
 impl Priority {
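The new derives are what let `PreemptSchedInfo::is_real_time` (first hunk of this patch) reduce to one comparison. A quick check of the ordering semantics, assuming `Priority::new` is public as its use elsewhere in this patch suggests:

```rust
use ostd::task::Priority;

// Lower numeric value = higher urgency, as in Linux: 0..=99 is real-time,
// 100..=139 is normal. `is_real_time` is `priority() < Priority::new(100)`.
let real_time = Priority::new(50);
let normal = Priority::new(120);
assert!(real_time < Priority::new(100)); // classified as real-time
assert!(normal >= Priority::new(100)); // classified as normal
```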