Fix timeout mechanism in wait_events

This commit is contained in:
Ruihan Li 2025-02-13 15:45:12 +08:00 committed by Tate, Hongliang Tian
parent 2411ba671c
commit 8187fa2f1b
7 changed files with 74 additions and 77 deletions

View File

@ -171,12 +171,12 @@ impl Socket for VsockStreamSocket {
vsockspace.request(&connecting.info()).unwrap(); vsockspace.request(&connecting.info()).unwrap();
// wait for response from driver // wait for response from driver
// TODO: Add timeout // TODO: Add timeout
let mut poller = Poller::new(); let mut poller = Poller::new(None);
if !connecting if !connecting
.poll(IoEvents::IN, Some(poller.as_handle_mut())) .poll(IoEvents::IN, Some(poller.as_handle_mut()))
.contains(IoEvents::IN) .contains(IoEvents::IN)
{ {
if let Err(e) = poller.wait(None) { if let Err(e) = poller.wait() {
vsockspace vsockspace
.remove_connecting_socket(&connecting.local_addr()) .remove_connecting_socket(&connecting.local_addr())
.unwrap(); .unwrap();

View File

@ -73,7 +73,7 @@ pub fn futex_wait_bitset(
// drop lock // drop lock
drop(futex_bucket); drop(futex_bucket);
let result = waiter.pause_timeout(timeout); let result = waiter.pause_timeout(&timeout.into());
match result { match result {
// FIXME: If the futex is woken up and a signal comes at the same time, we should succeed // FIXME: If the futex is woken up and a signal comes at the same time, we should succeed
// instead of failing with `EINTR`. The code below is of course wrong, but was needed to // instead of failing with `EINTR`. The code below is of course wrong, but was needed to

View File

@ -96,7 +96,7 @@ pub trait Pause: WaitTimeout {
/// [`ETIME`]: crate::error::Errno::ETIME /// [`ETIME`]: crate::error::Errno::ETIME
/// [`EINTR`]: crate::error::Errno::EINTR /// [`EINTR`]: crate::error::Errno::EINTR
#[track_caller] #[track_caller]
fn pause_timeout<'a>(&self, timeout: impl Into<TimeoutExt<'a>>) -> Result<()>; fn pause_timeout<'a>(&self, timeout: &TimeoutExt<'a>) -> Result<()>;
} }
impl Pause for Waiter { impl Pause for Waiter {
@ -136,8 +136,8 @@ impl Pause for Waiter {
res res
} }
fn pause_timeout<'a>(&self, timeout: impl Into<TimeoutExt<'a>>) -> Result<()> { fn pause_timeout<'a>(&self, timeout: &TimeoutExt<'a>) -> Result<()> {
let timer = timeout.into().check_expired()?.map(|timeout| { let timer = timeout.check_expired()?.map(|timeout| {
let waker = self.waker(); let waker = self.waker();
timeout.create_timer(move || { timeout.create_timer(move || {
waker.wake_up(); waker.wake_up();
@ -201,7 +201,7 @@ impl Pause for WaitQueue {
waiter.pause_until_or_timeout_impl(cond, timeout) waiter.pause_until_or_timeout_impl(cond, timeout)
} }
fn pause_timeout<'a>(&self, _timeout: impl Into<TimeoutExt<'a>>) -> Result<()> { fn pause_timeout<'a>(&self, _timeout: &TimeoutExt<'a>) -> Result<()> {
panic!("`pause_timeout` can only be used on `Waiter`"); panic!("`pause_timeout` can only be used on `Waiter`");
} }
} }

View File

@ -1,7 +1,7 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use core::{ use core::{
sync::atomic::{AtomicIsize, AtomicUsize, Ordering}, sync::atomic::{AtomicIsize, Ordering},
time::Duration, time::Duration,
}; };
@ -13,6 +13,7 @@ use ostd::{
use crate::{ use crate::{
events::{IoEvents, Observer, Subject}, events::{IoEvents, Observer, Subject},
prelude::*, prelude::*,
time::wait::TimeoutExt,
}; };
/// A pollee represents any I/O object (e.g., a file or socket) that can be polled. /// A pollee represents any I/O object (e.g., a file or socket) that can be polled.
@ -255,6 +256,7 @@ impl<O: Observer<IoEvents> + 'static> PollAdaptor<O> {
impl<O> PollAdaptor<O> { impl<O> PollAdaptor<O> {
/// Gets a reference to the observer. /// Gets a reference to the observer.
#[expect(dead_code, reason = "Keep this `Arc` to avoid dropping the observer")]
pub fn observer(&self) -> &Arc<O> { pub fn observer(&self) -> &Arc<O> {
&self.observer &self.observer
} }
@ -267,76 +269,50 @@ impl<O> PollAdaptor<O> {
/// A poller that can be used to wait for some events. /// A poller that can be used to wait for some events.
pub struct Poller { pub struct Poller {
poller: PollAdaptor<EventCounter>, poller: PollHandle,
waiter: Waiter, waiter: Waiter,
timeout: TimeoutExt<'static>,
} }
impl Poller { impl Poller {
/// Constructs a new poller to wait for interesting events. /// Constructs a new poller to wait for interesting events.
pub fn new() -> Self { ///
let (waiter, event_counter) = EventCounter::new_pair(); /// If `timeout` is specified, [`Self::wait`] will fail with [`ETIME`] after the specified
/// timeout is expired.
///
/// [`ETIME`]: crate::error::Errno::ETIME
pub fn new(timeout: Option<&Duration>) -> Self {
let (waiter, waker) = Waiter::new_pair();
let mut timeout_ext = TimeoutExt::from(timeout);
timeout_ext.freeze();
Self { Self {
poller: PollAdaptor::with_observer(event_counter), poller: PollHandle::new(Arc::downgrade(&waker) as Weak<_>),
waiter, waiter,
timeout: timeout_ext,
} }
} }
/// Returns a mutable reference of [`PollHandle`]. /// Returns a mutable reference of [`PollHandle`].
pub fn as_handle_mut(&mut self) -> &mut PollHandle { pub fn as_handle_mut(&mut self) -> &mut PollHandle {
self.poller.as_handle_mut() &mut self.poller
} }
/// Waits until some interesting events happen since the last wait or until the timeout /// Waits until some interesting events happen since the last wait.
/// expires.
/// ///
/// The waiting process can be interrupted by a signal. /// This method will fail with [`EINTR`] if interrupted by signals or [`ETIME`] on timeout.
pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { ///
self.poller.observer().read(&self.waiter, timeout)?; /// [`EINTR`]: crate::error::Errno::EINTR
Ok(()) /// [`ETIME`]: crate::error::Errno::ETIME
pub fn wait(&self) -> Result<()> {
self.waiter.pause_timeout(&self.timeout)
} }
} }
struct EventCounter { impl Observer<IoEvents> for Waker {
counter: AtomicUsize,
waker: Arc<Waker>,
}
impl EventCounter {
fn new_pair() -> (Waiter, Self) {
let (waiter, waker) = Waiter::new_pair();
(
waiter,
Self {
counter: AtomicUsize::new(0),
waker,
},
)
}
fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result<usize> {
let cond = || {
let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 {
Some(val)
} else {
None
}
};
waiter.pause_until_or_timeout(cond, timeout)
}
fn write(&self) {
self.counter.fetch_add(1, Ordering::Relaxed);
self.waker.wake_up();
}
}
impl Observer<IoEvents> for EventCounter {
fn on_events(&self, _events: &IoEvents) { fn on_events(&self, _events: &IoEvents) {
self.write(); self.wake_up();
} }
} }
@ -391,10 +367,10 @@ pub trait Pollable {
return_errno_with_message!(Errno::ETIME, "the timeout expired"); return_errno_with_message!(Errno::ETIME, "the timeout expired");
} }
// Wait until the event happens. // Create the poller and register to wait for the events.
let mut poller = Poller::new(); let mut poller = Poller::new(timeout);
if self.poll(mask, Some(poller.as_handle_mut())).is_empty() { if self.poll(mask, Some(poller.as_handle_mut())).is_empty() {
poller.wait(timeout)?; poller.wait()?;
} }
loop { loop {
@ -405,9 +381,7 @@ pub trait Pollable {
}; };
// Wait until the next event happens. // Wait until the next event happens.
// poller.wait()?;
// FIXME: We need to update `timeout` since we have waited for some time.
poller.wait(timeout)?;
} }
} }
} }

View File

@ -68,28 +68,24 @@ pub fn do_poll(poll_fds: &[PollFd], timeout: Option<&Duration>, ctx: &Context) -
PollFiles::new_owned(poll_fds, &file_table_locked) PollFiles::new_owned(poll_fds, &file_table_locked)
}; };
let poller = match poll_files.register_poller() { let poller = match poll_files.register_poller(timeout) {
PollerResult::Registered(poller) => poller, PollerResult::Registered(poller) => poller,
PollerResult::FoundEvents(num_events) => return Ok(num_events), PollerResult::FoundEvents(num_events) => return Ok(num_events),
}; };
loop { loop {
match poller.wait(timeout) { match poller.wait() {
Ok(_) => {} Ok(()) => (),
Err(e) if e.error() == Errno::ETIME => { // We should return zero if the timeout expires
// The return value is zero if the timeout expires // before any file descriptors are ready.
// before any file descriptors became ready Err(err) if err.error() == Errno::ETIME => return Ok(0),
return Ok(0); Err(err) => return Err(err),
}
Err(e) => return Err(e),
}; };
let num_events = poll_files.count_events(); let num_events = poll_files.count_events();
if num_events > 0 { if num_events > 0 {
return Ok(num_events); return Ok(num_events);
} }
// FIXME: We need to update `timeout` since we have waited for some time.
} }
} }
@ -136,8 +132,8 @@ enum PollerResult {
impl PollFiles<'_> { impl PollFiles<'_> {
/// Registers the files with a poller, or exits early if some events are detected. /// Registers the files with a poller, or exits early if some events are detected.
fn register_poller(&self) -> PollerResult { fn register_poller(&self, timeout: Option<&Duration>) -> PollerResult {
let mut poller = Poller::new(); let mut poller = Poller::new(timeout);
for (index, poll_fd) in self.poll_fds.iter().enumerate() { for (index, poll_fd) in self.poll_fds.iter().enumerate() {
let events = if let Some(file) = self.file_at(index) { let events = if let Some(file) = self.file_at(index) {

View File

@ -160,6 +160,11 @@ impl TimerManager {
}) })
} }
/// Returns the clock associated with this timer manager.
pub fn clock(&self) -> &Arc<dyn Clock> {
&self.clock
}
/// Returns whether a given `timeout` is expired. /// Returns whether a given `timeout` is expired.
pub fn is_expired_timeout(&self, timeout: &Timeout) -> bool { pub fn is_expired_timeout(&self, timeout: &Timeout) -> bool {
match timeout { match timeout {

View File

@ -78,6 +78,16 @@ impl<'a> TimeoutExt<'a> {
TimeoutExt::Never => Ok(None), TimeoutExt::Never => Ok(None),
} }
} }
/// Freezes the expired time.
///
/// This works in the same way as [`ManagedTimeout::freeze`].
pub fn freeze(&mut self) {
match self {
Self::Never => (),
Self::At(timeout) => timeout.freeze(),
}
}
} }
impl From<&Duration> for TimeoutExt<'_> { impl From<&Duration> for TimeoutExt<'_> {
@ -134,6 +144,18 @@ impl<'a> ManagedTimeout<'a> {
self.manager.is_expired_timeout(&self.timeout) self.manager.is_expired_timeout(&self.timeout)
} }
/// Freezes the expired time.
///
/// If the timeout is specified as an instant after a period of time from the current time
/// (i.e., [`Timeout::After`]), this method will freeze the timeout by converting it to a fixed
/// instant (i.e., [`Timeout::When`]).
pub fn freeze(&mut self) {
self.timeout = match self.timeout {
Timeout::When(instant) => Timeout::When(instant),
Timeout::After(duration) => Timeout::When(self.manager.clock().read_time() + duration),
}
}
/// Creates a timer for the timeout. /// Creates a timer for the timeout.
pub fn create_timer<F>(&self, callback: F) -> Arc<Timer> pub fn create_timer<F>(&self, callback: F) -> Arc<Timer>
where where