Add wait_until_or_cancelled API to WaitQueue

This commit is contained in:
Chen Chengjun
2024-05-11 17:14:24 +08:00
committed by Tate, Hongliang Tian
parent 91152bceed
commit 2002db5481
4 changed files with 55 additions and 52 deletions

View File

@ -29,10 +29,10 @@ impl WaitQueue {
/// Wait until some condition becomes true. /// Wait until some condition becomes true.
/// ///
/// This method takes a closure that tests a user-given condition. /// This method takes a closure that tests a user-given condition.
/// The method only returns if the condition returns Some(_). /// The method only returns if the condition returns `Some(_)`.
/// A waker thread should first make the condition Some(_), then invoke the /// A waker thread should first make the condition `Some(_)`, then invoke the
/// `wake`-family method. This ordering is important to ensure that waiter /// `wake`-family method. This ordering is important to ensure that waiter
/// threads do not lose any wakeup notifiations. /// threads do not lose any wakeup notifications.
/// ///
/// By taking a condition closure, his wait-wakeup mechanism becomes /// By taking a condition closure, his wait-wakeup mechanism becomes
/// more efficient and robust. /// more efficient and robust.
@ -44,36 +44,43 @@ impl WaitQueue {
return res; return res;
} }
let (waiter, waker) = Waiter::new_pair(); let (waiter, _) = Waiter::new_pair();
loop { self.wait_until_or_cancelled(cond, waiter, || false)
// Enqueue the waker before checking `cond()` to avoid races .unwrap()
self.enqueue(waker.clone());
if let Some(res) = cond() {
return res;
};
waiter.wait();
}
} }
pub fn wait_until_or_cancelled<F, R>(&self, mut cond: F) -> R /// Wait until some condition becomes true or the cancel condition becomes true.
///
/// This method will return `Some(_)` if the condition returns `Some(_)`, and will return
/// the condition test result regardless what it is when the cancel condition becomes true.
#[doc(hidden)]
pub fn wait_until_or_cancelled<F, R, FCancel>(
&self,
mut cond: F,
waiter: Waiter,
cancel_cond: FCancel,
) -> Option<R>
where where
F: FnMut() -> Option<R>, F: FnMut() -> Option<R>,
FCancel: Fn() -> bool,
{ {
if let Some(res) = cond() { let waker = waiter.waker();
return res;
}
let (waiter, waker) = Waiter::new_pair();
loop { loop {
// Enqueue the waker before checking `cond()` to avoid races // Enqueue the waker before checking `cond()` to avoid races
self.enqueue(waker.clone()); self.enqueue(waker.clone());
if let Some(res) = cond() { if let Some(res) = cond() {
return res; return Some(res);
}; };
if cancel_cond() {
// Drop the waiter and check again to avoid missing a wake event.
drop(waiter);
return cond();
}
waiter.wait(); waiter.wait();
} }
} }
@ -120,6 +127,7 @@ impl WaitQueue {
} }
} }
/// Return whether the current wait queue is empty.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.num_wakers.load(Ordering::Acquire) == 0 self.num_wakers.load(Ordering::Acquire) == 0
} }
@ -174,6 +182,11 @@ impl Waiter {
pub fn wait(&self) { pub fn wait(&self) {
self.waker.do_wait(); self.waker.do_wait();
} }
/// Gets the associated [`Waker`] of the current waiter.
pub fn waker(&self) -> Arc<Waker> {
self.waker.clone()
}
} }
impl Drop for Waiter { impl Drop for Waiter {
@ -237,9 +250,3 @@ impl Waker {
self.has_woken.store(true, Ordering::Release); self.has_woken.store(true, Ordering::Release);
} }
} }
impl Default for Waiter {
fn default() -> Self {
Self::new()
}
}

View File

@ -6,6 +6,7 @@ use core::ops::Range;
use align_ext::AlignExt; use align_ext::AlignExt;
use spin::Once; use spin::Once;
use static_assertions::const_assert;
use super::{ use super::{
page_table::{nr_ptes_per_node, KernelMode, PageTable}, page_table::{nr_ptes_per_node, KernelMode, PageTable},

View File

@ -44,7 +44,7 @@ pub(crate) use crate::{
current, current_thread, current, current_thread,
error::{Errno, Error}, error::{Errno, Error},
print, println, print, println,
time::Clock, time::{wait::WaitTimeout, Clock},
}; };
pub(crate) type Result<T> = core::result::Result<T, Error>; pub(crate) type Result<T> = core::result::Result<T, Error>;
pub(crate) use crate::{return_errno, return_errno_with_message}; pub(crate) use crate::{return_errno, return_errno_with_message};

View File

@ -1,13 +1,12 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use alloc::sync::Arc;
use core::time::Duration; use core::time::Duration;
use aster_frame::sync::{WaitQueue, Waiter, Waker}; use aster_frame::sync::{WaitQueue, Waiter};
use super::clock::JIFFIES_TIMER_MANAGER; use super::clocks::JIFFIES_TIMER_MANAGER;
/// A trait that provide the timeout related function for WaitQueue. /// A trait that provide the timeout related function for [`WaitQueue`]`.
pub trait WaitTimeout { pub trait WaitTimeout {
/// Wait until some condition returns `Some(_)`, or a given timeout is reached. If /// Wait until some condition returns `Some(_)`, or a given timeout is reached. If
/// the condition does not becomes `Some(_)` before the timeout is reached, the /// the condition does not becomes `Some(_)` before the timeout is reached, the
@ -22,36 +21,32 @@ impl WaitTimeout for WaitQueue {
where where
F: FnMut() -> Option<R>, F: FnMut() -> Option<R>,
{ {
if *timeout == Duration::ZERO {
return cond();
}
if let Some(res) = cond() { if let Some(res) = cond() {
return Some(res); return Some(res);
} }
let (waiter, waker) = Waiter::new_pair(); let (waiter, waker) = Waiter::new_pair();
let wake_up = {
let waker = waker.clone();
move || {
waker.wake_up();
}
};
let jiffies_timer = JIFFIES_TIMER_MANAGER.get().unwrap().create_timer(wake_up); let jiffies_timer = JIFFIES_TIMER_MANAGER.get().unwrap().create_timer(move || {
waker.wake_up();
});
jiffies_timer.set_timeout(*timeout); jiffies_timer.set_timeout(*timeout);
loop { let cancel_cond = {
// Enqueue the waker before checking `cond()` to avoid races let jiffies_timer = jiffies_timer.clone();
self.enqueue(waker.clone()); move || jiffies_timer.remain() == Duration::ZERO
};
let res = self.wait_until_or_cancelled(cond, waiter, cancel_cond);
if let Some(res) = cond() { // If res is `Some`, then the timeout may not have been expired. We cancel it manually.
jiffies_timer.clear(); if res.is_some() {
return Some(res); jiffies_timer.cancel();
};
if jiffies_timer.remain() == Duration::ZERO {
drop(waiter);
return cond();
}
waiter.wait();
} }
res
} }
} }