Scheduling class support

This commit is contained in:
徐启航 2024-08-17 08:06:35 +00:00 committed by Tate, Hongliang Tian
parent a260411a2a
commit 878f3f3760
17 changed files with 1062 additions and 32 deletions

View File

@ -9,6 +9,7 @@
#![allow(incomplete_features)]
#![feature(btree_cursors)]
#![feature(btree_extract_if)]
#![feature(debug_closure_helpers)]
#![feature(extend_one)]
#![feature(fn_traits)]
#![feature(format_args_nl)]
@ -37,7 +38,6 @@ use ostd::{
cpu::PinCurrentCpu,
};
use process::Process;
use sched::priority::PriorityRange;
use crate::{
prelude::*,
@ -86,7 +86,7 @@ pub fn main() {
// Spawn the first kernel thread on BSP.
ThreadOptions::new(init_thread)
.priority(Priority::new(PriorityRange::new(PriorityRange::MAX)))
.priority(Priority::idle())
.spawn();
}
@ -120,7 +120,7 @@ fn ap_init() {
ThreadOptions::new(ap_idle_thread)
.cpu_affinity(cpu_id.into())
.priority(Priority::new(PriorityRange::new(PriorityRange::MAX)))
.priority(Priority::idle())
.spawn();
}

View File

@ -7,11 +7,7 @@ use log::trace;
use ostd::timer::Jiffies;
use super::{ext::IfaceEx, Iface, IFACES};
use crate::{
sched::priority::{Priority, PriorityRange},
thread::kernel_thread::ThreadOptions,
WaitTimeout,
};
use crate::{sched::priority::Priority, thread::kernel_thread::ThreadOptions, WaitTimeout};
pub fn lazy_init() {
for iface in IFACES.get().unwrap() {
@ -68,6 +64,6 @@ fn spawn_background_poll_thread(iface: Arc<Iface>) {
// FIXME: remove the use of real-time priority.
ThreadOptions::new(task_fn)
.priority(Priority::new(PriorityRange::new(0)))
.priority(Priority::default_real_time())
.spawn();
}

View File

@ -13,7 +13,7 @@ use crate::{
Credentials, Process,
},
sched::priority::Priority,
thread::{status::ThreadStatus, task, Thread, Tid},
thread::{task, Thread, Tid},
time::{clocks::ProfClock, TimerManager},
};
@ -31,6 +31,7 @@ pub struct PosixThreadBuilder {
clear_child_tid: Vaddr,
sig_mask: AtomicSigMask,
sig_queues: SigQueues,
priority: Priority,
}
impl PosixThreadBuilder {
@ -45,6 +46,7 @@ impl PosixThreadBuilder {
clear_child_tid: 0,
sig_mask: AtomicSigMask::new_empty(),
sig_queues: SigQueues::new(),
priority: Priority::default(),
}
}
@ -73,6 +75,11 @@ impl PosixThreadBuilder {
self
}
pub fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}
pub fn build(self) -> Arc<Task> {
let Self {
tid,
@ -84,6 +91,7 @@ impl PosixThreadBuilder {
clear_child_tid,
sig_mask,
sig_queues,
priority,
} = self;
Arc::new_cyclic(|weak_task| {
@ -111,13 +119,10 @@ impl PosixThreadBuilder {
}
};
let status = ThreadStatus::Init;
let priority = Priority::default();
let cpu_affinity = CpuSet::new_full();
let thread = Arc::new(Thread::new(
weak_task.clone(),
posix_thread,
status,
priority,
cpu_affinity,
));

View File

@ -1,7 +1,9 @@
// SPDX-License-Identifier: MPL-2.0
pub mod priority;
// TODO: Remove this out-dated module once the `sched_class` module is stable.
mod priority_scheduler;
mod sched_class;
mod stats;
// Export the stats getter functions.
@ -9,4 +11,4 @@ pub use stats::{loadavg, nr_queued_and_running};
// There may be multiple scheduling policies in the system,
// and subsequent schedulers can be placed under this module.
pub use self::priority_scheduler::init;
pub use self::sched_class::{init, SchedAttr};

View File

@ -23,7 +23,7 @@ impl Nice {
Self(range)
}
pub fn range(&self) -> &NiceRange {
pub const fn range(&self) -> &NiceRange {
&self.0
}
@ -74,7 +74,15 @@ impl Priority {
Self(range)
}
pub fn range(&self) -> &PriorityRange {
pub const fn default_real_time() -> Self {
Self::new(PriorityRange::new(50))
}
pub const fn idle() -> Self {
Self::new(PriorityRange::new(PriorityRange::MAX))
}
pub const fn range(&self) -> &PriorityRange {
&self.0
}
@ -89,6 +97,12 @@ impl From<Nice> for Priority {
}
}
impl From<Priority> for Nice {
fn from(priority: Priority) -> Self {
Self::new(NiceRange::new((priority.range().get() - 100) as i8 - 20))
}
}
impl Default for Priority {
fn default() -> Self {
Nice::default().into()
@ -129,7 +143,7 @@ macro_rules! define_ranged_integer {
self.0 = val;
}
$visibility fn get(self) -> $type {
$visibility const fn get(self) -> $type {
self.0
}
}

View File

@ -16,11 +16,12 @@ use ostd::{
};
use super::{
priority::{Priority, PriorityRange},
priority::Priority,
stats::{set_stats_from_scheduler, SchedulerStats},
};
use crate::{prelude::*, thread::Thread};
#[allow(unused)]
pub fn init() {
let preempt_scheduler = Box::new(PreemptScheduler::default());
let scheduler = Box::<PreemptScheduler<Thread, Task>>::leak(preempt_scheduler);
@ -298,8 +299,8 @@ impl Default for TimeSlice {
}
impl PreemptSchedInfo for Thread {
const REAL_TIME_TASK_PRIORITY: Priority = Priority::new(PriorityRange::new(100));
const LOWEST_TASK_PRIORITY: Priority = Priority::new(PriorityRange::new(PriorityRange::MAX));
const REAL_TIME_TASK_PRIORITY: Priority = Priority::default_real_time();
const LOWEST_TASK_PRIORITY: Priority = Priority::idle();
fn priority(&self) -> Priority {
self.atomic_priority().load(Ordering::Relaxed)

View File

@ -0,0 +1,268 @@
// SPDX-License-Identifier: MPL-2.0
use alloc::{collections::binary_heap::BinaryHeap, sync::Arc};
use core::{
cmp::{self, Reverse},
sync::atomic::{AtomicU64, Ordering::*},
};
use ostd::{
cpu::{num_cpus, CpuId},
task::scheduler::{EnqueueFlags, UpdateFlags},
};
use super::{
time::{base_slice_clocks, min_period_clocks},
CurrentRuntime, SchedAttr, SchedClassRq,
};
use crate::{
sched::priority::{Nice, NiceRange},
thread::Thread,
};
const WEIGHT_0: u64 = 1024;
pub const fn nice_to_weight(nice: Nice) -> u64 {
// Calculated by the formula below:
//
// weight = 1024 * 1.25^(-nice)
//
// We propose that every increment of the nice value results
// in 12.5% change of the CPU load weight.
const FACTOR_NUMERATOR: u64 = 5;
const FACTOR_DENOMINATOR: u64 = 4;
const NICE_TO_WEIGHT: [u64; 40] = const {
let mut ret = [0; 40];
let mut index = 0;
let mut nice = NiceRange::MIN;
while nice <= NiceRange::MAX {
ret[index] = match nice {
0 => WEIGHT_0,
nice @ 1.. => {
let numerator = FACTOR_DENOMINATOR.pow(nice as u32);
let denominator = FACTOR_NUMERATOR.pow(nice as u32);
WEIGHT_0 * numerator / denominator
}
nice => {
let numerator = FACTOR_NUMERATOR.pow((-nice) as u32);
let denominator = FACTOR_DENOMINATOR.pow((-nice) as u32);
WEIGHT_0 * numerator / denominator
}
};
index += 1;
nice += 1;
}
ret
};
NICE_TO_WEIGHT[(nice.range().get() + 20) as usize]
}
/// The scheduling entity for the FAIR scheduling class.
///
/// The structure contains a significant indicator: `vruntime`.
///
/// # `vruntime`
///
/// The vruntime (virtual runtime) is calculated by the formula:
///
/// vruntime += runtime_delta * WEIGHT_0 / weight
///
/// and a thread with a lower vruntime gains a greater privilege to be
/// scheduled, making the whole run queue balanced on vruntime (thus FAIR).
///
/// # Scheduling periods
///
/// Scheduling periods is designed to calculate the time slice for each threads.
///
/// The time slice for each threads is calculated by the formula:
///
/// time_slice = period * weight / total_weight
///
/// where `total_weight` is the sum of all weights in the run queue including
/// the current thread and [`period`](FairClassRq::period) is calculated
/// regarding the number of running threads.
///
/// When a thread meets the condition below, it will be preempted to the
/// run queue. See [`FairClassRq::update_current`] for more details.
///
/// period_delta > time_slice
/// || vruntime > rq_min_vruntime + normalized_time_slice
#[derive(Debug)]
pub struct FairAttr {
weight: AtomicU64,
vruntime: AtomicU64,
}
impl FairAttr {
pub fn new(nice: Nice) -> Self {
FairAttr {
weight: nice_to_weight(nice).into(),
vruntime: Default::default(),
}
}
pub fn update(&self, nice: Nice) {
self.weight.store(nice_to_weight(nice), Relaxed);
}
fn update_vruntime(&self, delta: u64) -> (u64, u64) {
let weight = self.weight.load(Relaxed);
let delta = delta * WEIGHT_0 / weight;
let vruntime = self.vruntime.fetch_add(delta, Relaxed) + delta;
(vruntime, weight)
}
}
/// The wrapper for threads in the FAIR run queue.
///
/// This structure is used to provide the capability for keying in the
/// run queue implemented by `BTreeSet` in the `FairClassRq`.
struct FairQueueItem(Arc<Thread>);
impl core::fmt::Debug for FairQueueItem {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:?}", self.key())
}
}
impl FairQueueItem {
fn key(&self) -> u64 {
self.0.sched_attr().fair.vruntime.load(Relaxed)
}
}
impl PartialEq for FairQueueItem {
fn eq(&self, other: &Self) -> bool {
self.key().eq(&other.key())
}
}
impl Eq for FairQueueItem {}
impl PartialOrd for FairQueueItem {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for FairQueueItem {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.key().cmp(&other.key())
}
}
/// The per-cpu run queue for the FAIR scheduling class.
///
/// See [`FairAttr`] for the explanation of vruntimes and scheduling periods.
///
/// The structure contains a `BTreeSet` to store the threads in the run queue to
/// ensure the efficiency for finding next-to-run threads.
#[derive(Debug)]
pub(super) struct FairClassRq {
#[allow(unused)]
cpu: CpuId,
/// The ready-to-run threads.
threads: BinaryHeap<Reverse<FairQueueItem>>,
/// The minimum of vruntime in the run queue. Serves as the initial
/// value of newly-enqueued threads.
min_vruntime: u64,
total_weight: u64,
}
impl FairClassRq {
pub fn new(cpu: CpuId) -> Self {
Self {
cpu,
threads: BinaryHeap::new(),
min_vruntime: 0,
total_weight: 0,
}
}
/// The scheduling period is calculated as the maximum of the following two values:
///
/// 1. The minimum period value, defined by [`min_period_clocks`].
/// 2. `period = min_granularity * n` where
/// `min_granularity = log2(1 + num_cpus) * base_slice_clocks`, and `n` is the number of
/// runnable threads (including the current running thread).
///
/// The formula is chosen by 3 principles:
///
/// 1. The scheduling period should reflect the running threads and CPUs;
/// 2. The scheduling period should not be too low to limit the overhead of context switching;
/// 3. The scheduling period should not be too high to ensure the scheduling latency
/// & responsiveness.
fn period(&self) -> u64 {
let base_slice_clks = base_slice_clocks();
let min_period_clks = min_period_clocks();
// `+ 1` means including the current running thread.
let period_single_cpu =
(base_slice_clks * (self.threads.len() + 1) as u64).max(min_period_clks);
period_single_cpu * u64::from((1 + num_cpus()).ilog2())
}
/// The virtual time slice for each thread in the run queue, measured in vruntime clocks.
fn vtime_slice(&self) -> u64 {
self.period() / (self.threads.len() + 1) as u64
}
/// The time slice for each thread in the run queue, measured in sched clocks.
fn time_slice(&self, cur_weight: u64) -> u64 {
self.period() * cur_weight / (self.total_weight + cur_weight)
}
}
impl SchedClassRq for FairClassRq {
fn enqueue(&mut self, thread: Arc<Thread>, flags: Option<EnqueueFlags>) {
let fair_attr = &thread.sched_attr().fair;
let vruntime = match flags {
Some(EnqueueFlags::Spawn) => self.min_vruntime + self.vtime_slice(),
_ => self.min_vruntime,
};
fair_attr.vruntime.fetch_max(vruntime, Relaxed);
self.total_weight += fair_attr.weight.load(Relaxed);
self.threads.push(Reverse(FairQueueItem(thread)));
}
fn len(&mut self) -> usize {
self.threads.len()
}
fn is_empty(&mut self) -> bool {
self.threads.is_empty()
}
fn pick_next(&mut self) -> Option<Arc<Thread>> {
let Reverse(FairQueueItem(thread)) = self.threads.pop()?;
self.total_weight -= thread.sched_attr().fair.weight.load(Relaxed);
Some(thread)
}
fn update_current(
&mut self,
rt: &CurrentRuntime,
attr: &SchedAttr,
flags: UpdateFlags,
) -> bool {
match flags {
UpdateFlags::Yield => true,
UpdateFlags::Tick | UpdateFlags::Wait => {
let (vruntime, weight) = attr.fair.update_vruntime(rt.delta);
self.min_vruntime = match self.threads.peek() {
Some(Reverse(leftmost)) => vruntime.min(leftmost.key()),
None => vruntime,
};
rt.period_delta > self.time_slice(weight)
|| vruntime > self.min_vruntime + self.vtime_slice()
}
}
}
}

View File

@ -0,0 +1,55 @@
// SPDX-License-Identifier: MPL-2.0
use super::*;
/// The per-cpu run queue for the IDLE scheduling class.
///
/// This run queue is used for the per-cpu idle thread, if any.
pub(super) struct IdleClassRq {
thread: Option<Arc<Thread>>,
}
impl IdleClassRq {
pub fn new() -> Self {
Self { thread: None }
}
}
impl core::fmt::Debug for IdleClassRq {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
if self.thread.is_some() {
write!(f, "Idle: occupied")?;
} else {
write!(f, "Idle: empty")?;
}
Ok(())
}
}
impl SchedClassRq for IdleClassRq {
fn enqueue(&mut self, thread: Arc<Thread>, _: Option<EnqueueFlags>) {
let ptr = Arc::as_ptr(&thread);
if let Some(t) = self.thread.replace(thread)
&& ptr != Arc::as_ptr(&t)
{
panic!("Multiple `idle` threads spawned")
}
}
fn len(&mut self) -> usize {
usize::from(!self.is_empty())
}
fn is_empty(&mut self) -> bool {
self.thread.is_none()
}
fn pick_next(&mut self) -> Option<Arc<Thread>> {
self.thread.clone()
}
fn update_current(&mut self, _: &CurrentRuntime, _: &SchedAttr, _flags: UpdateFlags) -> bool {
// Idle threads has the greatest priority value. They should always be preempted.
true
}
}

View File

@ -0,0 +1,373 @@
// SPDX-License-Identifier: MPL-2.0
#![warn(unused)]
use alloc::{boxed::Box, sync::Arc};
use core::{fmt, sync::atomic::AtomicU64};
use ostd::{
cpu::{all_cpus, AtomicCpuSet, CpuId, PinCurrentCpu},
sync::SpinLock,
task::{
scheduler::{
info::CommonSchedInfo, inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler,
UpdateFlags,
},
Task,
},
trap::disable_local,
};
mod time;
mod fair;
mod idle;
mod real_time;
mod stop;
use ostd::arch::read_tsc as sched_clock;
use super::{
priority::{Nice, Priority, RangedU8},
stats::SchedulerStats,
};
use crate::thread::{AsThread, Thread};
#[allow(unused)]
pub fn init() {
inject_scheduler(Box::leak(Box::new(ClassScheduler::new())));
}
/// Represents the middle layer between scheduling classes and generic scheduler
/// traits. It consists of all the sets of run queues for CPU cores. Other global
/// information may also be stored here.
pub struct ClassScheduler {
rqs: Box<[SpinLock<PerCpuClassRqSet>]>,
}
/// Represents the run queue for each CPU core. It stores a list of run queues for
/// scheduling classes in its corresponding CPU core. The current task of this CPU
/// core is also stored in this structure.
struct PerCpuClassRqSet {
stop: Arc<stop::StopClassRq>,
real_time: real_time::RealTimeClassRq,
fair: fair::FairClassRq,
idle: idle::IdleClassRq,
current: Option<(Arc<Task>, CurrentRuntime)>,
}
/// Stores the runtime information of the current task.
///
/// This is used to calculate the time slice of the current task.
///
/// This struct is independent of the current `Arc<Task>` instead encapsulating the
/// task, because the scheduling class implementations use `CurrentRuntime` and
/// `SchedAttr` only.
struct CurrentRuntime {
start: u64,
delta: u64,
period_delta: u64,
}
impl CurrentRuntime {
fn new() -> Self {
CurrentRuntime {
start: sched_clock(),
delta: 0,
period_delta: 0,
}
}
fn update(&mut self) {
let now = sched_clock();
self.delta = now - core::mem::replace(&mut self.start, now);
self.period_delta += self.delta;
}
}
/// The run queue for scheduling classes (the main trait). Scheduling classes
/// should implement this trait to function as expected.
trait SchedClassRq: Send + fmt::Debug {
/// Enqueues a task into the run queue.
fn enqueue(&mut self, thread: Arc<Thread>, flags: Option<EnqueueFlags>);
/// Returns the number of threads in the run queue.
fn len(&mut self) -> usize;
/// Checks if the run queue is empty.
fn is_empty(&mut self) -> bool {
self.len() == 0
}
/// Picks the next task for running.
fn pick_next(&mut self) -> Option<Arc<Thread>>;
/// Update the information of the current task.
fn update_current(&mut self, rt: &CurrentRuntime, attr: &SchedAttr, flags: UpdateFlags)
-> bool;
}
pub use real_time::RealTimePolicy;
/// The User-chosen scheduling policy.
///
/// The scheduling policies are specified by the user, usually through its priority.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SchedPolicy {
Stop,
RealTime {
rt_prio: real_time::RtPrio,
rt_policy: RealTimePolicy,
},
Fair(Nice),
Idle,
}
impl From<Priority> for SchedPolicy {
fn from(priority: Priority) -> Self {
match priority.range().get() {
0 => SchedPolicy::Stop,
rt @ 1..=99 => SchedPolicy::RealTime {
rt_prio: RangedU8::new(rt),
rt_policy: Default::default(),
},
100..=139 => SchedPolicy::Fair(priority.into()),
_ => SchedPolicy::Idle,
}
}
}
/// The scheduling attribute for a thread.
///
/// This is used to store the scheduling policy and runtime parameters for each
/// scheduling class.
#[derive(Debug)]
pub struct SchedAttr {
policy: SpinLock<SchedPolicy>,
real_time: real_time::RealTimeAttr,
fair: fair::FairAttr,
}
impl SchedAttr {
/// Constructs a new `SchedAttr` with the given scheduling policy.
pub fn new(policy: SchedPolicy) -> Self {
Self {
policy: SpinLock::new(policy),
real_time: {
let (prio, policy) = match policy {
SchedPolicy::RealTime { rt_prio, rt_policy } => (rt_prio.get(), rt_policy),
_ => (real_time::RtPrio::MAX, Default::default()),
};
real_time::RealTimeAttr::new(prio, policy)
},
fair: fair::FairAttr::new(match policy {
SchedPolicy::Fair(nice) => nice,
_ => Nice::default(),
}),
}
}
/// Retrieves the current scheduling policy of the thread.
pub fn policy(&self) -> SchedPolicy {
*self.policy.lock()
}
/// Updates the scheduling policy of the thread.
///
/// Specifically for real-time policies, if the new policy doesn't
/// specify a base slice factor for RR, the old one will be kept.
pub fn set_policy(&self, mut policy: SchedPolicy) {
let mut guard = self.policy.lock();
match policy {
SchedPolicy::RealTime { rt_prio, rt_policy } => {
self.real_time.update(rt_prio.get(), rt_policy);
}
SchedPolicy::Fair(nice) => self.fair.update(nice),
_ => {}
}
// Keep the old base slice factor if the new policy doesn't specify one.
if let (
SchedPolicy::RealTime {
rt_policy:
RealTimePolicy::RoundRobin {
base_slice_factor: slot,
},
..
},
SchedPolicy::RealTime {
rt_policy: RealTimePolicy::RoundRobin { base_slice_factor },
..
},
) = (*guard, &mut policy)
{
*base_slice_factor = slot.or(*base_slice_factor);
}
*guard = policy;
}
}
impl Scheduler for ClassScheduler {
fn enqueue(&self, task: Arc<Task>, flags: EnqueueFlags) -> Option<CpuId> {
let thread = task.as_thread()?;
let (still_in_rq, cpu) = {
let selected_cpu_id = self.select_cpu(thread.atomic_cpu_affinity());
if let Err(task_cpu_id) = task.cpu().set_if_is_none(selected_cpu_id) {
debug_assert!(flags != EnqueueFlags::Spawn);
(true, task_cpu_id)
} else {
(false, selected_cpu_id)
}
};
let mut rq = self.rqs[cpu.as_usize()].disable_irq().lock();
// Note: call set_if_is_none again to prevent a race condition.
if still_in_rq && task.cpu().set_if_is_none(cpu).is_err() {
return None;
}
rq.enqueue_thread(thread, Some(flags));
Some(cpu)
}
fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue)) {
let guard = disable_local();
let mut lock = self.rqs[guard.current_cpu().as_usize()].lock();
f(&mut *lock)
}
fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue)) {
let guard = disable_local();
f(&*self.rqs[guard.current_cpu().as_usize()].lock())
}
}
impl ClassScheduler {
pub fn new() -> Self {
let stop = stop::StopClassRq::new();
let class_rq = |cpu| {
SpinLock::new(PerCpuClassRqSet {
stop: stop.clone(),
real_time: real_time::RealTimeClassRq::new(cpu),
fair: fair::FairClassRq::new(cpu),
idle: idle::IdleClassRq::new(),
current: None,
})
};
ClassScheduler {
rqs: all_cpus().map(class_rq).collect(),
}
}
// TODO: Implement a better algorithm and replace the current naive implementation.
fn select_cpu(&self, affinity: &AtomicCpuSet) -> CpuId {
let guard = disable_local();
let affinity = affinity.load();
let cur = guard.current_cpu();
if affinity.contains(cur) {
cur
} else {
affinity.iter().next().expect("empty affinity")
}
}
}
impl PerCpuClassRqSet {
fn pick_next_thread(&mut self) -> Option<Arc<Thread>> {
(self.stop.pick_next())
.or_else(|| self.real_time.pick_next())
.or_else(|| self.fair.pick_next())
.or_else(|| self.idle.pick_next())
}
fn enqueue_thread(&mut self, thread: &Arc<Thread>, flags: Option<EnqueueFlags>) {
let attr = thread.sched_attr();
let cloned = thread.clone();
match *attr.policy.lock() {
SchedPolicy::Stop => self.stop.enqueue(cloned, flags),
SchedPolicy::RealTime { .. } => self.real_time.enqueue(cloned, flags),
SchedPolicy::Fair(_) => self.fair.enqueue(cloned, flags),
SchedPolicy::Idle => self.idle.enqueue(cloned, flags),
};
}
fn nr_queued_and_running(&mut self) -> (u32, u32) {
let queued = self.stop.len() + self.real_time.len() + self.fair.len() + self.idle.len();
let running = usize::from(self.current.is_some());
(queued as u32, running as u32)
}
}
impl LocalRunQueue for PerCpuClassRqSet {
fn current(&self) -> Option<&Arc<Task>> {
self.current.as_ref().map(|(task, _)| task)
}
fn pick_next_current(&mut self) -> Option<&Arc<Task>> {
self.pick_next_thread().and_then(|next| {
let next_task = next.task();
if let Some((old_task, _)) = self
.current
.replace((next_task.clone(), CurrentRuntime::new()))
{
if Arc::ptr_eq(&old_task, &next_task) {
return None;
}
let old = old_task.as_thread().unwrap();
self.enqueue_thread(old, None);
}
self.current.as_ref().map(|(task, _)| task)
})
}
fn update_current(&mut self, flags: UpdateFlags) -> bool {
if let Some((cur_task, rt)) = &mut self.current
&& let Some(cur) = cur_task.as_thread()
{
rt.update();
let attr = &cur.sched_attr();
let (current_expired, lookahead) = match &*attr.policy.lock() {
SchedPolicy::Stop => (self.stop.update_current(rt, attr, flags), 0),
SchedPolicy::RealTime { .. } => (self.real_time.update_current(rt, attr, flags), 1),
SchedPolicy::Fair(_) => (self.fair.update_current(rt, attr, flags), 2),
SchedPolicy::Idle => (self.idle.update_current(rt, attr, flags), 3),
};
current_expired
|| (lookahead >= 1 && !self.stop.is_empty())
|| (lookahead >= 2 && !self.real_time.is_empty())
|| (lookahead >= 3 && !self.fair.is_empty())
} else {
true
}
}
fn dequeue_current(&mut self) -> Option<Arc<Task>> {
self.current.take().map(|(cur_task, _)| {
cur_task.schedule_info().cpu.set_to_none();
cur_task
})
}
}
impl SchedulerStats for ClassScheduler {
fn nr_queued_and_running(&self) -> (u32, u32) {
self.rqs.iter().fold((0, 0), |(queued, running), rq| {
let (q, r) = rq.lock().nr_queued_and_running();
(queued + q, running + r)
})
}
}
impl Default for ClassScheduler {
fn default() -> Self {
Self::new()
}
}

View File

@ -0,0 +1,203 @@
// SPDX-License-Identifier: MPL-2.0
use alloc::collections::vec_deque::VecDeque;
use core::{
array,
num::NonZero,
sync::atomic::{AtomicU8, Ordering::*},
};
use bitvec::{bitarr, BitArr};
use super::{time::base_slice_clocks, *};
pub type RtPrio = RangedU8<1, 99>;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RealTimePolicy {
Fifo,
RoundRobin {
base_slice_factor: Option<NonZero<u64>>,
},
}
impl Default for RealTimePolicy {
fn default() -> Self {
Self::RoundRobin {
base_slice_factor: None,
}
}
}
impl RealTimePolicy {
fn to_time_slice(self) -> u64 {
match self {
RealTimePolicy::RoundRobin { base_slice_factor } => {
base_slice_clocks()
* base_slice_factor.map_or(DEFAULT_BASE_SLICE_FACTOR, NonZero::get)
}
RealTimePolicy::Fifo => 0,
}
}
}
/// The scheduling attribute for the REAL-TIME scheduling class.
///
/// This structure provides not-only the priority of the thread,
/// but also the time slice for the thread, measured in [`sched_clock`]s.
///
/// - If the time slice is not set, the thread is considered to be a FIFO
/// thread, and will be executed to its end if there no thread with a
/// lower priority.
/// - If the time slice is set, the thread is considered to be an RR
/// (round-robin) thread, and will be executed for the time slice, and
/// then it will be put back to the inactive array.
#[derive(Debug)]
pub struct RealTimeAttr {
prio: AtomicU8,
time_slice: AtomicU64, // 0 for SCHED_FIFO; other for SCHED_RR
}
const DEFAULT_BASE_SLICE_FACTOR: u64 = 20;
impl RealTimeAttr {
pub fn new(prio: u8, policy: RealTimePolicy) -> Self {
RealTimeAttr {
prio: prio.into(),
time_slice: AtomicU64::new(policy.to_time_slice()),
}
}
pub fn update(&self, prio: u8, policy: RealTimePolicy) {
self.prio.store(prio, Relaxed);
self.time_slice.store(policy.to_time_slice(), Relaxed);
}
}
struct PrioArray {
map: BitArr![for 100],
queue: [VecDeque<Arc<Thread>>; 100],
}
impl core::fmt::Debug for PrioArray {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("PrioArray")
.field_with("map", |f| {
f.debug_list().entries(self.map.iter_ones()).finish()
})
.field_with("queue", |f| {
f.debug_list()
.entries((self.queue.iter().flatten()).map(|thread| thread.sched_attr()))
.finish()
})
.finish()
}
}
impl PrioArray {
fn enqueue(&mut self, thread: Arc<Thread>, prio: u8) {
let queue = &mut self.queue[usize::from(prio)];
let is_empty = queue.is_empty();
queue.push_back(thread);
if is_empty {
self.map.set(usize::from(prio), true);
}
}
fn pop(&mut self) -> Option<Arc<Thread>> {
let mut iter = self.map.iter_ones();
let prio = iter.next()? as u8;
let queue = &mut self.queue[usize::from(prio)];
let thread = queue.pop_front()?;
if queue.is_empty() {
self.map.set(usize::from(prio), false);
}
Some(thread)
}
}
/// The per-cpu run queue for the REAL-TIME scheduling class.
///
/// The REAL-TIME scheduling class is implemented as a classic O(1)
/// priority algorithm.
///
/// It uses a bit array to track which priority levels have runnable
/// threads, and a vector of queues to store the threads.
///
/// Threads are popped & dequeued from the active array (`array[index]`), and
/// are enqueued into the inactive array (`array[!index]`). When the active array
/// is empty, the 2 arrays are swapped by `index`.
#[derive(Debug)]
pub(super) struct RealTimeClassRq {
#[allow(unused)]
cpu: CpuId,
index: bool,
array: [PrioArray; 2],
}
impl RealTimeClassRq {
pub fn new(cpu: CpuId) -> RealTimeClassRq {
RealTimeClassRq {
cpu,
index: false,
array: array::from_fn(|_| PrioArray {
map: bitarr![0; 100],
queue: array::from_fn(|_| VecDeque::new()),
}),
}
}
fn active_array(&mut self) -> &mut PrioArray {
&mut self.array[usize::from(self.index)]
}
fn inactive_array(&mut self) -> &mut PrioArray {
&mut self.array[usize::from(!self.index)]
}
fn swap_arrays(&mut self) {
self.index = !self.index;
}
}
impl SchedClassRq for RealTimeClassRq {
fn enqueue(&mut self, thread: Arc<Thread>, _: Option<EnqueueFlags>) {
let prio = thread.sched_attr().real_time.prio.load(Relaxed);
self.inactive_array().enqueue(thread, prio);
}
fn len(&mut self) -> usize {
self.active_array().map.count_ones() + self.inactive_array().map.count_ones()
}
fn is_empty(&mut self) -> bool {
self.active_array().map.is_empty() && self.inactive_array().map.is_empty()
}
fn pick_next(&mut self) -> Option<Arc<Thread>> {
self.active_array().pop().or_else(|| {
self.swap_arrays();
self.active_array().pop()
})
}
fn update_current(
&mut self,
rt: &CurrentRuntime,
attr: &SchedAttr,
flags: UpdateFlags,
) -> bool {
let attr = &attr.real_time;
match flags {
UpdateFlags::Tick | UpdateFlags::Wait => match attr.time_slice.load(Relaxed) {
0 => (self.inactive_array().map.iter_ones().next())
.is_some_and(|prio| prio > usize::from(attr.prio.load(Relaxed))),
ts => ts <= rt.period_delta,
},
UpdateFlags::Yield => true,
}
}
}

View File

@ -0,0 +1,55 @@
// SPDX-License-Identifier: MPL-2.0
use super::*;
/// The per-cpu run queue for the STOP scheduling class.
///
/// This is a singleton class, meaning that only one thread can be in this class at a time.
/// This is used for the most critical tasks, such as powering off and rebooting.
pub(super) struct StopClassRq {
thread: SpinLock<Option<Arc<Thread>>>,
}
impl StopClassRq {
pub fn new() -> Arc<Self> {
Arc::new(StopClassRq {
thread: SpinLock::new(None),
})
}
}
impl core::fmt::Debug for StopClassRq {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
if self.thread.lock().is_some() {
write!(f, "Stop: occupied")?;
} else {
write!(f, "Stop: empty")?;
}
Ok(())
}
}
impl SchedClassRq for Arc<StopClassRq> {
fn enqueue(&mut self, thread: Arc<Thread>, _: Option<EnqueueFlags>) {
if self.thread.lock().replace(thread).is_some() {
panic!("Multiple `stop` threads spawned")
}
}
fn len(&mut self) -> usize {
usize::from(!self.is_empty())
}
fn is_empty(&mut self) -> bool {
self.thread.lock().is_none()
}
fn pick_next(&mut self) -> Option<Arc<Thread>> {
self.thread.lock().take()
}
fn update_current(&mut self, _: &CurrentRuntime, _: &SchedAttr, _flags: UpdateFlags) -> bool {
// Stop threads has the lowest priority value. They should never be preempted.
false
}
}

