From 29791ba77e299664d2bcfd02ac07ec693c1af0cf Mon Sep 17 00:00:00 2001 From: Zejun Zhao Date: Mon, 13 Jan 2025 16:39:47 +0800 Subject: [PATCH] Distribute tasks among all the CPUs --- kernel/src/sched/sched_class/fair.rs | 4 +- kernel/src/sched/sched_class/idle.rs | 4 +- kernel/src/sched/sched_class/mod.rs | 49 ++++++++++++++++------- kernel/src/sched/sched_class/real_time.rs | 4 +- kernel/src/sched/sched_class/stop.rs | 4 +- ostd/src/task/scheduler/info.rs | 5 +++ 6 files changed, 47 insertions(+), 23 deletions(-) diff --git a/kernel/src/sched/sched_class/fair.rs b/kernel/src/sched/sched_class/fair.rs index 9492aff6d..2e5440e99 100644 --- a/kernel/src/sched/sched_class/fair.rs +++ b/kernel/src/sched/sched_class/fair.rs @@ -235,11 +235,11 @@ impl SchedClassRq for FairClassRq { self.entities.push(Reverse(FairQueueItem(entity, vruntime))); } - fn len(&mut self) -> usize { + fn len(&self) -> usize { self.entities.len() } - fn is_empty(&mut self) -> bool { + fn is_empty(&self) -> bool { self.entities.is_empty() } diff --git a/kernel/src/sched/sched_class/idle.rs b/kernel/src/sched/sched_class/idle.rs index fd7953fcf..db55684f4 100644 --- a/kernel/src/sched/sched_class/idle.rs +++ b/kernel/src/sched/sched_class/idle.rs @@ -43,11 +43,11 @@ impl SchedClassRq for IdleClassRq { } } - fn len(&mut self) -> usize { + fn len(&self) -> usize { usize::from(!self.is_empty()) } - fn is_empty(&mut self) -> bool { + fn is_empty(&self) -> bool { self.entity.is_none() } diff --git a/kernel/src/sched/sched_class/mod.rs b/kernel/src/sched/sched_class/mod.rs index 51bc69ff2..d296e2a51 100644 --- a/kernel/src/sched/sched_class/mod.rs +++ b/kernel/src/sched/sched_class/mod.rs @@ -7,14 +7,14 @@ use core::fmt; use ostd::{ arch::read_tsc as sched_clock, - cpu::{all_cpus, AtomicCpuSet, CpuId, PinCurrentCpu}, + cpu::{all_cpus, CpuId, PinCurrentCpu}, sync::SpinLock, task::{ scheduler::{ info::CommonSchedInfo, inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler, UpdateFlags, }, - Task, + AtomicCpuId, Task, }, trap::disable_local, }; @@ -104,10 +104,10 @@ trait SchedClassRq: Send + fmt::Debug { fn enqueue(&mut self, task: Arc, flags: Option); /// Returns the number of threads in the run queue. - fn len(&mut self) -> usize; + fn len(&self) -> usize; /// Checks if the run queue is empty. - fn is_empty(&mut self) -> bool { + fn is_empty(&self) -> bool { self.len() == 0 } @@ -126,7 +126,7 @@ trait SchedClassRq: Send + fmt::Debug { #[derive(Debug)] pub struct SchedAttr { policy: SchedPolicyState, - + last_cpu: AtomicCpuId, real_time: real_time::RealTimeAttr, fair: fair::FairAttr, } @@ -136,6 +136,7 @@ impl SchedAttr { pub fn new(policy: SchedPolicy) -> Self { Self { policy: SchedPolicyState::new(policy), + last_cpu: AtomicCpuId::default(), real_time: { let (prio, policy) = match policy { SchedPolicy::RealTime { rt_prio, rt_policy } => (rt_prio.get(), rt_policy), @@ -172,6 +173,14 @@ impl SchedAttr { _ => {} }); } + + fn last_cpu(&self) -> Option { + self.last_cpu.get() + } + + fn set_last_cpu(&self, cpu_id: CpuId) { + self.last_cpu.set_anyway(cpu_id); + } } impl Scheduler for ClassScheduler { @@ -179,7 +188,7 @@ impl Scheduler for ClassScheduler { let thread = task.as_thread()?.clone(); let (still_in_rq, cpu) = { - let selected_cpu_id = self.select_cpu(thread.atomic_cpu_affinity()); + let selected_cpu_id = self.select_cpu(&thread, flags); if let Err(task_cpu_id) = task.cpu().set_if_is_none(selected_cpu_id) { debug_assert!(flags != EnqueueFlags::Spawn); @@ -196,6 +205,7 @@ impl Scheduler for ClassScheduler { return None; } + thread.sched_attr().set_last_cpu(cpu); rq.enqueue_entity((task, thread), Some(flags)); Some(cpu) } @@ -229,15 +239,24 @@ impl ClassScheduler { } // TODO: Implement a better algorithm and replace the current naive implementation. - fn select_cpu(&self, affinity: &AtomicCpuSet) -> CpuId { - let guard = disable_local(); - let affinity = affinity.load(); - let cur = guard.current_cpu(); - if affinity.contains(cur) { - cur - } else { - affinity.iter().next().expect("empty affinity") + fn select_cpu(&self, thread: &Thread, flags: EnqueueFlags) -> CpuId { + if let Some(last_cpu) = thread.sched_attr().last_cpu() { + return last_cpu; } + debug_assert!(flags == EnqueueFlags::Spawn); + let affinity = thread.atomic_cpu_affinity(); + let guard = disable_local(); + let mut selected = guard.current_cpu(); + let mut minimum_load = u32::MAX; + for candidate in affinity.load().iter() { + let rq = self.rqs[candidate.as_usize()].lock(); + let (load, _) = rq.nr_queued_and_running(); + if load < minimum_load { + minimum_load = load; + selected = candidate; + } + } + selected } } @@ -262,7 +281,7 @@ impl PerCpuClassRqSet { } } - fn nr_queued_and_running(&mut self) -> (u32, u32) { + fn nr_queued_and_running(&self) -> (u32, u32) { let queued = self.stop.len() + self.real_time.len() + self.fair.len() + self.idle.len(); let running = usize::from(self.current.is_some()); (queued as u32, running as u32) diff --git a/kernel/src/sched/sched_class/real_time.rs b/kernel/src/sched/sched_class/real_time.rs index 5644490ae..e95350653 100644 --- a/kernel/src/sched/sched_class/real_time.rs +++ b/kernel/src/sched/sched_class/real_time.rs @@ -184,11 +184,11 @@ impl SchedClassRq for RealTimeClassRq { self.nr_running += 1; } - fn len(&mut self) -> usize { + fn len(&self) -> usize { self.nr_running } - fn is_empty(&mut self) -> bool { + fn is_empty(&self) -> bool { self.nr_running == 0 } diff --git a/kernel/src/sched/sched_class/stop.rs b/kernel/src/sched/sched_class/stop.rs index 59b3d6cd6..7dfadd7fc 100644 --- a/kernel/src/sched/sched_class/stop.rs +++ b/kernel/src/sched/sched_class/stop.rs @@ -41,11 +41,11 @@ impl SchedClassRq for StopClassRq { } } - fn len(&mut self) -> usize { + fn len(&self) -> usize { usize::from(!self.is_empty()) } - fn is_empty(&mut self) -> bool { + fn is_empty(&self) -> bool { self.thread.is_none() } diff --git a/ostd/src/task/scheduler/info.rs b/ostd/src/task/scheduler/info.rs index 0f576deb8..485fd2cea 100644 --- a/ostd/src/task/scheduler/info.rs +++ b/ostd/src/task/scheduler/info.rs @@ -48,6 +48,11 @@ impl AtomicCpuId { .map_err(|prev| (prev as usize).try_into().unwrap()) } + /// Sets the inner value of an `AtomicCpuId` anyway. + pub fn set_anyway(&self, cpu_id: CpuId) { + self.0.store(cpu_id.as_usize() as u32, Ordering::Relaxed); + } + /// Sets the inner value of an `AtomicCpuId` to `AtomicCpuId::NONE`, i.e. makes /// an `AtomicCpuId` empty. pub fn set_to_none(&self) {