Add wait_until_* functionality to Waiter

This commit is contained in:
Chen Chengjun
2024-09-14 10:13:14 +08:00
committed by Tate, Hongliang Tian
parent 6a5a5b4d3d
commit 1873bb7a3f
5 changed files with 140 additions and 62 deletions

View File

@ -59,7 +59,7 @@ fn spawn_background_poll_thread(iface: Arc<Iface>) {
} }
let duration = Duration::from_millis(next_poll_at_ms - now_as_ms); let duration = Duration::from_millis(next_poll_at_ms - now_as_ms);
wait_queue.wait_until_or_timeout( let _ = wait_queue.wait_until_or_timeout(
// If `iface_ext.next_poll_at_ms()` changes to an earlier time, we will end the // If `iface_ext.next_poll_at_ms()` changes to an earlier time, we will end the
// waiting. // waiting.
|| (iface_ext.next_poll_at_ms()? < next_poll_at_ms).then_some(()), || (iface_ext.next_poll_at_ms()? < next_poll_at_ms).then_some(()),

View File

@ -164,8 +164,8 @@ impl Condvar {
// Wait until the condition becomes true, we're explicitly woken up, or the timeout elapses. // Wait until the condition becomes true, we're explicitly woken up, or the timeout elapses.
let res = self.waitqueue.wait_until_or_timeout(cond, &timeout); let res = self.waitqueue.wait_until_or_timeout(cond, &timeout);
match res { match res {
Some(_) => Ok((lock.lock(), false)), Ok(()) => Ok((lock.lock(), false)),
None => { Err(_) => {
let mut counter = self.counter.lock(); let mut counter = self.counter.lock();
counter.waiter_count -= 1; counter.waiter_count -= 1;
Err(LockErr::Timeout((lock.lock(), true))) Err(LockErr::Timeout((lock.lock(), true)))

View File

@ -264,7 +264,7 @@ impl Monitor {
for local_pool in worker_pool.local_pools.iter() { for local_pool in worker_pool.local_pools.iter() {
local_pool.set_heartbeat(false); local_pool.set_heartbeat(false);
} }
sleep_queue.wait_until_or_timeout(|| -> Option<()> { None }, &sleep_duration); let _ = sleep_queue.wait_until_or_timeout(|| -> Option<()> { None }, &sleep_duration);
} }
} }
} }

View File

@ -5,48 +5,121 @@ use core::time::Duration;
use ostd::sync::{WaitQueue, Waiter}; use ostd::sync::{WaitQueue, Waiter};
use super::{clocks::JIFFIES_TIMER_MANAGER, timer::Timeout}; use super::{clocks::JIFFIES_TIMER_MANAGER, timer::Timeout};
use crate::prelude::*;
/// A trait that provide the timeout related function for [`WaitQueue`]`. /// A trait that provide the timeout related function for [`Waiter`] and [`WaitQueue`]`.
pub trait WaitTimeout { pub trait WaitTimeout {
/// Wait until some condition returns `Some(_)`, or a given timeout is reached. If /// Waits until some condition returns `Some(_)`, or a given timeout is reached. If
/// the condition does not becomes `Some(_)` before the timeout is reached, the /// the condition does not becomes `Some(_)` before the timeout is reached,
/// function will return `None`. /// this function will return `ETIME` error.
fn wait_until_or_timeout<F, R>(&self, cond: F, timeout: &Duration) -> Option<R> fn wait_until_or_timeout<F, R>(&self, cond: F, timeout: &Duration) -> Result<R>
where
F: FnMut() -> Option<R>;
}
impl WaitTimeout for WaitQueue {
fn wait_until_or_timeout<F, R>(&self, mut cond: F, timeout: &Duration) -> Option<R>
where where
F: FnMut() -> Option<R>, F: FnMut() -> Option<R>,
{
self.wait_until_or_timeout_cancelled(cond, || Ok(()), timeout)
}
/// Waits until some condition returns `Some(_)`, or be cancelled due to
/// reaching the timeout or the inputted cancel condition. If the condition
/// does not becomes `Some(_)` before the timeout is reached or `cancel_cond`
/// returns `Err`, this function will return corresponding `Err`.
#[doc(hidden)]
fn wait_until_or_timeout_cancelled<F, R, FCancel>(
&self,
cond: F,
cancel_cond: FCancel,
timeout: &Duration,
) -> Result<R>
where
F: FnMut() -> Option<R>,
FCancel: Fn() -> Result<()>;
}
impl WaitTimeout for Waiter {
fn wait_until_or_timeout_cancelled<F, R, FCancel>(
&self,
mut cond: F,
cancel_cond: FCancel,
timeout: &Duration,
) -> Result<R>
where
F: FnMut() -> Option<R>,
FCancel: Fn() -> Result<()>,
{ {
if *timeout == Duration::ZERO { if *timeout == Duration::ZERO {
return cond(); return cond()
.ok_or_else(|| Error::with_message(Errno::ETIME, "the time limit is reached"));
} }
if let Some(res) = cond() { if let Some(res) = cond() {
return Some(res); return Ok(res);
} }
let (waiter, waker) = Waiter::new_pair(); let waker = self.waker();
let jiffies_timer = JIFFIES_TIMER_MANAGER.get().unwrap().create_timer(move || { let jiffies_timer = JIFFIES_TIMER_MANAGER.get().unwrap().create_timer(move || {
waker.wake_up(); waker.wake_up();
}); });
jiffies_timer.set_timeout(Timeout::After(*timeout)); jiffies_timer.set_timeout(Timeout::After(*timeout));
let cancel_cond = { let timeout_cond = {
let jiffies_timer = jiffies_timer.clone(); let jiffies_timer = jiffies_timer.clone();
move || jiffies_timer.remain() == Duration::ZERO move || {
if jiffies_timer.remain() != Duration::ZERO {
Ok(())
} else {
Err(Error::with_message(
Errno::ETIME,
"the time limit is reached",
))
}
}
}; };
let res = self.wait_until_or_cancelled(cond, waiter, cancel_cond);
// If res is `Some`, then the timeout may not have been expired. We cancel it manually. let cancel_cond = || {
if res.is_some() { timeout_cond()?;
cancel_cond()
};
let res = self.wait_until_or_cancelled(cond, cancel_cond);
// If `res` is not `ETIME` error, then the timeout may not have been expired.
// We cancel it manually.
if !res
.as_ref()
.is_err_and(|e: &Error| e.error() == Errno::ETIME)
{
jiffies_timer.cancel(); jiffies_timer.cancel();
} }
res res
} }
} }
impl WaitTimeout for WaitQueue {
fn wait_until_or_timeout_cancelled<F, R, FCancel>(
&self,
mut cond: F,
cancel_cond: FCancel,
timeout: &Duration,
) -> Result<R>
where
F: FnMut() -> Option<R>,
FCancel: Fn() -> Result<()>,
{
if *timeout == Duration::ZERO {
return cond()
.ok_or_else(|| Error::with_message(Errno::ETIME, "the time limit is reached"));
}
if let Some(res) = cond() {
return Ok(res);
}
let (waiter, _) = Waiter::new_pair();
let cond = || {
self.enqueue(waiter.waker());
cond()
};
waiter.wait_until_or_timeout_cancelled(cond, cancel_cond, timeout)
}
}

View File

@ -74,46 +74,15 @@ impl WaitQueue {
} }
let (waiter, _) = Waiter::new_pair(); let (waiter, _) = Waiter::new_pair();
let cond = || {
self.wait_until_or_cancelled(cond, waiter, || false) self.enqueue(waiter.waker());
cond()
};
waiter
.wait_until_or_cancelled(cond, || Ok::<(), ()>(()))
.unwrap() .unwrap()
} }
/// Waits until some condition is met or the cancel condition becomes true.
///
/// This method will return `Some(_)` if the condition returns `Some(_)`, and will return
/// the condition test result regardless what it is when the cancel condition becomes true.
#[doc(hidden)]
pub fn wait_until_or_cancelled<F, R, FCancel>(
&self,
mut cond: F,
waiter: Waiter,
cancel_cond: FCancel,
) -> Option<R>
where
F: FnMut() -> Option<R>,
FCancel: Fn() -> bool,
{
let waker = waiter.waker();
loop {
// Enqueue the waker before checking `cond()` to avoid races
self.enqueue(waker.clone());
if let Some(res) = cond() {
return Some(res);
};
if cancel_cond() {
// Drop the waiter and check again to avoid missing a wake event.
drop(waiter);
return cond();
}
waiter.wait();
}
}
/// Wakes up one waiting thread, if there is one at the point of time when this method is /// Wakes up one waiting thread, if there is one at the point of time when this method is
/// called, returning whether such a thread was woken up. /// called, returning whether such a thread was woken up.
pub fn wake_one(&self) -> bool { pub fn wake_one(&self) -> bool {
@ -170,7 +139,9 @@ impl WaitQueue {
self.num_wakers.fetch_add(0, Ordering::Release) == 0 self.num_wakers.fetch_add(0, Ordering::Release) == 0
} }
fn enqueue(&self, waker: Arc<Waker>) { /// Enqueues the input [`Waker`] to the wait queue.
#[doc(hidden)]
pub fn enqueue(&self, waker: Arc<Waker>) {
let mut wakers = self.wakers.disable_irq().lock(); let mut wakers = self.wakers.disable_irq().lock();
wakers.push_back(waker); wakers.push_back(waker);
self.num_wakers.fetch_add(1, Ordering::Acquire); self.num_wakers.fetch_add(1, Ordering::Acquire);
@ -227,10 +198,44 @@ impl Waiter {
self.waker.do_wait(); self.waker.do_wait();
} }
/// Waits until some condition is met or the cancel condition becomes true.
///
/// This method will return `Ok(_)` if the condition returns `Some(_)`, and will stop waiting
/// if the cancel condition returns `Err(_)`. In this situation, this method will return the `Err(_)`
/// generated by the cancel condition.
pub fn wait_until_or_cancelled<F, R, FCancel, E>(
&self,
mut cond: F,
cancel_cond: FCancel,
) -> core::result::Result<R, E>
where
F: FnMut() -> Option<R>,
FCancel: Fn() -> core::result::Result<(), E>,
{
loop {
if let Some(res) = cond() {
return Ok(res);
};
if let Err(e) = cancel_cond() {
// Close the waker and check again to avoid missing a wake event.
self.waker.close();
return cond().ok_or(e);
}
self.wait();
}
}
/// Gets the associated [`Waker`] of the current waiter. /// Gets the associated [`Waker`] of the current waiter.
pub fn waker(&self) -> Arc<Waker> { pub fn waker(&self) -> Arc<Waker> {
self.waker.clone() self.waker.clone()
} }
/// Returns the task that the associated waker will attempt to wake up.
pub fn task(&self) -> &Arc<Task> {
&self.waker.task
}
} }
impl Drop for Waiter { impl Drop for Waiter {