View File

@ -0,0 +1,53 @@
// SPDX-License-Identifier: MPL-2.0
use core::mem;
use spin::Once;
/// Returns the numerator and denominator of the ratio R:
///
/// R = 10^9 (ns in a sec) / TSC clock frequency
fn tsc_factors() -> (u64, u64) {
static FACTORS: Once<(u64, u64)> = Once::new();
*FACTORS.call_once(|| {
let freq = ostd::arch::tsc_freq();
assert_ne!(freq, 0);
let mut a = 1_000_000_000;
let mut b = freq;
if a < b {
mem::swap(&mut a, &mut b);
}
while a > 1 && b > 1 {
let t = a;
a = b;
b = t % b;
}
let gcd = if a <= 1 { b } else { a };
(1_000_000_000 / gcd, freq / gcd)
})
}
/// The base time slice allocated for every thread, measured in nanoseconds.
pub const BASE_SLICE_NS: u64 = 750_000;
/// The minimum scheduling period, measured in nanoseconds.
pub const MIN_PERIOD_NS: u64 = 6_000_000;
fn consts() -> (u64, u64) {
static CONSTS: Once<(u64, u64)> = Once::new();
*CONSTS.call_once(|| {
let (a, b) = tsc_factors();
(BASE_SLICE_NS * b / a, MIN_PERIOD_NS * b / a)
})
}
/// Returns the base time slice allocated for every thread, measured in TSC clock units.
pub fn base_slice_clocks() -> u64 {
consts().0
}
/// Returns the minimum scheduling period, measured in TSC clock units.
pub fn min_period_clocks() -> u64 {
consts().1
}

