// SPDX-License-Identifier: MPL-2.0

use core::sync::atomic::Ordering;

use ostd::{
    cpu::{num_cpus, CpuId, CpuSet, PinCurrentCpu},
    task::{
        disable_preempt,
        scheduler::{
            info::CommonSchedInfo, inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler,
            UpdateFlags,
        },
        Task,
    },
    trap::disable_local,
};

use super::{
    priority::Priority,
    stats::{set_stats_from_scheduler, SchedulerStats},
};
use crate::{prelude::*, thread::Thread};

pub fn init() {
    let preempt_scheduler = Box::new(PreemptScheduler::default());
    let scheduler = Box::<PreemptScheduler<Thread, Task>>::leak(preempt_scheduler);

    // Inject the scheduler into the ostd for actual scheduling work.
    inject_scheduler(scheduler);

    // Register the scheduler with the system for statistics.
    // We do this after injecting the scheduler into ostd,
    // so that the loadavg statistics are updated only once the scheduler is in use.
    set_stats_from_scheduler(scheduler);
}

/// The preempt scheduler.
///
/// Real-time tasks are placed in the `real_time_entities` queue and
/// are always prioritized during scheduling.
/// Normal tasks are placed in the `normal_entities` queue and are only
/// scheduled for execution when there are no real-time tasks.
/// Lowest-priority tasks are placed in the `lowest_entities` queue and
/// run only when both of the other queues are empty.
struct PreemptScheduler<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> {
    rq: Vec<SpinLock<PreemptRunQueue<T, U>>>,
}

impl<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> PreemptScheduler<T, U> {
    fn new(nr_cpus: usize) -> Self {
        let mut rq = Vec::with_capacity(nr_cpus);
        for _ in 0..nr_cpus {
            rq.push(SpinLock::new(PreemptRunQueue::new()));
        }
        Self { rq }
    }

    /// Selects a CPU for the task to run on for the first time.
    fn select_cpu(&self, entity: &PreemptSchedEntity<T, U>) -> CpuId {
        // If the CPU of a runnable task has been set before, keep scheduling
        // the task to that one.
        // TODO: Consider migrating tasks between CPUs for load balancing.
        if let Some(cpu_id) = entity.task.cpu().get() {
            return cpu_id;
        }

        let irq_guard = disable_local();
        let mut selected = irq_guard.current_cpu();
        let mut minimum_load = usize::MAX;

        for candidate in entity.thread.cpu_affinity().iter() {
            let rq = self.rq[candidate.as_usize()].lock();
            // A wild guess measuring the load of a runqueue. We assume that
            // real-time tasks are 4-times as important as normal tasks.
            let load = rq.real_time_entities.len() * 8
                + rq.normal_entities.len() * 2
                + rq.lowest_entities.len();
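            // The weights encode that ratio: a real-time entity counts 8, a
            // normal entity counts 2, and a lowest-priority entity counts 1,
            // so background tasks barely influence CPU selection.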
            if load < minimum_load {
                selected = candidate;
                minimum_load = load;
            }
        }

        selected
    }
}

impl<T: Sync + Send + PreemptSchedInfo + FromTask<U>, U: Sync + Send + CommonSchedInfo> Scheduler<U>
    for PreemptScheduler<T, U>
{
    fn enqueue(&self, task: Arc<U>, flags: EnqueueFlags) -> Option<CpuId> {
        let entity = PreemptSchedEntity::new(task);

        let (still_in_rq, target_cpu) = {
            let selected_cpu_id = self.select_cpu(&entity);

            if let Err(task_cpu_id) = entity.task.cpu().set_if_is_none(selected_cpu_id) {
                debug_assert!(flags != EnqueueFlags::Spawn);
                (true, task_cpu_id)
            } else {
                (false, selected_cpu_id)
            }
        };

        let mut rq = self.rq[target_cpu.as_usize()].disable_irq().lock();
        if still_in_rq && entity.task.cpu().set_if_is_none(target_cpu).is_err() {
            return None;
        }

        let new_priority = entity.thread.priority();

        if entity.thread.is_real_time() {
            rq.real_time_entities.push_back(entity);
        } else if entity.thread.is_lowest() {
            rq.lowest_entities.push_back(entity);
        } else {
            rq.normal_entities.push_back(entity);
        }

        // Preempt the current task, but only if the newly queued task has a strictly higher
        // priority (i.e., a lower value returned by the `priority` method) than the current task.
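        // Returning `Some(cpu)` asks the caller to preempt the current task
        // on that CPU; returning `None` means no preemption is needed.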
        if rq
            .current
            .as_ref()
            .is_some_and(|current| new_priority < current.thread.priority())
        {
            Some(target_cpu)
        } else {
            None
        }
    }

    fn local_rq_with(&self, f: &mut dyn FnMut(&dyn LocalRunQueue<U>)) {
        let irq_guard = disable_local();
        let local_rq: &PreemptRunQueue<T, U> = &self.rq[irq_guard.current_cpu().as_usize()].lock();
        f(local_rq);
    }

    fn local_mut_rq_with(&self, f: &mut dyn FnMut(&mut dyn LocalRunQueue<U>)) {
        let irq_guard = disable_local();
        let local_rq: &mut PreemptRunQueue<T, U> =
            &mut self.rq[irq_guard.current_cpu().as_usize()].lock();
        f(local_rq);
    }
}

impl<T: Sync + Send + PreemptSchedInfo + FromTask<U>, U: Sync + Send + CommonSchedInfo>
    SchedulerStats for PreemptScheduler<T, U>
{
    fn nr_queued_and_running(&self) -> (u32, u32) {
        let _preempt_guard = disable_preempt();
        let mut nr_queued = 0;
        let mut nr_running = 0;

        for rq in self.rq.iter() {
            let rq = rq.lock();

            nr_queued +=
                rq.real_time_entities.len() + rq.normal_entities.len() + rq.lowest_entities.len();

            if rq.current.is_some() {
                nr_running += 1;
            }
        }

        (nr_queued as u32, nr_running)
    }
}

impl Default for PreemptScheduler<Thread, Task> {
    fn default() -> Self {
        Self::new(num_cpus())
    }
}

struct PreemptRunQueue<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> {
    current: Option<PreemptSchedEntity<T, U>>,
    real_time_entities: VecDeque<PreemptSchedEntity<T, U>>,
    normal_entities: VecDeque<PreemptSchedEntity<T, U>>,
    lowest_entities: VecDeque<PreemptSchedEntity<T, U>>,
}

impl<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> PreemptRunQueue<T, U> {
    pub fn new() -> Self {
        Self {
            current: None,
            real_time_entities: VecDeque::new(),
            normal_entities: VecDeque::new(),
            lowest_entities: VecDeque::new(),
        }
    }
}

impl<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> LocalRunQueue<U>
    for PreemptRunQueue<T, U>
{
    fn current(&self) -> Option<&Arc<U>> {
        self.current.as_ref().map(|entity| &entity.task)
    }

    fn update_current(&mut self, flags: UpdateFlags) -> bool {
        match flags {
            UpdateFlags::Tick => {
                let Some(ref mut current_entity) = self.current else {
                    return false;
                };
                current_entity.tick()
                    || (!current_entity.thread.is_real_time()
                        && !self.real_time_entities.is_empty())
            }
            _ => true,
        }
    }

    fn pick_next_current(&mut self) -> Option<&Arc<U>> {
        let next_entity = if !self.real_time_entities.is_empty() {
            self.real_time_entities.pop_front()
        } else if !self.normal_entities.is_empty() {
            self.normal_entities.pop_front()
        } else {
            self.lowest_entities.pop_front()
        }?;

        if let Some(prev_entity) = self.current.replace(next_entity) {
            if prev_entity.thread.is_real_time() {
                self.real_time_entities.push_back(prev_entity);
            } else if prev_entity.thread.is_lowest() {
                self.lowest_entities.push_back(prev_entity);
            } else {
                self.normal_entities.push_back(prev_entity);
            }
        }

        Some(&self.current.as_ref().unwrap().task)
    }

    fn dequeue_current(&mut self) -> Option<Arc<U>> {
        self.current.take().map(|entity| {
            let runnable = entity.task;
            runnable.cpu().set_to_none();

            runnable
        })
    }
}

struct PreemptSchedEntity<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> {
    task: Arc<U>,
    thread: Arc<T>,
    time_slice: TimeSlice,
}

impl<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> PreemptSchedEntity<T, U> {
    fn new(task: Arc<U>) -> Self {
        let thread = T::from_task(&task);
        let time_slice = TimeSlice::default();
        Self {
            task,
            thread,
            time_slice,
        }
    }

    fn tick(&mut self) -> bool {
        self.time_slice.elapse()
    }
}

impl<T: PreemptSchedInfo + FromTask<U>, U: CommonSchedInfo> Clone for PreemptSchedEntity<T, U> {
    fn clone(&self) -> Self {
        Self {
            task: self.task.clone(),
            thread: self.thread.clone(),
            time_slice: self.time_slice,
        }
    }
}

#[derive(Clone, Copy)]
pub struct TimeSlice {
    elapsed_ticks: u32,
}

impl TimeSlice {
    const DEFAULT_TIME_SLICE: u32 = 100;

    pub const fn new() -> Self {
        TimeSlice { elapsed_ticks: 0 }
    }

    pub fn elapse(&mut self) -> bool {
        self.elapsed_ticks = (self.elapsed_ticks + 1) % Self::DEFAULT_TIME_SLICE;

        self.elapsed_ticks == 0
    }
}

impl Default for TimeSlice {
    fn default() -> Self {
        Self::new()
    }
}

impl PreemptSchedInfo for Thread {
    const REAL_TIME_TASK_PRIORITY: Priority = Priority::default_real_time();
    const LOWEST_TASK_PRIORITY: Priority = Priority::idle();

    fn priority(&self) -> Priority {
        self.atomic_priority().load(Ordering::Relaxed)
    }

    fn cpu_affinity(&self) -> CpuSet {
        self.atomic_cpu_affinity().load()
    }
}

trait PreemptSchedInfo {
    const REAL_TIME_TASK_PRIORITY: Priority;
    const LOWEST_TASK_PRIORITY: Priority;

    fn priority(&self) -> Priority;

    fn cpu_affinity(&self) -> CpuSet;

    fn is_real_time(&self) -> bool {
        self.priority() < Self::REAL_TIME_TASK_PRIORITY
    }

    fn is_lowest(&self) -> bool {
        self.priority() == Self::LOWEST_TASK_PRIORITY
    }
}

impl FromTask<Task> for Thread {
    fn from_task(task: &Arc<Task>) -> Arc<Self> {
        task.data().downcast_ref::<Arc<Self>>().unwrap().clone()
    }
}

trait FromTask<U> {
    fn from_task(task: &Arc<U>) -> Arc<Self>;
}
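
// An illustrative check of the `TimeSlice` contract: `elapse` reports `true`
// exactly once every `DEFAULT_TIME_SLICE` ticks. This is a sketch, not part
// of the original module; it assumes OSTD's `ktest` in-kernel test framework
// is usable here, as in other kernel modules.
#[cfg(ktest)]
mod test {
    use ostd::prelude::*;

    use super::TimeSlice;

    #[ktest]
    fn time_slice_elapses_once_per_period() {
        let mut slice = TimeSlice::new();
        // The first `DEFAULT_TIME_SLICE - 1` ticks leave the slice unexpired.
        for _ in 1..TimeSlice::DEFAULT_TIME_SLICE {
            assert!(!slice.elapse());
        }
        // The final tick of the period expires the slice and resets the
        // internal counter, so the next tick starts a fresh slice.
        assert!(slice.elapse());
        assert!(!slice.elapse());
    }
}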