diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 571a32d0..3ea32c83 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -9,6 +9,7 @@ #![allow(incomplete_features)] #![feature(btree_cursors)] #![feature(btree_extract_if)] +#![feature(debug_closure_helpers)] #![feature(extend_one)] #![feature(fn_traits)] #![feature(format_args_nl)] @@ -37,7 +38,6 @@ use ostd::{ cpu::PinCurrentCpu, }; use process::Process; -use sched::priority::PriorityRange; use crate::{ prelude::*, @@ -86,7 +86,7 @@ pub fn main() { // Spawn the first kernel thread on BSP. ThreadOptions::new(init_thread) - .priority(Priority::new(PriorityRange::new(PriorityRange::MAX))) + .priority(Priority::idle()) .spawn(); } @@ -120,7 +120,7 @@ fn ap_init() { ThreadOptions::new(ap_idle_thread) .cpu_affinity(cpu_id.into()) - .priority(Priority::new(PriorityRange::new(PriorityRange::MAX))) + .priority(Priority::idle()) .spawn(); } diff --git a/kernel/src/net/iface/poll.rs b/kernel/src/net/iface/poll.rs index c2de2cb1..0feab784 100644 --- a/kernel/src/net/iface/poll.rs +++ b/kernel/src/net/iface/poll.rs @@ -7,11 +7,7 @@ use log::trace; use ostd::timer::Jiffies; use super::{ext::IfaceEx, Iface, IFACES}; -use crate::{ - sched::priority::{Priority, PriorityRange}, - thread::kernel_thread::ThreadOptions, - WaitTimeout, -}; +use crate::{sched::priority::Priority, thread::kernel_thread::ThreadOptions, WaitTimeout}; pub fn lazy_init() { for iface in IFACES.get().unwrap() { @@ -68,6 +64,6 @@ fn spawn_background_poll_thread(iface: Arc) { // FIXME: remove the use of real-time priority. ThreadOptions::new(task_fn) - .priority(Priority::new(PriorityRange::new(0))) + .priority(Priority::default_real_time()) .spawn(); } diff --git a/kernel/src/process/posix_thread/builder.rs b/kernel/src/process/posix_thread/builder.rs index 408767e1..adb7502b 100644 --- a/kernel/src/process/posix_thread/builder.rs +++ b/kernel/src/process/posix_thread/builder.rs @@ -13,7 +13,7 @@ use crate::{ Credentials, Process, }, sched::priority::Priority, - thread::{status::ThreadStatus, task, Thread, Tid}, + thread::{task, Thread, Tid}, time::{clocks::ProfClock, TimerManager}, }; @@ -31,6 +31,7 @@ pub struct PosixThreadBuilder { clear_child_tid: Vaddr, sig_mask: AtomicSigMask, sig_queues: SigQueues, + priority: Priority, } impl PosixThreadBuilder { @@ -45,6 +46,7 @@ impl PosixThreadBuilder { clear_child_tid: 0, sig_mask: AtomicSigMask::new_empty(), sig_queues: SigQueues::new(), + priority: Priority::default(), } } @@ -73,6 +75,11 @@ impl PosixThreadBuilder { self } + pub fn priority(mut self, priority: Priority) -> Self { + self.priority = priority; + self + } + pub fn build(self) -> Arc { let Self { tid, @@ -84,6 +91,7 @@ impl PosixThreadBuilder { clear_child_tid, sig_mask, sig_queues, + priority, } = self; Arc::new_cyclic(|weak_task| { @@ -111,13 +119,10 @@ impl PosixThreadBuilder { } }; - let status = ThreadStatus::Init; - let priority = Priority::default(); let cpu_affinity = CpuSet::new_full(); let thread = Arc::new(Thread::new( weak_task.clone(), posix_thread, - status, priority, cpu_affinity, )); diff --git a/kernel/src/sched/mod.rs b/kernel/src/sched/mod.rs index 8621493f..0dc56db1 100644 --- a/kernel/src/sched/mod.rs +++ b/kernel/src/sched/mod.rs @@ -1,7 +1,9 @@ // SPDX-License-Identifier: MPL-2.0 pub mod priority; +// TODO: Remove this out-dated module once the `sched_class` module is stable. mod priority_scheduler; +mod sched_class; mod stats; // Export the stats getter functions. 
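The boot-path and iface-poll changes above stop hard-coding raw `PriorityRange` values and instead use the named constructors that this patch adds to `priority.rs`. A small illustrative sketch of how those named values relate to the raw numbers and to the new scheduling classes; this is not part of the patch, and it assumes `Priority`, `PriorityRange`, and `SchedPolicy` are in scope (the patch itself keeps `SchedPolicy` private to the `sched_class` module):

```rust
fn priority_helpers_demo() {
    // `Priority::idle()` is shorthand for the largest (least urgent) priority value,
    // previously spelled `Priority::new(PriorityRange::new(PriorityRange::MAX))`.
    assert_eq!(Priority::idle().range().get(), PriorityRange::MAX);

    // `Priority::default_real_time()` replaces the old hard-coded real-time values
    // and maps to priority value 50.
    assert_eq!(Priority::default_real_time().range().get(), 50);

    // Later in the patch, `SchedPolicy::from(Priority)` sorts priority values into
    // classes: 0 => Stop, 1..=99 => RealTime, 100..=139 => Fair, the rest => Idle.
    assert!(matches!(
        SchedPolicy::from(Priority::default_real_time()),
        SchedPolicy::RealTime { .. }
    ));
}
```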
@@ -9,4 +11,4 @@ pub use stats::{loadavg, nr_queued_and_running}; // There may be multiple scheduling policies in the system, // and subsequent schedulers can be placed under this module. -pub use self::priority_scheduler::init; +pub use self::sched_class::{init, SchedAttr}; diff --git a/kernel/src/sched/priority.rs b/kernel/src/sched/priority.rs index 7cf76717..35af35fc 100644 --- a/kernel/src/sched/priority.rs +++ b/kernel/src/sched/priority.rs @@ -23,7 +23,7 @@ impl Nice { Self(range) } - pub fn range(&self) -> &NiceRange { + pub const fn range(&self) -> &NiceRange { &self.0 } @@ -74,7 +74,15 @@ impl Priority { Self(range) } - pub fn range(&self) -> &PriorityRange { + pub const fn default_real_time() -> Self { + Self::new(PriorityRange::new(50)) + } + + pub const fn idle() -> Self { + Self::new(PriorityRange::new(PriorityRange::MAX)) + } + + pub const fn range(&self) -> &PriorityRange { &self.0 } @@ -89,6 +97,12 @@ impl From for Priority { } } +impl From for Nice { + fn from(priority: Priority) -> Self { + Self::new(NiceRange::new((priority.range().get() - 100) as i8 - 20)) + } +} + impl Default for Priority { fn default() -> Self { Nice::default().into() @@ -129,7 +143,7 @@ macro_rules! define_ranged_integer { self.0 = val; } - $visibility fn get(self) -> $type { + $visibility const fn get(self) -> $type { self.0 } } diff --git a/kernel/src/sched/priority_scheduler.rs b/kernel/src/sched/priority_scheduler.rs index 2d6be77f..803e2d25 100644 --- a/kernel/src/sched/priority_scheduler.rs +++ b/kernel/src/sched/priority_scheduler.rs @@ -16,11 +16,12 @@ use ostd::{ }; use super::{ - priority::{Priority, PriorityRange}, + priority::Priority, stats::{set_stats_from_scheduler, SchedulerStats}, }; use crate::{prelude::*, thread::Thread}; +#[allow(unused)] pub fn init() { let preempt_scheduler = Box::new(PreemptScheduler::default()); let scheduler = Box::>::leak(preempt_scheduler); @@ -298,8 +299,8 @@ impl Default for TimeSlice { } impl PreemptSchedInfo for Thread { - const REAL_TIME_TASK_PRIORITY: Priority = Priority::new(PriorityRange::new(100)); - const LOWEST_TASK_PRIORITY: Priority = Priority::new(PriorityRange::new(PriorityRange::MAX)); + const REAL_TIME_TASK_PRIORITY: Priority = Priority::default_real_time(); + const LOWEST_TASK_PRIORITY: Priority = Priority::idle(); fn priority(&self) -> Priority { self.atomic_priority().load(Ordering::Relaxed) diff --git a/kernel/src/sched/sched_class/fair.rs b/kernel/src/sched/sched_class/fair.rs new file mode 100644 index 00000000..3b14ac05 --- /dev/null +++ b/kernel/src/sched/sched_class/fair.rs @@ -0,0 +1,268 @@ +// SPDX-License-Identifier: MPL-2.0 + +use alloc::{collections::binary_heap::BinaryHeap, sync::Arc}; +use core::{ + cmp::{self, Reverse}, + sync::atomic::{AtomicU64, Ordering::*}, +}; + +use ostd::{ + cpu::{num_cpus, CpuId}, + task::scheduler::{EnqueueFlags, UpdateFlags}, +}; + +use super::{ + time::{base_slice_clocks, min_period_clocks}, + CurrentRuntime, SchedAttr, SchedClassRq, +}; +use crate::{ + sched::priority::{Nice, NiceRange}, + thread::Thread, +}; + +const WEIGHT_0: u64 = 1024; +pub const fn nice_to_weight(nice: Nice) -> u64 { + // Calculated by the formula below: + // + // weight = 1024 * 1.25^(-nice) + // + // We propose that every increment of the nice value results + // in 12.5% change of the CPU load weight. 
+    const FACTOR_NUMERATOR: u64 = 5;
+    const FACTOR_DENOMINATOR: u64 = 4;
+
+    const NICE_TO_WEIGHT: [u64; 40] = const {
+        let mut ret = [0; 40];
+
+        let mut index = 0;
+        let mut nice = NiceRange::MIN;
+        while nice <= NiceRange::MAX {
+            ret[index] = match nice {
+                0 => WEIGHT_0,
+                nice @ 1.. => {
+                    let numerator = FACTOR_DENOMINATOR.pow(nice as u32);
+                    let denominator = FACTOR_NUMERATOR.pow(nice as u32);
+                    WEIGHT_0 * numerator / denominator
+                }
+                nice => {
+                    let numerator = FACTOR_NUMERATOR.pow((-nice) as u32);
+                    let denominator = FACTOR_DENOMINATOR.pow((-nice) as u32);
+                    WEIGHT_0 * numerator / denominator
+                }
+            };
+
+            index += 1;
+            nice += 1;
+        }
+        ret
+    };
+
+    NICE_TO_WEIGHT[(nice.range().get() + 20) as usize]
+}
+
+/// The scheduling entity for the FAIR scheduling class.
+///
+/// The key field of this structure is `vruntime`.
+///
+/// # `vruntime`
+///
+/// The vruntime (virtual runtime) is calculated by the formula:
+///
+///     vruntime += runtime_delta * WEIGHT_0 / weight
+///
+/// and a thread with a lower vruntime is given priority to be scheduled,
+/// which keeps the whole run queue balanced on vruntime (hence FAIR).
+///
+/// # Scheduling periods
+///
+/// Scheduling periods are used to calculate the time slice for each thread.
+///
+/// The time slice for each thread is calculated by the formula:
+///
+///     time_slice = period * weight / total_weight
+///
+/// where `total_weight` is the sum of all weights in the run queue including
+/// the current thread, and [`period`](FairClassRq::period) is calculated
+/// from the number of runnable threads.
+///
+/// When the current thread meets the condition below, it is preempted and put
+/// back into the run queue. See [`FairClassRq::update_current`] for more details.
+///
+///     period_delta > time_slice
+///         || vruntime > rq_min_vruntime + normalized_time_slice
+#[derive(Debug)]
+pub struct FairAttr {
+    weight: AtomicU64,
+    vruntime: AtomicU64,
+}
+
+impl FairAttr {
+    pub fn new(nice: Nice) -> Self {
+        FairAttr {
+            weight: nice_to_weight(nice).into(),
+            vruntime: Default::default(),
+        }
+    }
+
+    pub fn update(&self, nice: Nice) {
+        self.weight.store(nice_to_weight(nice), Relaxed);
+    }
+
+    fn update_vruntime(&self, delta: u64) -> (u64, u64) {
+        let weight = self.weight.load(Relaxed);
+        let delta = delta * WEIGHT_0 / weight;
+        let vruntime = self.vruntime.fetch_add(delta, Relaxed) + delta;
+        (vruntime, weight)
+    }
+}
+
+/// The wrapper for threads in the FAIR run queue.
+///
+/// This structure provides the ordering key (the thread's vruntime) for the
+/// `BinaryHeap`-based run queue in `FairClassRq`.
+struct FairQueueItem(Arc<Thread>);
+
+impl core::fmt::Debug for FairQueueItem {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        write!(f, "{:?}", self.key())
+    }
+}
+
+impl FairQueueItem {
+    fn key(&self) -> u64 {
+        self.0.sched_attr().fair.vruntime.load(Relaxed)
+    }
+}
+
+impl PartialEq for FairQueueItem {
+    fn eq(&self, other: &Self) -> bool {
+        self.key().eq(&other.key())
+    }
+}
+
+impl Eq for FairQueueItem {}
+
+impl PartialOrd for FairQueueItem {
+    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for FairQueueItem {
+    fn cmp(&self, other: &Self) -> cmp::Ordering {
+        self.key().cmp(&other.key())
+    }
+}
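The weight and vruntime formulas above are easier to see with concrete numbers. Below is a standalone sketch using plain integer arithmetic that mirrors `nice_to_weight` and `FairAttr::update_vruntime`; it is an illustration only and not part of the patch:

```rust
/// Re-derives a few weights with the same 1.25 factor used by `nice_to_weight`
/// and shows how vruntime advances for threads of different weights.
fn fair_arithmetic_demo() {
    const WEIGHT_0: u64 = 1024;

    // weight(nice) = 1024 * 1.25^(-nice), with the table's integer math.
    let weight_nice_minus_5 = WEIGHT_0 * 5u64.pow(5) / 4u64.pow(5); // 3125
    let weight_nice_0 = WEIGHT_0;                                   // 1024
    let weight_nice_5 = WEIGHT_0 * 4u64.pow(5) / 5u64.pow(5);       // 335

    // vruntime += runtime_delta * WEIGHT_0 / weight:
    // after the same 1_000_000 clocks of real runtime...
    let delta = 1_000_000u64;
    assert_eq!(delta * WEIGHT_0 / weight_nice_minus_5, 327_680);   // advances slowly => keeps running longer
    assert_eq!(delta * WEIGHT_0 / weight_nice_0, 1_000_000);       // baseline
    assert_eq!(delta * WEIGHT_0 / weight_nice_5, 3_056_716);       // advances quickly => yields sooner

    // time_slice = period * weight / total_weight:
    // with one nice-0 and one nice-5 thread runnable, the nice-0 thread gets
    // 1024 / (1024 + 335), i.e. roughly 75% of each scheduling period.
}
```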
+/// The per-cpu run queue for the FAIR scheduling class.
+///
+/// See [`FairAttr`] for the explanation of vruntimes and scheduling periods.
+///
+/// The structure stores the threads in a `BinaryHeap` keyed on vruntime, so
+/// that the next thread to run can be found efficiently.
+#[derive(Debug)]
+pub(super) struct FairClassRq {
+    #[allow(unused)]
+    cpu: CpuId,
+    /// The ready-to-run threads.
+    threads: BinaryHeap<Reverse<FairQueueItem>>,
+    /// The minimum of vruntime in the run queue. Serves as the initial
+    /// value of newly-enqueued threads.
+    min_vruntime: u64,
+    total_weight: u64,
+}
+
+impl FairClassRq {
+    pub fn new(cpu: CpuId) -> Self {
+        Self {
+            cpu,
+            threads: BinaryHeap::new(),
+            min_vruntime: 0,
+            total_weight: 0,
+        }
+    }
+
+    /// The scheduling period is calculated as the maximum of the following two values:
+    ///
+    /// 1. The minimum period value, defined by [`min_period_clocks`].
+    /// 2. `period = min_granularity * n` where
+    ///    `min_granularity = log2(1 + num_cpus) * base_slice_clocks`, and `n` is the number of
+    ///    runnable threads (including the current running thread).
+    ///
+    /// The formula is chosen by 3 principles:
+    ///
+    /// 1. The scheduling period should reflect the number of runnable threads and CPUs;
+    /// 2. The scheduling period should not be too short, so that the overhead of context
+    ///    switching stays bounded;
+    /// 3. The scheduling period should not be too long, so that scheduling latency and
+    ///    responsiveness are preserved.
+    fn period(&self) -> u64 {
+        let base_slice_clks = base_slice_clocks();
+        let min_period_clks = min_period_clocks();
+
+        // `+ 1` means including the current running thread.
+        let period_single_cpu =
+            (base_slice_clks * (self.threads.len() + 1) as u64).max(min_period_clks);
+        period_single_cpu * u64::from((1 + num_cpus()).ilog2())
+    }
+
+    /// The virtual time slice for each thread in the run queue, measured in vruntime clocks.
+    fn vtime_slice(&self) -> u64 {
+        self.period() / (self.threads.len() + 1) as u64
+    }
+
+    /// The time slice for each thread in the run queue, measured in sched clocks.
+ fn time_slice(&self, cur_weight: u64) -> u64 { + self.period() * cur_weight / (self.total_weight + cur_weight) + } +} + +impl SchedClassRq for FairClassRq { + fn enqueue(&mut self, thread: Arc, flags: Option) { + let fair_attr = &thread.sched_attr().fair; + let vruntime = match flags { + Some(EnqueueFlags::Spawn) => self.min_vruntime + self.vtime_slice(), + _ => self.min_vruntime, + }; + fair_attr.vruntime.fetch_max(vruntime, Relaxed); + + self.total_weight += fair_attr.weight.load(Relaxed); + self.threads.push(Reverse(FairQueueItem(thread))); + } + + fn len(&mut self) -> usize { + self.threads.len() + } + + fn is_empty(&mut self) -> bool { + self.threads.is_empty() + } + + fn pick_next(&mut self) -> Option> { + let Reverse(FairQueueItem(thread)) = self.threads.pop()?; + + self.total_weight -= thread.sched_attr().fair.weight.load(Relaxed); + + Some(thread) + } + + fn update_current( + &mut self, + rt: &CurrentRuntime, + attr: &SchedAttr, + flags: UpdateFlags, + ) -> bool { + match flags { + UpdateFlags::Yield => true, + UpdateFlags::Tick | UpdateFlags::Wait => { + let (vruntime, weight) = attr.fair.update_vruntime(rt.delta); + self.min_vruntime = match self.threads.peek() { + Some(Reverse(leftmost)) => vruntime.min(leftmost.key()), + None => vruntime, + }; + + rt.period_delta > self.time_slice(weight) + || vruntime > self.min_vruntime + self.vtime_slice() + } + } + } +} diff --git a/kernel/src/sched/sched_class/idle.rs b/kernel/src/sched/sched_class/idle.rs new file mode 100644 index 00000000..008e6e3d --- /dev/null +++ b/kernel/src/sched/sched_class/idle.rs @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: MPL-2.0 + +use super::*; + +/// The per-cpu run queue for the IDLE scheduling class. +/// +/// This run queue is used for the per-cpu idle thread, if any. +pub(super) struct IdleClassRq { + thread: Option>, +} + +impl IdleClassRq { + pub fn new() -> Self { + Self { thread: None } + } +} + +impl core::fmt::Debug for IdleClassRq { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + if self.thread.is_some() { + write!(f, "Idle: occupied")?; + } else { + write!(f, "Idle: empty")?; + } + Ok(()) + } +} + +impl SchedClassRq for IdleClassRq { + fn enqueue(&mut self, thread: Arc, _: Option) { + let ptr = Arc::as_ptr(&thread); + if let Some(t) = self.thread.replace(thread) + && ptr != Arc::as_ptr(&t) + { + panic!("Multiple `idle` threads spawned") + } + } + + fn len(&mut self) -> usize { + usize::from(!self.is_empty()) + } + + fn is_empty(&mut self) -> bool { + self.thread.is_none() + } + + fn pick_next(&mut self) -> Option> { + self.thread.clone() + } + + fn update_current(&mut self, _: &CurrentRuntime, _: &SchedAttr, _flags: UpdateFlags) -> bool { + // Idle threads has the greatest priority value. They should always be preempted. 
+        true
+    }
+}
diff --git a/kernel/src/sched/sched_class/mod.rs b/kernel/src/sched/sched_class/mod.rs
new file mode 100644
index 00000000..bb7f39d9
--- /dev/null
+++ b/kernel/src/sched/sched_class/mod.rs
@@ -0,0 +1,373 @@
+// SPDX-License-Identifier: MPL-2.0
+
+#![warn(unused)]
+
+use alloc::{boxed::Box, sync::Arc};
+use core::{fmt, sync::atomic::AtomicU64};
+
+use ostd::{
+    cpu::{all_cpus, AtomicCpuSet, CpuId, PinCurrentCpu},
+    sync::SpinLock,
+    task::{
+        scheduler::{
+            info::CommonSchedInfo, inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler,
+            UpdateFlags,
+        },
+        Task,
+    },
+    trap::disable_local,
+};
+
+mod time;
+
+mod fair;
+mod idle;
+mod real_time;
+mod stop;
+
+use ostd::arch::read_tsc as sched_clock;
+
+use super::{
+    priority::{Nice, Priority, RangedU8},
+    stats::SchedulerStats,
+};
+use crate::thread::{AsThread, Thread};
+
+#[allow(unused)]
+pub fn init() {
+    inject_scheduler(Box::leak(Box::new(ClassScheduler::new())));
+}
+
+/// Represents the middle layer between scheduling classes and generic scheduler
+/// traits. It consists of all the sets of run queues for CPU cores. Other global
+/// information may also be stored here.
+pub struct ClassScheduler {
+    rqs: Box<[SpinLock<PerCpuClassRqSet>]>,
+}
+
+/// Represents the run queue for each CPU core. It stores one run queue per
+/// scheduling class for its corresponding CPU core. The current task of this CPU
+/// core is also stored in this structure.
+struct PerCpuClassRqSet {
+    stop: Arc<stop::StopClassRq>,
+    real_time: real_time::RealTimeClassRq,
+    fair: fair::FairClassRq,
+    idle: idle::IdleClassRq,
+    current: Option<(Arc<Task>, CurrentRuntime)>,
+}
+
+/// Stores the runtime information of the current task.
+///
+/// This is used to calculate the time slice of the current task.
+///
+/// This struct is kept separate from the current `Arc<Task>` instead of being
+/// embedded in the task, because the scheduling class implementations only need
+/// `CurrentRuntime` and `SchedAttr`.
+struct CurrentRuntime {
+    start: u64,
+    delta: u64,
+    period_delta: u64,
+}
+
+impl CurrentRuntime {
+    fn new() -> Self {
+        CurrentRuntime {
+            start: sched_clock(),
+            delta: 0,
+            period_delta: 0,
+        }
+    }
+
+    fn update(&mut self) {
+        let now = sched_clock();
+        self.delta = now - core::mem::replace(&mut self.start, now);
+        self.period_delta += self.delta;
+    }
+}
+
+/// The main trait for per-class run queues. Each scheduling class implements
+/// this trait for its run queue.
+trait SchedClassRq: Send + fmt::Debug {
+    /// Enqueues a thread into the run queue.
+    fn enqueue(&mut self, thread: Arc<Thread>, flags: Option<EnqueueFlags>);
+
+    /// Returns the number of threads in the run queue.
+    fn len(&mut self) -> usize;
+
+    /// Checks if the run queue is empty.
+    fn is_empty(&mut self) -> bool {
+        self.len() == 0
+    }
+
+    /// Picks the next thread to run.
+    fn pick_next(&mut self) -> Option<Arc<Thread>>;
+
+    /// Updates the information of the current thread.
+    fn update_current(&mut self, rt: &CurrentRuntime, attr: &SchedAttr, flags: UpdateFlags)
+        -> bool;
+}
+
+pub use real_time::RealTimePolicy;
+
+/// The user-chosen scheduling policy.
+///
+/// The scheduling policies are specified by the user, usually through the thread's priority.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum SchedPolicy { + Stop, + RealTime { + rt_prio: real_time::RtPrio, + rt_policy: RealTimePolicy, + }, + Fair(Nice), + Idle, +} + +impl From for SchedPolicy { + fn from(priority: Priority) -> Self { + match priority.range().get() { + 0 => SchedPolicy::Stop, + rt @ 1..=99 => SchedPolicy::RealTime { + rt_prio: RangedU8::new(rt), + rt_policy: Default::default(), + }, + 100..=139 => SchedPolicy::Fair(priority.into()), + _ => SchedPolicy::Idle, + } + } +} + +/// The scheduling attribute for a thread. +/// +/// This is used to store the scheduling policy and runtime parameters for each +/// scheduling class. +#[derive(Debug)] +pub struct SchedAttr { + policy: SpinLock, + + real_time: real_time::RealTimeAttr, + fair: fair::FairAttr, +} + +impl SchedAttr { + /// Constructs a new `SchedAttr` with the given scheduling policy. + pub fn new(policy: SchedPolicy) -> Self { + Self { + policy: SpinLock::new(policy), + real_time: { + let (prio, policy) = match policy { + SchedPolicy::RealTime { rt_prio, rt_policy } => (rt_prio.get(), rt_policy), + _ => (real_time::RtPrio::MAX, Default::default()), + }; + real_time::RealTimeAttr::new(prio, policy) + }, + fair: fair::FairAttr::new(match policy { + SchedPolicy::Fair(nice) => nice, + _ => Nice::default(), + }), + } + } + + /// Retrieves the current scheduling policy of the thread. + pub fn policy(&self) -> SchedPolicy { + *self.policy.lock() + } + + /// Updates the scheduling policy of the thread. + /// + /// Specifically for real-time policies, if the new policy doesn't + /// specify a base slice factor for RR, the old one will be kept. + pub fn set_policy(&self, mut policy: SchedPolicy) { + let mut guard = self.policy.lock(); + match policy { + SchedPolicy::RealTime { rt_prio, rt_policy } => { + self.real_time.update(rt_prio.get(), rt_policy); + } + SchedPolicy::Fair(nice) => self.fair.update(nice), + _ => {} + } + + // Keep the old base slice factor if the new policy doesn't specify one. + if let ( + SchedPolicy::RealTime { + rt_policy: + RealTimePolicy::RoundRobin { + base_slice_factor: slot, + }, + .. + }, + SchedPolicy::RealTime { + rt_policy: RealTimePolicy::RoundRobin { base_slice_factor }, + .. + }, + ) = (*guard, &mut policy) + { + *base_slice_factor = slot.or(*base_slice_factor); + } + + *guard = policy; + } +} + +impl Scheduler for ClassScheduler { + fn enqueue(&self, task: Arc, flags: EnqueueFlags) -> Option { + let thread = task.as_thread()?; + + let (still_in_rq, cpu) = { + let selected_cpu_id = self.select_cpu(thread.atomic_cpu_affinity()); + + if let Err(task_cpu_id) = task.cpu().set_if_is_none(selected_cpu_id) { + debug_assert!(flags != EnqueueFlags::Spawn); + (true, task_cpu_id) + } else { + (false, selected_cpu_id) + } + }; + + let mut rq = self.rqs[cpu.as_usize()].disable_irq().lock(); + + // Note: call set_if_is_none again to prevent a race condition. 
+ if still_in_rq && task.cpu().set_if_is_none(cpu).is_err() { + return None; + } + + rq.enqueue_thread(thread, Some(flags)); + Some(cpu) + } + + fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue)) { + let guard = disable_local(); + let mut lock = self.rqs[guard.current_cpu().as_usize()].lock(); + f(&mut *lock) + } + + fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue)) { + let guard = disable_local(); + f(&*self.rqs[guard.current_cpu().as_usize()].lock()) + } +} + +impl ClassScheduler { + pub fn new() -> Self { + let stop = stop::StopClassRq::new(); + let class_rq = |cpu| { + SpinLock::new(PerCpuClassRqSet { + stop: stop.clone(), + real_time: real_time::RealTimeClassRq::new(cpu), + fair: fair::FairClassRq::new(cpu), + idle: idle::IdleClassRq::new(), + current: None, + }) + }; + ClassScheduler { + rqs: all_cpus().map(class_rq).collect(), + } + } + + // TODO: Implement a better algorithm and replace the current naive implementation. + fn select_cpu(&self, affinity: &AtomicCpuSet) -> CpuId { + let guard = disable_local(); + let affinity = affinity.load(); + let cur = guard.current_cpu(); + if affinity.contains(cur) { + cur + } else { + affinity.iter().next().expect("empty affinity") + } + } +} + +impl PerCpuClassRqSet { + fn pick_next_thread(&mut self) -> Option> { + (self.stop.pick_next()) + .or_else(|| self.real_time.pick_next()) + .or_else(|| self.fair.pick_next()) + .or_else(|| self.idle.pick_next()) + } + + fn enqueue_thread(&mut self, thread: &Arc, flags: Option) { + let attr = thread.sched_attr(); + + let cloned = thread.clone(); + match *attr.policy.lock() { + SchedPolicy::Stop => self.stop.enqueue(cloned, flags), + SchedPolicy::RealTime { .. } => self.real_time.enqueue(cloned, flags), + SchedPolicy::Fair(_) => self.fair.enqueue(cloned, flags), + SchedPolicy::Idle => self.idle.enqueue(cloned, flags), + }; + } + + fn nr_queued_and_running(&mut self) -> (u32, u32) { + let queued = self.stop.len() + self.real_time.len() + self.fair.len() + self.idle.len(); + let running = usize::from(self.current.is_some()); + (queued as u32, running as u32) + } +} + +impl LocalRunQueue for PerCpuClassRqSet { + fn current(&self) -> Option<&Arc> { + self.current.as_ref().map(|(task, _)| task) + } + + fn pick_next_current(&mut self) -> Option<&Arc> { + self.pick_next_thread().and_then(|next| { + let next_task = next.task(); + if let Some((old_task, _)) = self + .current + .replace((next_task.clone(), CurrentRuntime::new())) + { + if Arc::ptr_eq(&old_task, &next_task) { + return None; + } + let old = old_task.as_thread().unwrap(); + self.enqueue_thread(old, None); + } + self.current.as_ref().map(|(task, _)| task) + }) + } + + fn update_current(&mut self, flags: UpdateFlags) -> bool { + if let Some((cur_task, rt)) = &mut self.current + && let Some(cur) = cur_task.as_thread() + { + rt.update(); + let attr = &cur.sched_attr(); + + let (current_expired, lookahead) = match &*attr.policy.lock() { + SchedPolicy::Stop => (self.stop.update_current(rt, attr, flags), 0), + SchedPolicy::RealTime { .. 
} => (self.real_time.update_current(rt, attr, flags), 1), + SchedPolicy::Fair(_) => (self.fair.update_current(rt, attr, flags), 2), + SchedPolicy::Idle => (self.idle.update_current(rt, attr, flags), 3), + }; + + current_expired + || (lookahead >= 1 && !self.stop.is_empty()) + || (lookahead >= 2 && !self.real_time.is_empty()) + || (lookahead >= 3 && !self.fair.is_empty()) + } else { + true + } + } + + fn dequeue_current(&mut self) -> Option> { + self.current.take().map(|(cur_task, _)| { + cur_task.schedule_info().cpu.set_to_none(); + cur_task + }) + } +} + +impl SchedulerStats for ClassScheduler { + fn nr_queued_and_running(&self) -> (u32, u32) { + self.rqs.iter().fold((0, 0), |(queued, running), rq| { + let (q, r) = rq.lock().nr_queued_and_running(); + (queued + q, running + r) + }) + } +} + +impl Default for ClassScheduler { + fn default() -> Self { + Self::new() + } +} diff --git a/kernel/src/sched/sched_class/real_time.rs b/kernel/src/sched/sched_class/real_time.rs new file mode 100644 index 00000000..df65cda7 --- /dev/null +++ b/kernel/src/sched/sched_class/real_time.rs @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: MPL-2.0 + +use alloc::collections::vec_deque::VecDeque; +use core::{ + array, + num::NonZero, + sync::atomic::{AtomicU8, Ordering::*}, +}; + +use bitvec::{bitarr, BitArr}; + +use super::{time::base_slice_clocks, *}; + +pub type RtPrio = RangedU8<1, 99>; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum RealTimePolicy { + Fifo, + RoundRobin { + base_slice_factor: Option>, + }, +} + +impl Default for RealTimePolicy { + fn default() -> Self { + Self::RoundRobin { + base_slice_factor: None, + } + } +} + +impl RealTimePolicy { + fn to_time_slice(self) -> u64 { + match self { + RealTimePolicy::RoundRobin { base_slice_factor } => { + base_slice_clocks() + * base_slice_factor.map_or(DEFAULT_BASE_SLICE_FACTOR, NonZero::get) + } + RealTimePolicy::Fifo => 0, + } + } +} + +/// The scheduling attribute for the REAL-TIME scheduling class. +/// +/// This structure provides not-only the priority of the thread, +/// but also the time slice for the thread, measured in [`sched_clock`]s. +/// +/// - If the time slice is not set, the thread is considered to be a FIFO +/// thread, and will be executed to its end if there no thread with a +/// lower priority. +/// - If the time slice is set, the thread is considered to be an RR +/// (round-robin) thread, and will be executed for the time slice, and +/// then it will be put back to the inactive array. 
+#[derive(Debug)] +pub struct RealTimeAttr { + prio: AtomicU8, + time_slice: AtomicU64, // 0 for SCHED_FIFO; other for SCHED_RR +} + +const DEFAULT_BASE_SLICE_FACTOR: u64 = 20; + +impl RealTimeAttr { + pub fn new(prio: u8, policy: RealTimePolicy) -> Self { + RealTimeAttr { + prio: prio.into(), + time_slice: AtomicU64::new(policy.to_time_slice()), + } + } + + pub fn update(&self, prio: u8, policy: RealTimePolicy) { + self.prio.store(prio, Relaxed); + self.time_slice.store(policy.to_time_slice(), Relaxed); + } +} + +struct PrioArray { + map: BitArr![for 100], + queue: [VecDeque>; 100], +} + +impl core::fmt::Debug for PrioArray { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("PrioArray") + .field_with("map", |f| { + f.debug_list().entries(self.map.iter_ones()).finish() + }) + .field_with("queue", |f| { + f.debug_list() + .entries((self.queue.iter().flatten()).map(|thread| thread.sched_attr())) + .finish() + }) + .finish() + } +} + +impl PrioArray { + fn enqueue(&mut self, thread: Arc, prio: u8) { + let queue = &mut self.queue[usize::from(prio)]; + let is_empty = queue.is_empty(); + queue.push_back(thread); + if is_empty { + self.map.set(usize::from(prio), true); + } + } + + fn pop(&mut self) -> Option> { + let mut iter = self.map.iter_ones(); + let prio = iter.next()? as u8; + + let queue = &mut self.queue[usize::from(prio)]; + let thread = queue.pop_front()?; + + if queue.is_empty() { + self.map.set(usize::from(prio), false); + } + Some(thread) + } +} + +/// The per-cpu run queue for the REAL-TIME scheduling class. +/// +/// The REAL-TIME scheduling class is implemented as a classic O(1) +/// priority algorithm. +/// +/// It uses a bit array to track which priority levels have runnable +/// threads, and a vector of queues to store the threads. +/// +/// Threads are popped & dequeued from the active array (`array[index]`), and +/// are enqueued into the inactive array (`array[!index]`). When the active array +/// is empty, the 2 arrays are swapped by `index`. 
+#[derive(Debug)] +pub(super) struct RealTimeClassRq { + #[allow(unused)] + cpu: CpuId, + index: bool, + array: [PrioArray; 2], +} + +impl RealTimeClassRq { + pub fn new(cpu: CpuId) -> RealTimeClassRq { + RealTimeClassRq { + cpu, + index: false, + array: array::from_fn(|_| PrioArray { + map: bitarr![0; 100], + queue: array::from_fn(|_| VecDeque::new()), + }), + } + } + + fn active_array(&mut self) -> &mut PrioArray { + &mut self.array[usize::from(self.index)] + } + + fn inactive_array(&mut self) -> &mut PrioArray { + &mut self.array[usize::from(!self.index)] + } + + fn swap_arrays(&mut self) { + self.index = !self.index; + } +} + +impl SchedClassRq for RealTimeClassRq { + fn enqueue(&mut self, thread: Arc, _: Option) { + let prio = thread.sched_attr().real_time.prio.load(Relaxed); + self.inactive_array().enqueue(thread, prio); + } + + fn len(&mut self) -> usize { + self.active_array().map.count_ones() + self.inactive_array().map.count_ones() + } + + fn is_empty(&mut self) -> bool { + self.active_array().map.is_empty() && self.inactive_array().map.is_empty() + } + + fn pick_next(&mut self) -> Option> { + self.active_array().pop().or_else(|| { + self.swap_arrays(); + self.active_array().pop() + }) + } + + fn update_current( + &mut self, + rt: &CurrentRuntime, + attr: &SchedAttr, + flags: UpdateFlags, + ) -> bool { + let attr = &attr.real_time; + + match flags { + UpdateFlags::Tick | UpdateFlags::Wait => match attr.time_slice.load(Relaxed) { + 0 => (self.inactive_array().map.iter_ones().next()) + .is_some_and(|prio| prio > usize::from(attr.prio.load(Relaxed))), + ts => ts <= rt.period_delta, + }, + UpdateFlags::Yield => true, + } + } +} diff --git a/kernel/src/sched/sched_class/stop.rs b/kernel/src/sched/sched_class/stop.rs new file mode 100644 index 00000000..c1b5a597 --- /dev/null +++ b/kernel/src/sched/sched_class/stop.rs @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: MPL-2.0 + +use super::*; + +/// The per-cpu run queue for the STOP scheduling class. +/// +/// This is a singleton class, meaning that only one thread can be in this class at a time. +/// This is used for the most critical tasks, such as powering off and rebooting. +pub(super) struct StopClassRq { + thread: SpinLock>>, +} + +impl StopClassRq { + pub fn new() -> Arc { + Arc::new(StopClassRq { + thread: SpinLock::new(None), + }) + } +} + +impl core::fmt::Debug for StopClassRq { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + if self.thread.lock().is_some() { + write!(f, "Stop: occupied")?; + } else { + write!(f, "Stop: empty")?; + } + Ok(()) + } +} + +impl SchedClassRq for Arc { + fn enqueue(&mut self, thread: Arc, _: Option) { + if self.thread.lock().replace(thread).is_some() { + panic!("Multiple `stop` threads spawned") + } + } + + fn len(&mut self) -> usize { + usize::from(!self.is_empty()) + } + + fn is_empty(&mut self) -> bool { + self.thread.lock().is_none() + } + + fn pick_next(&mut self) -> Option> { + self.thread.lock().take() + } + + fn update_current(&mut self, _: &CurrentRuntime, _: &SchedAttr, _flags: UpdateFlags) -> bool { + // Stop threads has the lowest priority value. They should never be preempted. 
+ false + } +} diff --git a/kernel/src/sched/sched_class/time.rs b/kernel/src/sched/sched_class/time.rs new file mode 100644 index 00000000..cbb99c77 --- /dev/null +++ b/kernel/src/sched/sched_class/time.rs @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: MPL-2.0 + +use core::mem; + +use spin::Once; + +/// Returns the numerator and denominator of the ratio R: +/// +/// R = 10^9 (ns in a sec) / TSC clock frequency +fn tsc_factors() -> (u64, u64) { + static FACTORS: Once<(u64, u64)> = Once::new(); + *FACTORS.call_once(|| { + let freq = ostd::arch::tsc_freq(); + assert_ne!(freq, 0); + let mut a = 1_000_000_000; + let mut b = freq; + if a < b { + mem::swap(&mut a, &mut b); + } + while a > 1 && b > 1 { + let t = a; + a = b; + b = t % b; + } + + let gcd = if a <= 1 { b } else { a }; + (1_000_000_000 / gcd, freq / gcd) + }) +} + +/// The base time slice allocated for every thread, measured in nanoseconds. +pub const BASE_SLICE_NS: u64 = 750_000; + +/// The minimum scheduling period, measured in nanoseconds. +pub const MIN_PERIOD_NS: u64 = 6_000_000; + +fn consts() -> (u64, u64) { + static CONSTS: Once<(u64, u64)> = Once::new(); + *CONSTS.call_once(|| { + let (a, b) = tsc_factors(); + (BASE_SLICE_NS * b / a, MIN_PERIOD_NS * b / a) + }) +} + +/// Returns the base time slice allocated for every thread, measured in TSC clock units. +pub fn base_slice_clocks() -> u64 { + consts().0 +} + +/// Returns the minimum scheduling period, measured in TSC clock units. +pub fn min_period_clocks() -> u64 { + consts().1 +} diff --git a/kernel/src/thread/kernel_thread.rs b/kernel/src/thread/kernel_thread.rs index abced9dd..04ee8cf0 100644 --- a/kernel/src/thread/kernel_thread.rs +++ b/kernel/src/thread/kernel_thread.rs @@ -5,7 +5,7 @@ use ostd::{ task::{Task, TaskOptions}, }; -use super::{oops, status::ThreadStatus, AsThread, Thread}; +use super::{oops, AsThread, Thread}; use crate::{prelude::*, sched::priority::Priority}; /// The inner data of a kernel thread. @@ -58,13 +58,11 @@ impl ThreadOptions { Arc::new_cyclic(|weak_task| { let thread = { let kernel_thread = KernelThread; - let status = ThreadStatus::Init; let priority = self.priority; let cpu_affinity = self.cpu_affinity; Arc::new(Thread::new( weak_task.clone(), kernel_thread, - status, priority, cpu_affinity, )) diff --git a/kernel/src/thread/mod.rs b/kernel/src/thread/mod.rs index d3d46c1c..84b93720 100644 --- a/kernel/src/thread/mod.rs +++ b/kernel/src/thread/mod.rs @@ -12,7 +12,10 @@ use ostd::{ use self::status::{AtomicThreadStatus, ThreadStatus}; use crate::{ prelude::*, - sched::priority::{AtomicPriority, Priority}, + sched::{ + priority::{AtomicPriority, Priority}, + SchedAttr, + }, }; pub mod exception; @@ -40,6 +43,7 @@ pub struct Thread { priority: AtomicPriority, /// Thread CPU affinity cpu_affinity: AtomicCpuSet, + sched_attr: SchedAttr, } impl Thread { @@ -47,16 +51,16 @@ impl Thread { pub fn new( task: Weak, data: impl Send + Sync + Any, - status: ThreadStatus, priority: Priority, cpu_affinity: CpuSet, ) -> Self { Thread { task, data: Box::new(data), - status: AtomicThreadStatus::new(status), + status: AtomicThreadStatus::new(ThreadStatus::Init), priority: AtomicPriority::new(priority), cpu_affinity: AtomicCpuSet::new(cpu_affinity), + sched_attr: SchedAttr::new(priority.into()), } } @@ -140,6 +144,10 @@ impl Thread { &self.cpu_affinity } + pub fn sched_attr(&self) -> &SchedAttr { + &self.sched_attr + } + /// Yields the execution to another thread. /// /// This method will return once the current thread is scheduled again. 
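Before the worker-pool and `ostd` changes below, a worked example may help with the clock conversion in `time.rs` and the round-robin slice in `real_time.rs`. The sketch assumes a hypothetical 2.4 GHz TSC; the frequency, the local names, and the standalone function are illustrative and not part of the patch:

```rust
/// Worked example of the ns -> TSC-clock conversion, assuming a 2.4 GHz TSC.
fn tsc_conversion_demo() {
    const NS_PER_SEC: u64 = 1_000_000_000;
    const TSC_FREQ: u64 = 2_400_000_000; // assumed frequency, for illustration only

    // `tsc_factors()` reduces the ratio 10^9 / freq by their GCD.
    let gcd: u64 = 200_000_000; // gcd(10^9, 2.4 * 10^9)
    let (a, b) = (NS_PER_SEC / gcd, TSC_FREQ / gcd); // (5, 12)

    // ns -> clocks is then `ns * b / a`, as computed in `consts()`.
    let base_slice_clocks = 750_000 * b / a;   // BASE_SLICE_NS -> 1_800_000 clocks
    let min_period_clocks = 6_000_000 * b / a; // MIN_PERIOD_NS  -> 14_400_000 clocks
    assert_eq!((base_slice_clocks, min_period_clocks), (1_800_000, 14_400_000));

    // A round-robin real-time thread with the default base-slice factor (20)
    // therefore gets 20 * 1_800_000 = 36_000_000 clocks, i.e. 15 ms at 2.4 GHz.
    assert_eq!(20 * base_slice_clocks * 1_000 / TSC_FREQ, 15); // milliseconds
}
```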
diff --git a/kernel/src/thread/work_queue/worker.rs b/kernel/src/thread/work_queue/worker.rs index 54ee8c02..d68aa18e 100644 --- a/kernel/src/thread/work_queue/worker.rs +++ b/kernel/src/thread/work_queue/worker.rs @@ -10,7 +10,7 @@ use ostd::{ use super::worker_pool::WorkerPool; use crate::{ prelude::*, - sched::priority::{Priority, PriorityRange}, + sched::priority::Priority, thread::{kernel_thread::ThreadOptions, AsThread}, }; @@ -52,8 +52,7 @@ impl Worker { cpu_affinity.add(bound_cpu); let mut priority = Priority::default(); if worker_pool.upgrade().unwrap().is_high_priority() { - // FIXME: remove the use of real-time priority. - priority = Priority::new(PriorityRange::new(0)); + priority = Priority::default_real_time(); } let bound_task = ThreadOptions::new(task_fn) .cpu_affinity(cpu_affinity) diff --git a/kernel/src/thread/work_queue/worker_pool.rs b/kernel/src/thread/work_queue/worker_pool.rs index af9349fa..33677b79 100644 --- a/kernel/src/thread/work_queue/worker_pool.rs +++ b/kernel/src/thread/work_queue/worker_pool.rs @@ -16,7 +16,7 @@ use ostd::{ use super::{simple_scheduler::SimpleScheduler, worker::Worker, WorkItem, WorkPriority, WorkQueue}; use crate::{ prelude::*, - sched::priority::{Priority, PriorityRange}, + sched::priority::Priority, thread::{kernel_thread::ThreadOptions, AsThread}, }; @@ -242,7 +242,7 @@ impl Monitor { // This workaround is to make the monitor of high-priority worker pool // starvation-free under the current scheduling policy. let priority = match priority { - WorkPriority::High => Priority::new(PriorityRange::new(0)), + WorkPriority::High => Priority::default_real_time(), WorkPriority::Normal => Priority::default(), }; let bound_task = ThreadOptions::new(task_fn) diff --git a/ostd/src/task/scheduler/mod.rs b/ostd/src/task/scheduler/mod.rs index 7948433d..e714ee28 100644 --- a/ostd/src/task/scheduler/mod.rs +++ b/ostd/src/task/scheduler/mod.rs @@ -138,8 +138,8 @@ where // `dequeue_current` method and nothing bad will happen. This may need to be revisited // after more complex schedulers are introduced. - current = local_rq.dequeue_current(); local_rq.update_current(UpdateFlags::Wait); + current = local_rq.dequeue_current(); } if let Some(next_task) = local_rq.pick_next_current() {
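As a closing note on how the pieces fit together: `PerCpuClassRqSet::pick_next_thread` always picks from the highest non-empty class (stop, then real-time, then fair, then idle), and `update_current` preempts the current thread whenever its own class reports expiry or a strictly higher class becomes non-empty. A simplified, self-contained model of that rule, using illustrative types rather than the patch's own:

```rust
/// Scheduling classes in decreasing urgency, mirroring the pick order above.
#[derive(Clone, Copy)]
enum Class {
    Stop = 0,
    RealTime = 1,
    Fair = 2,
    Idle = 3,
}

/// Returns whether the current thread should be rescheduled, given whether its
/// class reports expiry and which class run queues are non-empty (indexed Stop..Idle).
fn should_reschedule(current_class: Class, current_expired: bool, non_empty: [bool; 4]) -> bool {
    // Classes strictly above the current one can always preempt it.
    let higher_class_ready = non_empty[..current_class as usize].iter().any(|&b| b);
    current_expired || higher_class_ready
}

fn demo() {
    // A fair thread keeps running while only fair/idle threads are queued...
    assert!(!should_reschedule(Class::Fair, false, [false, false, true, true]));
    // ...but is preempted as soon as a real-time thread becomes runnable.
    assert!(should_reschedule(Class::Fair, false, [false, true, false, false]));
    // An idle thread is preempted by anything runnable above it
    // (its own `update_current` also always reports expiry).
    assert!(should_reschedule(Class::Idle, false, [false, false, true, false]));
}
```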