View File

@ -5,7 +5,7 @@ use ostd::{
task::{Task, TaskOptions},
};
use super::{oops, status::ThreadStatus, AsThread, Thread};
use super::{oops, AsThread, Thread};
use crate::{prelude::*, sched::priority::Priority};
/// The inner data of a kernel thread.
@ -58,13 +58,11 @@ impl ThreadOptions {
Arc::new_cyclic(|weak_task| {
let thread = {
let kernel_thread = KernelThread;
let status = ThreadStatus::Init;
let priority = self.priority;
let cpu_affinity = self.cpu_affinity;
Arc::new(Thread::new(
weak_task.clone(),
kernel_thread,
status,
priority,
cpu_affinity,
))

View File

@ -12,7 +12,10 @@ use ostd::{
use self::status::{AtomicThreadStatus, ThreadStatus};
use crate::{
prelude::*,
sched::priority::{AtomicPriority, Priority},
sched::{
priority::{AtomicPriority, Priority},
SchedAttr,
},
};
pub mod exception;
@ -40,6 +43,7 @@ pub struct Thread {
priority: AtomicPriority,
/// Thread CPU affinity
cpu_affinity: AtomicCpuSet,
sched_attr: SchedAttr,
}
impl Thread {
@ -47,16 +51,16 @@ impl Thread {
pub fn new(
task: Weak<Task>,
data: impl Send + Sync + Any,
status: ThreadStatus,
priority: Priority,
cpu_affinity: CpuSet,
) -> Self {
Thread {
task,
data: Box::new(data),
status: AtomicThreadStatus::new(status),
status: AtomicThreadStatus::new(ThreadStatus::Init),
priority: AtomicPriority::new(priority),
cpu_affinity: AtomicCpuSet::new(cpu_affinity),
sched_attr: SchedAttr::new(priority.into()),
}
}
@ -140,6 +144,10 @@ impl Thread {
&self.cpu_affinity
}
pub fn sched_attr(&self) -> &SchedAttr {
&self.sched_attr
}
/// Yields the execution to another thread.
///
/// This method will return once the current thread is scheduled again.

View File

@ -10,7 +10,7 @@ use ostd::{
use super::worker_pool::WorkerPool;
use crate::{
prelude::*,
sched::priority::{Priority, PriorityRange},
sched::priority::Priority,
thread::{kernel_thread::ThreadOptions, AsThread},
};
@ -52,8 +52,7 @@ impl Worker {
cpu_affinity.add(bound_cpu);
let mut priority = Priority::default();
if worker_pool.upgrade().unwrap().is_high_priority() {
// FIXME: remove the use of real-time priority.
priority = Priority::new(PriorityRange::new(0));
priority = Priority::default_real_time();
}
let bound_task = ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)

View File

@ -16,7 +16,7 @@ use ostd::{
use super::{simple_scheduler::SimpleScheduler, worker::Worker, WorkItem, WorkPriority, WorkQueue};
use crate::{
prelude::*,
sched::priority::{Priority, PriorityRange},
sched::priority::Priority,
thread::{kernel_thread::ThreadOptions, AsThread},
};
@ -242,7 +242,7 @@ impl Monitor {
// This workaround is to make the monitor of high-priority worker pool
// starvation-free under the current scheduling policy.
let priority = match priority {
WorkPriority::High => Priority::new(PriorityRange::new(0)),
WorkPriority::High => Priority::default_real_time(),
WorkPriority::Normal => Priority::default(),
};
let bound_task = ThreadOptions::new(task_fn)

View File

@ -138,8 +138,8 @@ where
// `dequeue_current` method and nothing bad will happen. This may need to be revisited
// after more complex schedulers are introduced.
current = local_rq.dequeue_current();
local_rq.update_current(UpdateFlags::Wait);
current = local_rq.dequeue_current();
}
if let Some(next_task) = local_rq.pick_next_current() {