Revise the public APIs of WaitQueue

This commit is contained in:
Ruihan Li
2024-05-31 10:54:33 +08:00
committed by Tate, Hongliang Tian
parent 140640c5d6
commit a664f1a9fc
2 changed files with 38 additions and 35 deletions

View File

@ -11,7 +11,7 @@ use crate::task::{add_task, current_task, schedule, Task, TaskStatus};
/// One may wait on a wait queue to put its executing thread to sleep. /// One may wait on a wait queue to put its executing thread to sleep.
/// Multiple threads may be the waiters of a wait queue. /// Multiple threads may be the waiters of a wait queue.
/// Other threads may invoke the `wake`-family methods of a wait queue to /// Other threads may invoke the `wake`-family methods of a wait queue to
/// wake up one or many waiter threads. /// wake up one or many waiting threads.
pub struct WaitQueue { pub struct WaitQueue {
// A copy of `wakers.len()`, used for the lock-free fast path in `wake_one` and `wake_all`. // A copy of `wakers.len()`, used for the lock-free fast path in `wake_one` and `wake_all`.
num_wakers: AtomicU32, num_wakers: AtomicU32,
@ -19,6 +19,7 @@ pub struct WaitQueue {
} }
impl WaitQueue { impl WaitQueue {
/// Creates a new, empty wait queue.
pub const fn new() -> Self { pub const fn new() -> Self {
WaitQueue { WaitQueue {
num_wakers: AtomicU32::new(0), num_wakers: AtomicU32::new(0),
@ -26,7 +27,7 @@ impl WaitQueue {
} }
} }
/// Wait until some condition becomes true. /// Waits until some condition is met.
/// ///
/// This method takes a closure that tests a user-given condition. /// This method takes a closure that tests a user-given condition.
/// The method only returns if the condition returns `Some(_)`. /// The method only returns if the condition returns `Some(_)`.
@ -34,7 +35,7 @@ impl WaitQueue {
/// `wake`-family method. This ordering is important to ensure that waiter /// `wake`-family method. This ordering is important to ensure that waiter
/// threads do not lose any wakeup notifications. /// threads do not lose any wakeup notifications.
/// ///
/// By taking a condition closure, his wait-wakeup mechanism becomes /// By taking a condition closure, this wait-wakeup mechanism becomes
/// more efficient and robust. /// more efficient and robust.
pub fn wait_until<F, R>(&self, mut cond: F) -> R pub fn wait_until<F, R>(&self, mut cond: F) -> R
where where
@ -50,7 +51,7 @@ impl WaitQueue {
.unwrap() .unwrap()
} }
/// Wait until some condition becomes true or the cancel condition becomes true. /// Waits until some condition is met or the cancel condition becomes true.
/// ///
/// This method will return `Some(_)` if the condition returns `Some(_)`, and will return /// This method will return `Some(_)` if the condition returns `Some(_)`, and will return
/// the condition test result regardless what it is when the cancel condition becomes true. /// the condition test result regardless what it is when the cancel condition becomes true.
@ -85,13 +86,38 @@ impl WaitQueue {
} }
} }
/// Wake up one waiting thread. /// Wakes up one waiting thread, if there is one at the point of time when this method is
pub fn wake_one(&self) { /// called, returning whether such a thread was woken up.
pub fn wake_one(&self) -> bool {
// Fast path // Fast path
if self.is_empty() { if self.is_empty() {
return; return false;
} }
loop {
let mut wakers = self.wakers.lock_irq_disabled();
let Some(waker) = wakers.pop_front() else {
return false;
};
self.num_wakers.fetch_sub(1, Ordering::Release);
// Avoid holding lock when calling `wake_up`
drop(wakers);
if waker.wake_up() {
return true;
}
}
}
/// Wakes up all waiting threads, returning the number of threads that were woken up.
pub fn wake_all(&self) -> usize {
// Fast path
if self.is_empty() {
return 0;
}
let mut num_woken = 0;
loop { loop {
let mut wakers = self.wakers.lock_irq_disabled(); let mut wakers = self.wakers.lock_irq_disabled();
let Some(waker) = wakers.pop_front() else { let Some(waker) = wakers.pop_front() else {
@ -102,33 +128,14 @@ impl WaitQueue {
drop(wakers); drop(wakers);
if waker.wake_up() { if waker.wake_up() {
return; num_woken += 1;
}
}
}
/// Wake up all waiting threads.
pub fn wake_all(&self) {
// Fast path
if self.is_empty() {
return;
}
loop {
let mut wakers = self.wakers.lock_irq_disabled();
let Some(waker) = wakers.pop_front() else {
break;
};
self.num_wakers.fetch_sub(1, Ordering::Release);
// Avoid holding lock when calling `wake_up`
drop(wakers);
waker.wake_up();
} }
} }
/// Return whether the current wait queue is empty. num_woken
pub fn is_empty(&self) -> bool { }
fn is_empty(&self) -> bool {
self.num_wakers.load(Ordering::Acquire) == 0 self.num_wakers.load(Ordering::Acquire) == 0
} }

View File

@ -87,11 +87,7 @@ impl LocalWorkerPool {
} }
fn wake_worker(&self) -> bool { fn wake_worker(&self) -> bool {
if !self.idle_wait_queue.is_empty() { self.idle_wait_queue.wake_one()
self.idle_wait_queue.wake_one();
return true;
}
false
} }
fn has_pending_work_items(&self) -> bool { fn has_pending_work_items(&self) -> bool {