Support futex wait timeout

This commit is contained in:
Jianfeng Jiang
2024-08-01 03:17:49 +00:00
committed by Tate, Hongliang Tian
parent 12325733b0
commit 70505ff4f8
8 changed files with 332 additions and 74 deletions

View File

@ -6,7 +6,10 @@ use ostd::sync::{WaitQueue, Waiter};
use super::sig_mask::SigMask;
use crate::{
prelude::*, process::posix_thread::PosixThreadExt, thread::Thread, time::wait::WaitTimeout,
prelude::*,
process::posix_thread::PosixThreadExt,
thread::Thread,
time::wait::{TimerBuilder, WaitTimeout},
};
/// `Pause` is an extension trait to make [`Waiter`] and [`WaitQueue`] signal aware.
@ -33,7 +36,7 @@ pub trait Pause: WaitTimeout {
where
F: FnMut() -> Option<R>,
{
self.pause_until_or_timeout_opt(cond, None)
self.pause_until_or_timer_timeout_opt(cond, None)
}
/// Pauses the execution of the current thread until the `cond` is met ( i.e., `cond()`
@ -56,7 +59,18 @@ pub trait Pause: WaitTimeout {
return cond()
.ok_or_else(|| Error::with_message(Errno::ETIME, "the time limit is reached"));
}
self.pause_until_or_timeout_opt(cond, Some(timeout))
let timer_builder = TimerBuilder::new(timeout);
self.pause_until_or_timer_timeout_opt(cond, Some(&timer_builder))
}
/// Similar to [`Pause::pause_until_or_timeout`].
///
/// The only difference is that the timeout is against the user-specified clock.
fn pause_until_or_timer_timeout<F, R>(&self, cond: F, timer_builder: &TimerBuilder) -> Result<R>
where
F: FnMut() -> Option<R>,
{
self.pause_until_or_timer_timeout_opt(cond, Some(timer_builder))
}
/// Pauses the execution of the current thread until the `cond` is met ( i.e., `cond()`
@ -71,14 +85,31 @@ pub trait Pause: WaitTimeout {
///
/// [`ETIME`]: crate::error::Errno::ETIME
/// [`EINTR`]: crate::error::Errno::EINTR
#[doc(hidden)]
fn pause_until_or_timeout_opt<F, R>(&self, cond: F, timeout: Option<&Duration>) -> Result<R>
fn pause_until_or_timer_timeout_opt<F, R>(
&self,
cond: F,
timer_builder: Option<&TimerBuilder>,
) -> Result<R>
where
F: FnMut() -> Option<R>;
/// Pauses the execution of the current thread until being woken up,
/// or some signals are received by the current thread or process.
/// If the input `timeout` is set, the pausing will finish when the `timeout` is expired.
///
/// # Errors
///
/// If `timeout` is expired before woken up or some signals are received,
/// this method will return [`Errno::ETIME`].
fn pause_timer_timeout(&self, timer_builder: Option<&TimerBuilder>) -> Result<()>;
}
impl Pause for Waiter {
fn pause_until_or_timeout_opt<F, R>(&self, mut cond: F, timeout: Option<&Duration>) -> Result<R>
fn pause_until_or_timer_timeout_opt<F, R>(
&self,
mut cond: F,
timer_builder: Option<&TimerBuilder>,
) -> Result<R>
where
F: FnMut() -> Option<R>,
{
@ -88,9 +119,12 @@ impl Pause for Waiter {
let current_thread = self.task().data().downcast_ref::<Arc<Thread>>();
let Some(posix_thread) = current_thread.and_then(|thread| thread.as_posix_thread()) else {
if let Some(timeout) = timeout {
return self.wait_until_or_timeout(cond, timeout);
let Some(posix_thread) = current_thread
.as_ref()
.and_then(|thread| thread.as_posix_thread())
else {
if let Some(timer_builder) = timer_builder {
return self.wait_until_or_timer_timeout(cond, timer_builder);
} else {
return self.wait_until_or_cancelled(cond, || Ok(()));
}
@ -107,18 +141,54 @@ impl Pause for Waiter {
};
posix_thread.set_signalled_waker(self.waker());
let res = if let Some(timeout) = timeout {
self.wait_until_or_timeout_cancelled(cond, cancel_cond, timeout)
let res = if let Some(timer_builder) = timer_builder {
self.wait_until_or_timer_timeout_cancelled(cond, cancel_cond, timer_builder)
} else {
self.wait_until_or_cancelled(cond, cancel_cond)
};
posix_thread.clear_signalled_waker();
res
}
fn pause_timer_timeout(&self, timer_builder: Option<&TimerBuilder>) -> Result<()> {
let timer = timer_builder.map(|timer_builder| {
let waker = self.waker();
timer_builder.fire(move || {
waker.wake_up();
})
});
let current_thread = self.task().data().downcast_ref::<Arc<Thread>>();
if let Some(posix_thread) = current_thread
.as_ref()
.and_then(|thread| thread.as_posix_thread())
{
posix_thread.set_signalled_waker(self.waker());
self.wait();
posix_thread.clear_signalled_waker();
} else {
self.wait();
}
if let Some(timer) = timer {
if timer.remain().is_zero() {
return_errno_with_message!(Errno::ETIME, "the timeout is reached");
}
// If the timeout is not expired, cancel the timer manually.
timer.cancel();
}
Ok(())
}
}
impl Pause for WaitQueue {
fn pause_until_or_timeout_opt<F, R>(&self, mut cond: F, timeout: Option<&Duration>) -> Result<R>
fn pause_until_or_timer_timeout_opt<F, R>(
&self,
mut cond: F,
timer_builder: Option<&TimerBuilder>,
) -> Result<R>
where
F: FnMut() -> Option<R>,
{
@ -131,7 +201,11 @@ impl Pause for WaitQueue {
self.enqueue(waiter.waker());
cond()
};
waiter.pause_until_or_timeout_opt(cond, timeout)
waiter.pause_until_or_timer_timeout_opt(cond, timer_builder)
}
fn pause_timer_timeout(&self, _timer_builder: Option<&TimerBuilder>) -> Result<()> {
panic!("`pause_timer_timeout` can only be used on `Waiter`");
}
}