Distribute tasks among all the CPUs

This commit is contained in:
Zejun Zhao 2025-01-13 16:39:47 +08:00 committed by Tate, Hongliang Tian
parent d74d126e64
commit 29791ba77e
6 changed files with 47 additions and 23 deletions

View File

@ -235,11 +235,11 @@ impl SchedClassRq for FairClassRq {
self.entities.push(Reverse(FairQueueItem(entity, vruntime)));
}
fn len(&mut self) -> usize {
fn len(&self) -> usize {
self.entities.len()
}
fn is_empty(&mut self) -> bool {
fn is_empty(&self) -> bool {
self.entities.is_empty()
}

View File

@ -43,11 +43,11 @@ impl SchedClassRq for IdleClassRq {
}
}
fn len(&mut self) -> usize {
fn len(&self) -> usize {
usize::from(!self.is_empty())
}
fn is_empty(&mut self) -> bool {
fn is_empty(&self) -> bool {
self.entity.is_none()
}

View File

@ -7,14 +7,14 @@ use core::fmt;
use ostd::{
arch::read_tsc as sched_clock,
cpu::{all_cpus, AtomicCpuSet, CpuId, PinCurrentCpu},
cpu::{all_cpus, CpuId, PinCurrentCpu},
sync::SpinLock,
task::{
scheduler::{
info::CommonSchedInfo, inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler,
UpdateFlags,
},
Task,
AtomicCpuId, Task,
},
trap::disable_local,
};
@ -104,10 +104,10 @@ trait SchedClassRq: Send + fmt::Debug {
fn enqueue(&mut self, task: Arc<Task>, flags: Option<EnqueueFlags>);
/// Returns the number of threads in the run queue.
fn len(&mut self) -> usize;
fn len(&self) -> usize;
/// Checks if the run queue is empty.
fn is_empty(&mut self) -> bool {
fn is_empty(&self) -> bool {
self.len() == 0
}
@ -126,7 +126,7 @@ trait SchedClassRq: Send + fmt::Debug {
#[derive(Debug)]
pub struct SchedAttr {
policy: SchedPolicyState,
last_cpu: AtomicCpuId,
real_time: real_time::RealTimeAttr,
fair: fair::FairAttr,
}
@ -136,6 +136,7 @@ impl SchedAttr {
pub fn new(policy: SchedPolicy) -> Self {
Self {
policy: SchedPolicyState::new(policy),
last_cpu: AtomicCpuId::default(),
real_time: {
let (prio, policy) = match policy {
SchedPolicy::RealTime { rt_prio, rt_policy } => (rt_prio.get(), rt_policy),
@ -172,6 +173,14 @@ impl SchedAttr {
_ => {}
});
}
fn last_cpu(&self) -> Option<CpuId> {
self.last_cpu.get()
}
fn set_last_cpu(&self, cpu_id: CpuId) {
self.last_cpu.set_anyway(cpu_id);
}
}
impl Scheduler for ClassScheduler {
@ -179,7 +188,7 @@ impl Scheduler for ClassScheduler {
let thread = task.as_thread()?.clone();
let (still_in_rq, cpu) = {
let selected_cpu_id = self.select_cpu(thread.atomic_cpu_affinity());
let selected_cpu_id = self.select_cpu(&thread, flags);
if let Err(task_cpu_id) = task.cpu().set_if_is_none(selected_cpu_id) {
debug_assert!(flags != EnqueueFlags::Spawn);
@ -196,6 +205,7 @@ impl Scheduler for ClassScheduler {
return None;
}
thread.sched_attr().set_last_cpu(cpu);
rq.enqueue_entity((task, thread), Some(flags));
Some(cpu)
}
@ -229,15 +239,24 @@ impl ClassScheduler {
}
// TODO: Implement a better algorithm and replace the current naive implementation.
fn select_cpu(&self, affinity: &AtomicCpuSet) -> CpuId {
let guard = disable_local();
let affinity = affinity.load();
let cur = guard.current_cpu();
if affinity.contains(cur) {
cur
} else {
affinity.iter().next().expect("empty affinity")
fn select_cpu(&self, thread: &Thread, flags: EnqueueFlags) -> CpuId {
if let Some(last_cpu) = thread.sched_attr().last_cpu() {
return last_cpu;
}
debug_assert!(flags == EnqueueFlags::Spawn);
let affinity = thread.atomic_cpu_affinity();
let guard = disable_local();
let mut selected = guard.current_cpu();
let mut minimum_load = u32::MAX;
for candidate in affinity.load().iter() {
let rq = self.rqs[candidate.as_usize()].lock();
let (load, _) = rq.nr_queued_and_running();
if load < minimum_load {
minimum_load = load;
selected = candidate;
}
}
selected
}
}
@ -262,7 +281,7 @@ impl PerCpuClassRqSet {
}
}
fn nr_queued_and_running(&mut self) -> (u32, u32) {
fn nr_queued_and_running(&self) -> (u32, u32) {
let queued = self.stop.len() + self.real_time.len() + self.fair.len() + self.idle.len();
let running = usize::from(self.current.is_some());
(queued as u32, running as u32)

View File

@ -184,11 +184,11 @@ impl SchedClassRq for RealTimeClassRq {
self.nr_running += 1;
}
fn len(&mut self) -> usize {
fn len(&self) -> usize {
self.nr_running
}
fn is_empty(&mut self) -> bool {
fn is_empty(&self) -> bool {
self.nr_running == 0
}

View File

@ -41,11 +41,11 @@ impl SchedClassRq for StopClassRq {
}
}
fn len(&mut self) -> usize {
fn len(&self) -> usize {
usize::from(!self.is_empty())
}
fn is_empty(&mut self) -> bool {
fn is_empty(&self) -> bool {
self.thread.is_none()
}

View File

@ -48,6 +48,11 @@ impl AtomicCpuId {
.map_err(|prev| (prev as usize).try_into().unwrap())
}
/// Sets the inner value of an `AtomicCpuId` anyway.
pub fn set_anyway(&self, cpu_id: CpuId) {
self.0.store(cpu_id.as_usize() as u32, Ordering::Relaxed);
}
/// Sets the inner value of an `AtomicCpuId` to `AtomicCpuId::NONE`, i.e. makes
/// an `AtomicCpuId` empty.
pub fn set_to_none(&self) {