Implement new scheduling API in OSTD

jellllly420
2024-07-27 18:38:14 +08:00
committed by Tate, Hongliang Tian
parent 4844e7ca7c
commit f5464e82c0
14 changed files with 727 additions and 329 deletions

View File

@ -1,56 +1,234 @@
// SPDX-License-Identifier: MPL-2.0
use intrusive_collections::LinkedList;
use ostd::task::{set_scheduler, Scheduler, Task, TaskAdapter};
use ostd::{
cpu::{num_cpus, this_cpu},
task::{
scheduler::{inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler, UpdateFlags},
AtomicCpuId, Priority, Task,
},
};
use crate::prelude::*;
pub fn init() {
let preempt_scheduler = Box::new(PreemptScheduler::new());
let scheduler = Box::<PreemptScheduler>::leak(preempt_scheduler);
set_scheduler(scheduler);
let preempt_scheduler = Box::new(PreemptScheduler::default());
let scheduler = Box::<PreemptScheduler<Task>>::leak(preempt_scheduler);
inject_scheduler(scheduler);
}
/// The preempt scheduler
/// The preempt scheduler.
///
/// Real-time tasks are placed in the `real_time_tasks` queue and
/// Real-time tasks are placed in the `real_time_entities` queue and
/// are always prioritized during scheduling.
/// Normal tasks are placed in the `normal_tasks` queue and are only
/// Normal tasks are placed in the `normal_entities` queue and are only
/// scheduled for execution when there are no real-time tasks.
struct PreemptScheduler {
/// Tasks with a priority of less than 100 are regarded as real-time tasks.
real_time_tasks: SpinLock<LinkedList<TaskAdapter>>,
/// Tasks with a priority greater than or equal to 100 are regarded as normal tasks.
normal_tasks: SpinLock<LinkedList<TaskAdapter>>,
struct PreemptScheduler<T: PreemptSchedInfo> {
rq: Vec<SpinLock<PreemptRunQueue<T>>>,
}
impl PreemptScheduler {
impl<T: PreemptSchedInfo> PreemptScheduler<T> {
fn new(nr_cpus: u32) -> Self {
let mut rq = Vec::with_capacity(nr_cpus as usize);
for _ in 0..nr_cpus {
rq.push(SpinLock::new(PreemptRunQueue::new()));
}
Self { rq }
}
/// Selects a CPU for the task to run on.
fn select_cpu(&self, _runnable: &Arc<T>) -> u32 {
// FIXME: adopt a more reasonable policy once we fully enable SMP.
0
}
}
impl<T: Sync + Send + PreemptSchedInfo> Scheduler<T> for PreemptScheduler<T> {
fn enqueue(&self, runnable: Arc<T>, flags: EnqueueFlags) -> Option<u32> {
let mut still_in_rq = false;
let target_cpu = {
let mut cpu_id = self.select_cpu(&runnable);
if let Err(task_cpu_id) = runnable.cpu().set_if_is_none(cpu_id) {
debug_assert!(flags != EnqueueFlags::Spawn);
still_in_rq = true;
cpu_id = task_cpu_id;
}
cpu_id
};
let mut rq = self.rq[target_cpu as usize].lock_irq_disabled();
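// Re-check with the target CPU's run queue locked: if the task's CPU field is
// still set, the task has not yet left that run queue, so do not enqueue it a
// second time. If the field was cleared in the meantime (the task was just
// dequeued), claim the CPU again and enqueue the task below.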
if still_in_rq && let Err(_) = runnable.cpu().set_if_is_none(target_cpu) {
return None;
}
let entity = PreemptSchedEntity::new(runnable);
if entity.is_real_time() {
rq.real_time_entities.push_back(entity);
} else {
rq.normal_entities.push_back(entity);
}
Some(target_cpu)
}
fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue<T>)) {
let local_rq: &PreemptRunQueue<T> = &self.rq[this_cpu() as usize].lock_irq_disabled();
f(local_rq);
}
fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue<T>)) {
let local_rq: &mut PreemptRunQueue<T> =
&mut self.rq[this_cpu() as usize].lock_irq_disabled();
f(local_rq);
}
}
impl Default for PreemptScheduler<Task> {
fn default() -> Self {
Self::new(num_cpus())
}
}
struct PreemptRunQueue<T: PreemptSchedInfo> {
current: Option<PreemptSchedEntity<T>>,
real_time_entities: VecDeque<PreemptSchedEntity<T>>,
normal_entities: VecDeque<PreemptSchedEntity<T>>,
}
impl<T: PreemptSchedInfo> PreemptRunQueue<T> {
pub fn new() -> Self {
Self {
real_time_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
normal_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
current: None,
real_time_entities: VecDeque::new(),
normal_entities: VecDeque::new(),
}
}
}
impl Scheduler for PreemptScheduler {
fn enqueue(&self, task: Arc<Task>) {
if task.is_real_time() {
self.real_time_tasks.lock_irq_disabled().push_back(task);
} else {
self.normal_tasks.lock_irq_disabled().push_back(task);
impl<T: Sync + Send + PreemptSchedInfo> LocalRunQueue<T> for PreemptRunQueue<T> {
fn current(&self) -> Option<&Arc<T>> {
self.current.as_ref().map(|entity| &entity.runnable)
}
fn update_current(&mut self, flags: UpdateFlags) -> bool {
match flags {
UpdateFlags::Tick => {
let Some(ref mut current_entity) = self.current else {
return false;
};
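// Preempt the current entity when its time slice is exhausted, or when it is
// a normal entity while real-time entities are waiting in the run queue.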
current_entity.tick()
|| (!current_entity.is_real_time() && !self.real_time_entities.is_empty())
}
_ => true,
}
}
fn dequeue(&self) -> Option<Arc<Task>> {
if !self.real_time_tasks.lock_irq_disabled().is_empty() {
self.real_time_tasks.lock_irq_disabled().pop_front()
fn pick_next_current(&mut self) -> Option<&Arc<T>> {
let next_entity = if !self.real_time_entities.is_empty() {
self.real_time_entities.pop_front()
} else {
self.normal_tasks.lock_irq_disabled().pop_front()
self.normal_entities.pop_front()
}?;
if let Some(prev_entity) = self.current.replace(next_entity) {
if prev_entity.is_real_time() {
self.real_time_entities.push_back(prev_entity);
} else {
self.normal_entities.push_back(prev_entity);
}
}
Some(&self.current.as_ref().unwrap().runnable)
}
fn should_preempt(&self, task: &Arc<Task>) -> bool {
!task.is_real_time() && !self.real_time_tasks.lock_irq_disabled().is_empty()
fn dequeue_current(&mut self) -> Option<Arc<T>> {
self.current.take().map(|entity| {
let runnable = entity.runnable;
runnable.cpu().set_to_none();
runnable
})
}
}
struct PreemptSchedEntity<T: PreemptSchedInfo> {
runnable: Arc<T>,
time_slice: TimeSlice,
}
impl<T: PreemptSchedInfo> PreemptSchedEntity<T> {
fn new(runnable: Arc<T>) -> Self {
Self {
runnable,
time_slice: TimeSlice::default(),
}
}
fn is_real_time(&self) -> bool {
self.runnable.is_real_time()
}
fn tick(&mut self) -> bool {
self.time_slice.elapse()
}
}
impl<T: PreemptSchedInfo> Clone for PreemptSchedEntity<T> {
fn clone(&self) -> Self {
Self {
runnable: self.runnable.clone(),
time_slice: self.time_slice,
}
}
}
#[derive(Clone, Copy)]
pub struct TimeSlice {
elapsed_ticks: u32,
}
impl TimeSlice {
const DEFAULT_TIME_SLICE: u32 = 100;
pub const fn new() -> Self {
TimeSlice { elapsed_ticks: 0 }
}
pub fn elapse(&mut self) -> bool {
self.elapsed_ticks = (self.elapsed_ticks + 1) % Self::DEFAULT_TIME_SLICE;
self.elapsed_ticks == 0
}
}
impl Default for TimeSlice {
fn default() -> Self {
Self::new()
}
}
impl PreemptSchedInfo for Task {
type PRIORITY = Priority;
const REAL_TIME_TASK_PRIORITY: Self::PRIORITY = Priority::new(100);
fn priority(&self) -> Self::PRIORITY {
self.priority()
}
fn cpu(&self) -> &AtomicCpuId {
self.cpu()
}
}
trait PreemptSchedInfo {
type PRIORITY: Ord + PartialOrd + Eq + PartialEq;
const REAL_TIME_TASK_PRIORITY: Self::PRIORITY;
fn priority(&self) -> Self::PRIORITY;
fn cpu(&self) -> &AtomicCpuId;
fn is_real_time(&self) -> bool {
self.priority() < Self::REAL_TIME_TASK_PRIORITY
}
}
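For illustration only (not part of this commit), here is a minimal, self-contained sketch of the classification rule that `PreemptSchedInfo` encodes: entities whose priority is strictly below 100 land in `real_time_entities`, the rest in `normal_entities`. The `DummyTask` type and the simplified `Priority` below are hypothetical stand-ins; the real trait additionally requires `cpu()` for run-queue bookkeeping.

// Illustrative sketch, not part of the commit above.
#[derive(Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
struct Priority(u16);

trait PreemptSchedInfo {
    type PRIORITY: Ord;
    const REAL_TIME_TASK_PRIORITY: Self::PRIORITY;
    fn priority(&self) -> Self::PRIORITY;
    // Same default rule as above: a smaller value means a higher priority.
    fn is_real_time(&self) -> bool {
        self.priority() < Self::REAL_TIME_TASK_PRIORITY
    }
}

// A hypothetical runnable type standing in for `Task`.
struct DummyTask {
    priority: Priority,
}

impl PreemptSchedInfo for DummyTask {
    type PRIORITY = Priority;
    const REAL_TIME_TASK_PRIORITY: Priority = Priority(100);
    fn priority(&self) -> Priority {
        self.priority
    }
}

fn main() {
    // Priority 10 < 100: would go to `real_time_entities`.
    assert!(DummyTask { priority: Priority(10) }.is_real_time());
    // Priority 120 >= 100: would go to `normal_entities`.
    assert!(!DummyTask { priority: Priority(120) }.is_real_time());
}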

View File

@ -1,7 +1,7 @@
// SPDX-License-Identifier: MPL-2.0
use ostd::{
task::{preempt, Task, TaskOptions},
task::{Task, TaskOptions},
user::{ReturnReason, UserContextApi, UserMode, UserSpace},
};
@ -84,8 +84,6 @@ pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Weak<Thread>
debug!("exit due to signal");
break;
}
// a preemption point after handling user event.
preempt(current_task);
}
debug!("exit user loop");
}

View File

@ -7,11 +7,7 @@ use core::{
time::Duration,
};
use ostd::{
cpu::CpuSet,
sync::WaitQueue,
task::{add_task, Priority},
};
use ostd::{cpu::CpuSet, sync::WaitQueue, task::Priority};
use super::{simple_scheduler::SimpleScheduler, worker::Worker, WorkItem, WorkPriority, WorkQueue};
use crate::{
@ -81,7 +77,7 @@ impl LocalWorkerPool {
fn add_worker(&self) {
let worker = Worker::new(self.parent.clone(), self.cpu_id);
self.workers.lock_irq_disabled().push_back(worker.clone());
add_task(worker.bound_thread().task().clone());
worker.bound_thread().run();
}
fn remove_worker(&self) {

View File

@ -17,6 +17,7 @@ use trapframe::UserContext as RawUserContext;
use x86_64::registers::rflags::RFlags;
use crate::{
task::scheduler,
trap::call_irq_callback_functions,
user::{ReturnReason, UserContextApi, UserContextApiInternal},
};
@ -50,32 +51,6 @@ pub struct CpuExceptionInfo {
pub page_fault_addr: usize,
}
/// User Preemption.
pub struct UserPreemption {
count: u32,
}
impl UserPreemption {
const PREEMPTION_INTERVAL: u32 = 100;
/// Creates a new instance of `UserPreemption`.
#[allow(clippy::new_without_default)]
pub const fn new() -> Self {
UserPreemption { count: 0 }
}
/// Checks if preemption might occur and takes necessary actions.
pub fn might_preempt(&mut self) {
self.count = (self.count + 1) % Self::PREEMPTION_INTERVAL;
if self.count == 0 {
crate::arch::irq::enable_local();
crate::task::schedule();
crate::arch::irq::disable_local();
}
}
}
impl UserContext {
/// Returns a reference to the general registers.
pub fn general_regs(&self) -> &RawGeneralRegs {
@ -115,9 +90,9 @@ impl UserContextApiInternal for UserContext {
let return_reason: ReturnReason;
const SYSCALL_TRAPNUM: u16 = 0x100;
let mut user_preemption = UserPreemption::new();
// Return when it is a syscall or the CPU exception type is Fault or Trap.
loop {
scheduler::might_preempt();
self.user_context.run();
match CpuException::to_cpu_exception(self.user_context.trap_num as u16) {
Some(exception) => {
@ -146,8 +121,6 @@ impl UserContextApiInternal for UserContext {
return_reason = ReturnReason::KernelEvent;
break;
}
user_preemption.might_preempt();
}
crate::arch::irq::enable_local();

View File

@ -129,7 +129,7 @@ pub fn call_ostd_main() -> ! {
unsafe {
use alloc::boxed::Box;
use crate::task::{set_scheduler, FifoScheduler, Scheduler, TaskOptions};
use crate::task::TaskOptions;
crate::init();
// The whitelists that will be generated by OSDK runner as static consts.
@ -137,10 +137,6 @@ pub fn call_ostd_main() -> ! {
static KTEST_TEST_WHITELIST: Option<&'static [&'static str]>;
static KTEST_CRATE_WHITELIST: Option<&'static [&'static str]>;
}
// Set the global scheduler a FIFO scheduler.
let simple_scheduler = Box::new(FifoScheduler::new());
let static_scheduler: &'static dyn Scheduler = Box::leak(simple_scheduler);
set_scheduler(static_scheduler);
let test_task = move || {
run_ktests(KTEST_TEST_WHITELIST, KTEST_CRATE_WHITELIST);

View File

@ -60,19 +60,20 @@ extern "C" {
}
cpu_local_cell! {
/// The count of the preempt lock.
/// 4 bytes of preemption information, consisting of a should_preempt flag in
/// the highest bit and a preemption counter in the lower 31 bits.
///
/// We need to access the preemption count before we can copy the section
/// for application processors. So, the preemption count is not copied from
/// We need to access the preemption info before we can copy the section
/// for application processors. So, the preemption info is not copied from
/// the bootstrap processor's section during initialization. Instead, it is
/// initialized to zero for application processors.
pub(crate) static PREEMPT_LOCK_COUNT: u32 = 0;
pub(crate) static PREEMPT_INFO: u32 = 0;
}
/// Sets the base address of the CPU-local storage for the bootstrap processor.
///
/// It should be called early to let [`crate::task::disable_preempt`] work,
/// which needs to update a CPU-local preempt lock count. Otherwise it may
/// which needs to update the CPU-local preemption info. Otherwise, it may
/// panic when calling [`crate::task::disable_preempt`].
///
/// # Safety
@ -133,16 +134,16 @@ pub unsafe fn init_on_bsp() {
(ap_pages_ptr as *mut u32).write(cpu_i);
}
// SAFETY: the `PREEMPT_LOCK_COUNT` may be dirty on the BSP, so we need
// SAFETY: the `PREEMPT_INFO` may be dirty on the BSP, so we need
// to ensure that it is initialized to zero for APs. The safety
// requirements are met since the static is defined in the `.cpu_local`
// section and the pointer to that static is the offset in the CPU-
// local area. It is a `usize` so it is safe to be overwritten.
unsafe {
let preempt_count_ptr = &PREEMPT_LOCK_COUNT as *const _ as usize;
let preempt_count_offset = preempt_count_ptr - __cpu_local_start as usize;
let ap_preempt_count_ptr = ap_pages_ptr.add(preempt_count_offset) as *mut usize;
ap_preempt_count_ptr.write(0);
let preempt_info_ptr = &PREEMPT_INFO as *const _ as usize;
let preempt_info_offset = preempt_info_ptr - __cpu_local_start as usize;
let ap_preempt_info_ptr = ap_pages_ptr.add(preempt_info_offset) as *mut usize;
ap_preempt_info_ptr.write(0);
}
// SAFETY: bytes `8:16` are reserved for storing the pointer to the

View File

@ -4,7 +4,7 @@ use alloc::{collections::VecDeque, sync::Arc};
use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use super::SpinLock;
use crate::task::{add_task, schedule, Task, TaskStatus};
use crate::task::{scheduler, Task};
// # Explanation on the memory orders
//
@ -255,36 +255,15 @@ impl Waker {
if self.has_woken.swap(true, Ordering::Release) {
return false;
}
let mut task = self.task.inner_exclusive_access();
match task.task_status {
TaskStatus::Sleepy => {
task.task_status = TaskStatus::Runnable;
}
TaskStatus::Sleeping => {
task.task_status = TaskStatus::Runnable;
// Avoid holding the lock when doing `add_task`
drop(task);
add_task(self.task.clone());
}
_ => (),
}
scheduler::unpark_target(self.task.clone());
true
}
fn do_wait(&self) {
while !self.has_woken.swap(false, Ordering::Acquire) {
let mut task = self.task.inner_exclusive_access();
// After holding the lock, check again to avoid races
if self.has_woken.swap(false, Ordering::Acquire) {
break;
}
task.task_status = TaskStatus::Sleepy;
drop(task);
schedule();
let has_woken = &self.has_woken;
while !has_woken.swap(false, Ordering::Acquire) {
scheduler::park_current(has_woken);
}
}

View File

@ -2,15 +2,12 @@
//! Tasks are the unit of code execution.
mod priority;
mod processor;
mod scheduler;
pub mod scheduler;
#[allow(clippy::module_inception)]
mod task;
pub use self::{
priority::Priority,
processor::{disable_preempt, preempt, schedule, DisablePreemptGuard},
scheduler::{add_task, set_scheduler, FifoScheduler, Scheduler},
task::{Task, TaskAdapter, TaskContextApi, TaskOptions, TaskStatus},
processor::{disable_preempt, DisablePreemptGuard},
task::{AtomicCpuId, Priority, Task, TaskAdapter, TaskContextApi, TaskOptions},
};

View File

@ -2,12 +2,8 @@
use alloc::sync::Arc;
use super::{
scheduler::{fetch_task, GLOBAL_SCHEDULER},
task::{context_switch, TaskContext},
Task, TaskStatus,
};
use crate::{cpu::local::PREEMPT_LOCK_COUNT, cpu_local_cell};
use super::task::{context_switch, Task, TaskContext};
use crate::{cpu::local::PREEMPT_INFO, cpu_local_cell};
cpu_local_cell! {
/// The `Arc<Task>` (cast by [`Arc::into_raw`]) that is the current task.
@ -37,52 +33,19 @@ pub(super) fn current_task() -> Option<Arc<Task>> {
Some(restored)
}
/// Call this function to switch to another task by using `GLOBAL_SCHEDULER`.
pub fn schedule() {
if let Some(task) = fetch_task() {
switch_to_task(task);
}
}
/// Preempts the `task`.
///
/// TODO: The interface of this method is error-prone.
/// The method takes an argument for the current task to optimize its efficiency,
/// but the argument provided by the caller may not really be the current task.
/// Thus, this method should be removed or reworked in the future.
pub fn preempt(task: &Arc<Task>) {
// TODO: Refactor `preempt` and `schedule`
// after the Atomic mode and `might_break` is enabled.
let mut scheduler = GLOBAL_SCHEDULER.lock_irq_disabled();
if !scheduler.should_preempt(task) {
return;
}
let Some(next_task) = scheduler.dequeue() else {
return;
};
drop(scheduler);
switch_to_task(next_task);
}
/// Call this function to switch to another task.
///
/// If the current task is none, the default task context will be used and this
/// function will not return again.
///
/// If the current task's status is not [`TaskStatus::Runnable`], it will not be
/// added to the scheduler.
///
/// # Panics
///
/// This function will panic if called while holding preemption locks or with
/// local IRQ disabled.
fn switch_to_task(next_task: Arc<Task>) {
let preemt_lock_count = PREEMPT_LOCK_COUNT.load();
if preemt_lock_count != 0 {
panic!(
"Calling schedule() while holding {} locks",
preemt_lock_count
);
pub(super) fn switch_to_task(next_task: Arc<Task>) {
let preemt_count = PREEMPT_COUNT.get();
if preemt_count != 0 {
panic!("Switching task while holding {} locks", preemt_count);
}
assert!(
@ -93,7 +56,6 @@ fn switch_to_task(next_task: Arc<Task>) {
let irq_guard = crate::trap::disable_local();
let current_task_ptr = CURRENT_TASK_PTR.load();
let current_task_ctx_ptr = if current_task_ptr.is_null() {
// SAFETY: Interrupts are disabled, so the pointer is safe to be fetched.
unsafe { BOOTSTRAP_CONTEXT.as_ptr_mut() }
@ -104,24 +66,12 @@ fn switch_to_task(next_task: Arc<Task>) {
let _ = core::mem::ManuallyDrop::new(restored.clone());
restored
};
let ctx_ptr = cur_task_arc.ctx().get();
let mut task_inner = cur_task_arc.inner_exclusive_access();
debug_assert_ne!(task_inner.task_status, TaskStatus::Sleeping);
if task_inner.task_status == TaskStatus::Runnable {
drop(task_inner);
GLOBAL_SCHEDULER.lock().enqueue(cur_task_arc);
} else if task_inner.task_status == TaskStatus::Sleepy {
task_inner.task_status = TaskStatus::Sleeping;
}
ctx_ptr
};
let next_task_ctx_ptr = next_task.ctx().get().cast_const();
if let Some(next_user_space) = next_task.user_space() {
next_user_space.vm_space().activate();
}
@ -144,7 +94,7 @@ fn switch_to_task(next_task: Arc<Task>) {
drop(irq_guard);
// SAFETY:
// 1. `ctx` is only used in `schedule()`. We have exclusive access to both the current task
// 1. `ctx` is only used in `reschedule()`. We have exclusive access to both the current task
// context and the next task context.
// 2. The next task context is a valid task context.
unsafe {
@ -159,6 +109,34 @@ fn switch_to_task(next_task: Arc<Task>) {
// to the next task switching.
}
static PREEMPT_COUNT: PreemptCount = PreemptCount::new();
struct PreemptCount {}
impl PreemptCount {
const SHIFT: u8 = 0;
const BITS: u8 = 31;
const MASK: u32 = ((1 << Self::BITS) - 1) << Self::SHIFT;
const fn new() -> Self {
Self {}
}
fn inc(&self) {
PREEMPT_INFO.add_assign(1 << Self::SHIFT);
}
fn dec(&self) {
PREEMPT_INFO.sub_assign(1 << Self::SHIFT);
}
fn get(&self) -> u32 {
PREEMPT_INFO.load() & Self::MASK
}
}
/// A guard for disabling preemption.
#[clippy::has_significant_drop]
#[must_use]
@ -171,7 +149,7 @@ impl !Send for DisablePreemptGuard {}
impl DisablePreemptGuard {
fn new() -> Self {
PREEMPT_LOCK_COUNT.add_assign(1);
PREEMPT_COUNT.inc();
Self { _private: () }
}
@ -184,7 +162,7 @@ impl DisablePreemptGuard {
impl Drop for DisablePreemptGuard {
fn drop(&mut self) {
PREEMPT_LOCK_COUNT.sub_assign(1);
PREEMPT_COUNT.dec();
}
}

View File

@ -1,107 +0,0 @@
// SPDX-License-Identifier: MPL-2.0
#![allow(dead_code)]
use alloc::collections::VecDeque;
use crate::{prelude::*, sync::SpinLock, task::Task};
static DEFAULT_SCHEDULER: FifoScheduler = FifoScheduler::new();
pub(crate) static GLOBAL_SCHEDULER: SpinLock<GlobalScheduler> = SpinLock::new(GlobalScheduler {
scheduler: &DEFAULT_SCHEDULER,
});
/// A scheduler for tasks.
///
/// A scheduler implementation can attach scheduler-related information
/// via the `TypeMap` returned from `task.data()`.
pub trait Scheduler: Sync + Send {
/// Enqueues a task to the scheduler.
fn enqueue(&self, task: Arc<Task>);
/// Dequeues a task from the scheduler.
fn dequeue(&self) -> Option<Arc<Task>>;
/// Tells whether the given task should be preempted by other tasks in the queue.
fn should_preempt(&self, task: &Arc<Task>) -> bool;
}
pub struct GlobalScheduler {
scheduler: &'static dyn Scheduler,
}
impl GlobalScheduler {
pub const fn new(scheduler: &'static dyn Scheduler) -> Self {
Self { scheduler }
}
/// Dequeues a task using the scheduler.
/// Requires that the scheduler is not none.
pub fn dequeue(&mut self) -> Option<Arc<Task>> {
self.scheduler.dequeue()
}
/// Enqueues a task using the scheduler.
/// Requires that the scheduler is not none.
pub fn enqueue(&mut self, task: Arc<Task>) {
self.scheduler.enqueue(task)
}
pub fn should_preempt(&self, task: &Arc<Task>) -> bool {
self.scheduler.should_preempt(task)
}
}
/// Sets the global task scheduler.
///
/// This must be called before invoking `Task::spawn`.
pub fn set_scheduler(scheduler: &'static dyn Scheduler) {
let mut global_scheduler = GLOBAL_SCHEDULER.lock_irq_disabled();
// When setting a new scheduler, the old scheduler should be empty
assert!(global_scheduler.dequeue().is_none());
global_scheduler.scheduler = scheduler;
}
pub fn fetch_task() -> Option<Arc<Task>> {
GLOBAL_SCHEDULER.lock_irq_disabled().dequeue()
}
/// Adds a task to the global scheduler.
pub fn add_task(task: Arc<Task>) {
GLOBAL_SCHEDULER.lock_irq_disabled().enqueue(task);
}
/// A simple FIFO (First-In-First-Out) task scheduler.
pub struct FifoScheduler {
/// A thread-safe queue to hold tasks waiting to be executed.
task_queue: SpinLock<VecDeque<Arc<Task>>>,
}
impl FifoScheduler {
/// Creates a new instance of `FifoScheduler`.
pub const fn new() -> Self {
FifoScheduler {
task_queue: SpinLock::new(VecDeque::new()),
}
}
}
impl Default for FifoScheduler {
fn default() -> Self {
Self::new()
}
}
impl Scheduler for FifoScheduler {
/// Enqueues a task to the end of the queue.
fn enqueue(&self, task: Arc<Task>) {
self.task_queue.lock_irq_disabled().push_back(task);
}
/// Dequeues a task from the front of the queue, if any.
fn dequeue(&self) -> Option<Arc<Task>> {
self.task_queue.lock_irq_disabled().pop_front()
}
/// In this simple implementation, task preemption is not supported.
/// Once a task starts running, it will continue to run until completion.
fn should_preempt(&self, _task: &Arc<Task>) -> bool {
false
}
}

View File

@ -0,0 +1,125 @@
// SPDX-License-Identifier: MPL-2.0
use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
use super::{inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler, UpdateFlags};
use crate::{
cpu::{num_cpus, this_cpu},
sync::SpinLock,
task::{AtomicCpuId, Task},
};
pub fn init() {
let fifo_scheduler = Box::new(FifoScheduler::default());
let scheduler = Box::<FifoScheduler<Task>>::leak(fifo_scheduler);
inject_scheduler(scheduler);
}
/// A simple FIFO (First-In-First-Out) task scheduler.
struct FifoScheduler<T: FifoSchedInfo> {
/// Per-CPU run queues holding tasks waiting to be executed.
rq: Vec<SpinLock<FifoRunQueue<T>>>,
}
impl<T: FifoSchedInfo> FifoScheduler<T> {
/// Creates a new instance of `FifoScheduler`.
fn new(nr_cpus: u32) -> Self {
let mut rq = Vec::new();
for _ in 0..nr_cpus {
rq.push(SpinLock::new(FifoRunQueue::new()));
}
Self { rq }
}
fn select_cpu(&self) -> u32 {
// FIXME: adopt a more reasonable policy once we fully enable SMP.
0
}
}
impl<T: FifoSchedInfo + Send + Sync> Scheduler<T> for FifoScheduler<T> {
fn enqueue(&self, runnable: Arc<T>, flags: EnqueueFlags) -> Option<u32> {
let mut still_in_rq = false;
let target_cpu = {
let mut cpu_id = self.select_cpu();
if let Err(task_cpu_id) = runnable.cpu().set_if_is_none(cpu_id) {
debug_assert!(flags != EnqueueFlags::Spawn);
still_in_rq = true;
cpu_id = task_cpu_id;
}
cpu_id
};
let mut rq = self.rq[target_cpu as usize].lock_irq_disabled();
if still_in_rq && let Err(_) = runnable.cpu().set_if_is_none(target_cpu) {
return None;
}
rq.queue.push_back(runnable);
Some(target_cpu)
}
fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue<T>)) {
let local_rq: &FifoRunQueue<T> = &self.rq[this_cpu() as usize].lock_irq_disabled();
f(local_rq);
}
fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue<T>)) {
let local_rq: &mut FifoRunQueue<T> = &mut self.rq[this_cpu() as usize].lock_irq_disabled();
f(local_rq);
}
}
struct FifoRunQueue<T: FifoSchedInfo> {
current: Option<Arc<T>>,
queue: VecDeque<Arc<T>>,
}
impl<T: FifoSchedInfo> FifoRunQueue<T> {
pub const fn new() -> Self {
Self {
current: None,
queue: VecDeque::new(),
}
}
}
impl<T: FifoSchedInfo> LocalRunQueue<T> for FifoRunQueue<T> {
fn current(&self) -> Option<&Arc<T>> {
self.current.as_ref()
}
fn update_current(&mut self, flags: super::UpdateFlags) -> bool {
!matches!(flags, UpdateFlags::Tick)
}
fn pick_next_current(&mut self) -> Option<&Arc<T>> {
let next_task = self.queue.pop_front()?;
if let Some(prev_task) = self.current.replace(next_task) {
self.queue.push_back(prev_task);
}
self.current.as_ref()
}
fn dequeue_current(&mut self) -> Option<Arc<T>> {
self.current.take().inspect(|task| task.cpu().set_to_none())
}
}
impl Default for FifoScheduler<Task> {
fn default() -> Self {
Self::new(num_cpus())
}
}
impl FifoSchedInfo for Task {
fn cpu(&self) -> &AtomicCpuId {
self.cpu()
}
}
trait FifoSchedInfo {
fn cpu(&self) -> &AtomicCpuId;
}

View File

@ -0,0 +1,271 @@
// SPDX-License-Identifier: MPL-2.0
//! Scheduling subsystem (in-OSTD part).
//!
//! This module defines what OSTD expects from a scheduling implementation
//! and provides useful functions for controlling the execution flow.
mod fifo_scheduler;
use core::sync::atomic::{AtomicBool, Ordering};
use spin::Once;
use super::{processor, task::Task};
use crate::{
arch::timer,
cpu::{local::PREEMPT_INFO, this_cpu},
prelude::*,
};
/// Injects a scheduler implementation into the framework.
///
/// This function can only be called once and must be called during kernel initialization.
pub fn inject_scheduler(scheduler: &'static dyn Scheduler<Task>) {
SCHEDULER.call_once(|| scheduler);
timer::register_callback(|| {
SCHEDULER.get().unwrap().local_mut_rq_with(&mut |local_rq| {
if local_rq.update_current(UpdateFlags::Tick) {
SHOULD_PREEMPT.set();
}
})
});
}
static SCHEDULER: Once<&'static dyn Scheduler<Task>> = Once::new();
/// A per-CPU task scheduler.
pub trait Scheduler<T = Task>: Sync + Send {
/// Enqueues a runnable task.
///
/// Scheduler developers can perform load-balancing or some accounting work here.
///
/// If the `current` of a CPU should be preempted, this method returns the id of
/// that CPU.
fn enqueue(&self, runnable: Arc<T>, flags: EnqueueFlags) -> Option<u32>;
/// Gets an immutable access to the local runqueue of the current CPU core.
fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue<T>));
/// Gets a mutable access to the local runqueue of the current CPU core.
fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue<T>));
}
/// The _local_ view of a per-CPU runqueue.
///
/// This local view provides the interface for the runqueue of a CPU core
/// to be inspected and manipulated by the code running on this particular CPU core.
///
/// Conceptually, a local runqueue consists of two parts:
/// (1) a priority queue of runnable tasks;
/// (2) the current running task.
/// The exact definition of "priority" is left for the concrete implementation to decide.
pub trait LocalRunQueue<T = Task> {
/// Gets the current runnable task.
fn current(&self) -> Option<&Arc<T>>;
/// Updates the current runnable task's scheduling statistics and potentially its
/// position in the queue.
///
/// If the current runnable task should be preempted, the method returns `true`.
fn update_current(&mut self, flags: UpdateFlags) -> bool;
/// Picks the next current runnable task.
///
/// This method returns the chosen next current runnable task. If there is no
/// candidate for the next current runnable task, this method returns `None`.
fn pick_next_current(&mut self) -> Option<&Arc<T>>;
/// Removes the current runnable task from runqueue.
///
/// This method returns the current runnable task. If there is no current runnable
/// task, this method returns `None`.
fn dequeue_current(&mut self) -> Option<Arc<T>>;
}
/// Possible triggers of an `enqueue` action.
#[derive(PartialEq, Copy, Clone)]
pub enum EnqueueFlags {
/// Spawn a new task.
Spawn,
/// Wake a sleeping task.
Wake,
}
/// Possible triggers of an `update_current` action.
#[derive(PartialEq, Copy, Clone)]
pub enum UpdateFlags {
/// Timer interrupt.
Tick,
/// Task waiting.
Wait,
/// Task yielding.
Yield,
}
/// Preempts the current task, if the preemption conditions are met.
pub(crate) fn might_preempt() {
fn preempt_check() -> bool {
PREEMPT_INFO.load() == 0
}
if !preempt_check() {
return;
}
yield_now();
}
/// Blocks the current task unless `has_woken` is `true`.
pub(crate) fn park_current(has_woken: &AtomicBool) {
let mut current = None;
let mut is_first_try = true;
reschedule(&mut |local_rq: &mut dyn LocalRunQueue| {
if is_first_try {
if has_woken.load(Ordering::Acquire) {
return ReschedAction::DoNothing;
}
current = local_rq.dequeue_current();
local_rq.update_current(UpdateFlags::Wait);
}
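// If the parked task has already been woken and re-enqueued on this CPU,
// `pick_next_current` may select it again; in that case just keep running it.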
if let Some(next_task) = local_rq.pick_next_current() {
if Arc::ptr_eq(current.as_ref().unwrap(), next_task) {
return ReschedAction::DoNothing;
}
ReschedAction::SwitchTo(next_task.clone())
} else {
is_first_try = false;
ReschedAction::Retry
}
});
}
/// Unblocks a target task.
pub(crate) fn unpark_target(runnable: Arc<Task>) {
let should_preempt_info = SCHEDULER
.get()
.unwrap()
.enqueue(runnable, EnqueueFlags::Wake);
if should_preempt_info.is_some() {
let cpu_id = should_preempt_info.unwrap();
// FIXME: send IPI to set remote CPU's `SHOULD_PREEMPT` if needed.
if cpu_id == this_cpu() {
SHOULD_PREEMPT.set();
}
}
}
/// Enqueues a newly built task.
///
/// Note that the new task is not guaranteed to run at once.
pub(super) fn run_new_task(runnable: Arc<Task>) {
// FIXME: remove this check for `SCHEDULER`.
// Currently OSTD cannot know whether its user has injected a scheduler.
if !SCHEDULER.is_completed() {
fifo_scheduler::init();
}
let should_preempt_info = SCHEDULER
.get()
.unwrap()
.enqueue(runnable, EnqueueFlags::Spawn);
if should_preempt_info.is_some() {
let cpu_id = should_preempt_info.unwrap();
// FIXME: send IPI to set remote CPU's `SHOULD_PREEMPT` if needed.
if cpu_id == this_cpu() {
SHOULD_PREEMPT.set();
}
}
might_preempt();
}
/// Dequeues the current task from its runqueue.
///
/// This should only be called if the current task is about to exit.
pub(super) fn exit_current() {
reschedule(&mut |local_rq: &mut dyn LocalRunQueue| {
let _ = local_rq.dequeue_current();
if let Some(next_task) = local_rq.pick_next_current() {
ReschedAction::SwitchTo(next_task.clone())
} else {
ReschedAction::Retry
}
})
}
/// Yields execution.
pub(super) fn yield_now() {
reschedule(&mut |local_rq| {
local_rq.update_current(UpdateFlags::Yield);
if let Some(next_task) = local_rq.pick_next_current() {
ReschedAction::SwitchTo(next_task.clone())
} else {
ReschedAction::DoNothing
}
})
}
/// Performs rescheduling by acting on the scheduling decision (`ReschedAction`) made by a
/// user-given closure.
///
/// The closure makes the scheduling decision by taking the local runqueue as its input.
fn reschedule<F>(f: &mut F)
where
F: FnMut(&mut dyn LocalRunQueue) -> ReschedAction,
{
let next_task = loop {
let mut action = ReschedAction::DoNothing;
SCHEDULER.get().unwrap().local_mut_rq_with(&mut |rq| {
action = f(rq);
});
match action {
ReschedAction::DoNothing => {
return;
}
ReschedAction::Retry => {
continue;
}
ReschedAction::SwitchTo(next_task) => {
break next_task;
}
};
};
SHOULD_PREEMPT.clear();
processor::switch_to_task(next_task);
}
/// Possible actions of a rescheduling.
enum ReschedAction {
/// Keep running current task and do nothing.
DoNothing,
/// Loop until a task is found to swap out the current one.
Retry,
/// Switch to target task.
SwitchTo(Arc<Task>),
}
static SHOULD_PREEMPT: ShouldPreemptFlag = ShouldPreemptFlag::new();
struct ShouldPreemptFlag {}
impl ShouldPreemptFlag {
const SHIFT: u8 = 31;
const MASK: u32 = 1 << Self::SHIFT;
const fn new() -> Self {
Self {}
}
fn set(&self) {
PREEMPT_INFO.bitand_assign(!Self::MASK);
}
fn clear(&self) {
PREEMPT_INFO.bitor_assign(Self::MASK);
}
}
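For illustration only (not part of this commit): a standalone sketch of why the single `PREEMPT_INFO.load() == 0` check in `might_preempt` covers both preemption conditions at once. It assumes the layout documented for `PREEMPT_INFO` (a 31-bit counter plus the should_preempt flag in bit 31) and the inverted flag encoding used by `ShouldPreemptFlag`, where `set` clears bit 31 and `clear` sets it.

// Illustrative sketch, not part of the commit above.
const FLAG_MASK: u32 = 1 << 31; // should_preempt flag (0 means "should preempt")
const COUNT_MASK: u32 = !FLAG_MASK; // preemption counter in bits 0..=30

// `might_preempt` only yields when the whole word is zero, i.e. when no
// `DisablePreemptGuard` is held *and* the should_preempt bit has been cleared
// (which, with the inverted encoding, is what `SHOULD_PREEMPT.set()` does).
fn can_preempt(preempt_info: u32) -> bool {
    debug_assert_eq!(
        preempt_info == 0,
        (preempt_info & COUNT_MASK) == 0 && (preempt_info & FLAG_MASK) == 0
    );
    preempt_info == 0
}

fn main() {
    assert!(can_preempt(0)); // no guards held, flag bit at 0
    assert!(!can_preempt(1)); // one `DisablePreemptGuard` is held
    assert!(!can_preempt(FLAG_MASK)); // `SHOULD_PREEMPT.clear()` set bit 31
    assert!(!can_preempt(FLAG_MASK | 3)); // both conditions block preemption
}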

View File

@ -4,22 +4,23 @@
// So we temporarily allow missing_docs for this module.
#![allow(missing_docs)]
use alloc::{boxed::Box, sync::Arc};
use core::{any::Any, cell::UnsafeCell};
mod priority;
use core::{
any::Any,
cell::UnsafeCell,
sync::atomic::{AtomicU32, Ordering},
};
use intrusive_collections::{intrusive_adapter, LinkedListAtomicLink};
pub use priority::Priority;
use super::{
add_task,
priority::Priority,
processor::{current_task, schedule},
};
use super::{processor::current_task, scheduler};
pub(crate) use crate::arch::task::{context_switch, TaskContext};
use crate::{
cpu::CpuSet,
mm::{kspace::KERNEL_PAGE_TABLE, FrameAllocOptions, Paddr, PageFlags, Segment, PAGE_SIZE},
prelude::*,
sync::{SpinLock, SpinLockGuard},
user::UserSpace,
};
@ -103,6 +104,41 @@ impl Drop for KernelStack {
}
}
/// An atomic CPUID container.
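///
/// The scheduler uses this to record which CPU's run queue a task is currently
/// in; `AtomicCpuId::NONE` means the task is not in any run queue.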
pub struct AtomicCpuId(AtomicU32);
impl AtomicCpuId {
/// The null value of CPUID.
///
/// An `AtomicCpuId` with `AtomicCpuId::NONE` as its inner value is empty.
const NONE: u32 = u32::MAX;
fn new(cpu_id: u32) -> Self {
Self(AtomicU32::new(cpu_id))
}
/// Sets the inner value of an `AtomicCpuId` if it's empty.
///
/// The return value is a result indicating whether the new value was written
/// and containing the previous value.
pub fn set_if_is_none(&self, cpu_id: u32) -> core::result::Result<u32, u32> {
self.0
.compare_exchange(Self::NONE, cpu_id, Ordering::Relaxed, Ordering::Relaxed)
}
/// Sets the inner value of an `AtomicCpuId` to `AtomicCpuId::NONE`, i.e. makes
/// an `AtomicCpuId` empty.
pub fn set_to_none(&self) {
self.0.store(Self::NONE, Ordering::Relaxed);
}
}
impl Default for AtomicCpuId {
fn default() -> Self {
Self::new(Self::NONE)
}
}
/// A task that executes a function to the end.
///
/// Each task is associated with per-task data and an optional user space.
@ -112,11 +148,11 @@ pub struct Task {
func: Box<dyn Fn() + Send + Sync>,
data: Box<dyn Any + Send + Sync>,
user_space: Option<Arc<UserSpace>>,
task_inner: SpinLock<TaskInner>,
ctx: UnsafeCell<TaskContext>,
/// Kernel stack. Note that the top is a SyscallFrame/TrapFrame.
kstack: KernelStack,
link: LinkedListAtomicLink,
cpu: AtomicCpuId,
priority: Priority,
// TODO: add multiprocessor support
#[allow(dead_code)]
@ -130,11 +166,6 @@ intrusive_adapter!(pub TaskAdapter = Arc<Task>: Task { link: LinkedListAtomicLin
// we have exclusive access to the field.
unsafe impl Sync for Task {}
#[derive(Debug)]
pub(crate) struct TaskInner {
pub task_status: TaskStatus,
}
impl Task {
/// Gets the current task.
///
@ -143,11 +174,6 @@ impl Task {
current_task()
}
/// Gets the inner state.
pub(crate) fn inner_exclusive_access(&self) -> SpinLockGuard<TaskInner> {
self.task_inner.lock_irq_disabled()
}
pub(super) fn ctx(&self) -> &UnsafeCell<TaskContext> {
&self.ctx
}
@ -157,18 +183,14 @@ impl Task {
/// Note that this method cannot be simply named "yield" as the name is
/// a Rust keyword.
pub fn yield_now() {
schedule();
scheduler::yield_now()
}
/// Runs the task.
///
/// BUG: This method highly depends on the current scheduling policy.
pub fn run(self: &Arc<Self>) {
add_task(self.clone());
schedule();
}
/// Returns the task status.
pub fn status(&self) -> TaskStatus {
self.task_inner.lock_irq_disabled().task_status
scheduler::run_new_task(self.clone());
}
/// Returns the task data.
@ -185,6 +207,16 @@ impl Task {
}
}
/// Returns the CPU of this task.
pub fn cpu(&self) -> &AtomicCpuId {
&self.cpu
}
/// Returns the priority.
pub fn priority(&self) -> Priority {
self.priority
}
/// Exits the current task.
///
/// The task `self` must be the task that is currently running.
@ -192,13 +224,10 @@ impl Task {
/// **NOTE:** If there is anything left on the stack, it will be forgotten. This behavior may
/// lead to resource leakage.
fn exit(self: Arc<Self>) -> ! {
self.inner_exclusive_access().task_status = TaskStatus::Exited;
// `current_task()` still holds a strong reference, so nothing is destroyed at this point,
// neither is the kernel stack.
drop(self);
schedule();
scheduler::exit_current();
unreachable!()
}
@ -208,19 +237,6 @@ impl Task {
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
/// The status of a task.
pub enum TaskStatus {
/// The task is runnable.
Runnable,
/// The task is running in the foreground but will sleep when it goes to the background.
Sleepy,
/// The task is sleeping in the background.
Sleeping,
/// The task has exited.
Exited,
}
/// Options to create or spawn a new task.
pub struct TaskOptions {
func: Option<Box<dyn Fn() + Send + Sync>>,
@ -236,13 +252,12 @@ impl TaskOptions {
where
F: Fn() + Send + Sync + 'static,
{
let cpu_affinity = CpuSet::new_full();
Self {
func: Some(Box::new(func)),
data: None,
user_space: None,
priority: Priority::normal(),
cpu_affinity,
cpu_affinity: CpuSet::new_full(),
}
}
@ -300,11 +315,9 @@ impl TaskOptions {
func: self.func.unwrap(),
data: self.data.unwrap(),
user_space: self.user_space,
task_inner: SpinLock::new(TaskInner {
task_status: TaskStatus::Runnable,
}),
ctx: UnsafeCell::new(TaskContext::default()),
kstack: KernelStack::new_with_guard_page()?,
cpu: AtomicCpuId::default(),
link: LinkedListAtomicLink::new(),
priority: self.priority,
cpu_affinity: self.cpu_affinity,

View File

@ -7,7 +7,7 @@ pub const REAL_TIME_TASK_PRIORITY: u16 = 100;
/// Similar to Linux, a larger value represents a lower priority,
/// with a range of 0 to 139. Priorities ranging from 0 to 99 are considered real-time,
/// while those ranging from 100 to 139 are considered normal.
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct Priority(u16);
impl Priority {