From 8187fa2f1b3fd72dbacf16c4a46620b958c34a96 Mon Sep 17 00:00:00 2001 From: Ruihan Li Date: Thu, 13 Feb 2025 15:45:12 +0800 Subject: [PATCH] Fix timeout mechanism in `wait_events` --- kernel/src/net/socket/vsock/stream/socket.rs | 4 +- kernel/src/process/posix_thread/futex.rs | 2 +- kernel/src/process/signal/pause.rs | 8 +- kernel/src/process/signal/poll.rs | 88 +++++++------------- kernel/src/syscall/poll.rs | 22 ++--- kernel/src/time/core/timer.rs | 5 ++ kernel/src/time/wait.rs | 22 +++++ 7 files changed, 74 insertions(+), 77 deletions(-) diff --git a/kernel/src/net/socket/vsock/stream/socket.rs b/kernel/src/net/socket/vsock/stream/socket.rs index 831b45fe8..83a0fa063 100644 --- a/kernel/src/net/socket/vsock/stream/socket.rs +++ b/kernel/src/net/socket/vsock/stream/socket.rs @@ -171,12 +171,12 @@ impl Socket for VsockStreamSocket { vsockspace.request(&connecting.info()).unwrap(); // wait for response from driver // TODO: Add timeout - let mut poller = Poller::new(); + let mut poller = Poller::new(None); if !connecting .poll(IoEvents::IN, Some(poller.as_handle_mut())) .contains(IoEvents::IN) { - if let Err(e) = poller.wait(None) { + if let Err(e) = poller.wait() { vsockspace .remove_connecting_socket(&connecting.local_addr()) .unwrap(); diff --git a/kernel/src/process/posix_thread/futex.rs b/kernel/src/process/posix_thread/futex.rs index 0eea7d1a5..edc2e263a 100644 --- a/kernel/src/process/posix_thread/futex.rs +++ b/kernel/src/process/posix_thread/futex.rs @@ -73,7 +73,7 @@ pub fn futex_wait_bitset( // drop lock drop(futex_bucket); - let result = waiter.pause_timeout(timeout); + let result = waiter.pause_timeout(&timeout.into()); match result { // FIXME: If the futex is woken up and a signal comes at the same time, we should succeed // instead of failing with `EINTR`. The code below is of course wrong, but was needed to diff --git a/kernel/src/process/signal/pause.rs b/kernel/src/process/signal/pause.rs index 0de206a30..fa641dbe7 100644 --- a/kernel/src/process/signal/pause.rs +++ b/kernel/src/process/signal/pause.rs @@ -96,7 +96,7 @@ pub trait Pause: WaitTimeout { /// [`ETIME`]: crate::error::Errno::ETIME /// [`EINTR`]: crate::error::Errno::EINTR #[track_caller] - fn pause_timeout<'a>(&self, timeout: impl Into>) -> Result<()>; + fn pause_timeout<'a>(&self, timeout: &TimeoutExt<'a>) -> Result<()>; } impl Pause for Waiter { @@ -136,8 +136,8 @@ impl Pause for Waiter { res } - fn pause_timeout<'a>(&self, timeout: impl Into>) -> Result<()> { - let timer = timeout.into().check_expired()?.map(|timeout| { + fn pause_timeout<'a>(&self, timeout: &TimeoutExt<'a>) -> Result<()> { + let timer = timeout.check_expired()?.map(|timeout| { let waker = self.waker(); timeout.create_timer(move || { waker.wake_up(); @@ -201,7 +201,7 @@ impl Pause for WaitQueue { waiter.pause_until_or_timeout_impl(cond, timeout) } - fn pause_timeout<'a>(&self, _timeout: impl Into>) -> Result<()> { + fn pause_timeout<'a>(&self, _timeout: &TimeoutExt<'a>) -> Result<()> { panic!("`pause_timeout` can only be used on `Waiter`"); } } diff --git a/kernel/src/process/signal/poll.rs b/kernel/src/process/signal/poll.rs index 956b4f695..9d7c116f3 100644 --- a/kernel/src/process/signal/poll.rs +++ b/kernel/src/process/signal/poll.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: MPL-2.0 use core::{ - sync::atomic::{AtomicIsize, AtomicUsize, Ordering}, + sync::atomic::{AtomicIsize, Ordering}, time::Duration, }; @@ -13,6 +13,7 @@ use ostd::{ use crate::{ events::{IoEvents, Observer, Subject}, prelude::*, + time::wait::TimeoutExt, }; /// A pollee represents any I/O object (e.g., a file or socket) that can be polled. @@ -255,6 +256,7 @@ impl + 'static> PollAdaptor { impl PollAdaptor { /// Gets a reference to the observer. + #[expect(dead_code, reason = "Keep this `Arc` to avoid dropping the observer")] pub fn observer(&self) -> &Arc { &self.observer } @@ -267,76 +269,50 @@ impl PollAdaptor { /// A poller that can be used to wait for some events. pub struct Poller { - poller: PollAdaptor, + poller: PollHandle, waiter: Waiter, + timeout: TimeoutExt<'static>, } impl Poller { /// Constructs a new poller to wait for interesting events. - pub fn new() -> Self { - let (waiter, event_counter) = EventCounter::new_pair(); + /// + /// If `timeout` is specified, [`Self::wait`] will fail with [`ETIME`] after the specified + /// timeout is expired. + /// + /// [`ETIME`]: crate::error::Errno::ETIME + pub fn new(timeout: Option<&Duration>) -> Self { + let (waiter, waker) = Waiter::new_pair(); + + let mut timeout_ext = TimeoutExt::from(timeout); + timeout_ext.freeze(); Self { - poller: PollAdaptor::with_observer(event_counter), + poller: PollHandle::new(Arc::downgrade(&waker) as Weak<_>), waiter, + timeout: timeout_ext, } } /// Returns a mutable reference of [`PollHandle`]. pub fn as_handle_mut(&mut self) -> &mut PollHandle { - self.poller.as_handle_mut() + &mut self.poller } - /// Waits until some interesting events happen since the last wait or until the timeout - /// expires. + /// Waits until some interesting events happen since the last wait. /// - /// The waiting process can be interrupted by a signal. - pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { - self.poller.observer().read(&self.waiter, timeout)?; - Ok(()) + /// This method will fail with [`EINTR`] if interrupted by signals or [`ETIME`] on timeout. + /// + /// [`EINTR`]: crate::error::Errno::EINTR + /// [`ETIME`]: crate::error::Errno::ETIME + pub fn wait(&self) -> Result<()> { + self.waiter.pause_timeout(&self.timeout) } } -struct EventCounter { - counter: AtomicUsize, - waker: Arc, -} - -impl EventCounter { - fn new_pair() -> (Waiter, Self) { - let (waiter, waker) = Waiter::new_pair(); - - ( - waiter, - Self { - counter: AtomicUsize::new(0), - waker, - }, - ) - } - - fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result { - let cond = || { - let val = self.counter.swap(0, Ordering::Relaxed); - if val > 0 { - Some(val) - } else { - None - } - }; - - waiter.pause_until_or_timeout(cond, timeout) - } - - fn write(&self) { - self.counter.fetch_add(1, Ordering::Relaxed); - self.waker.wake_up(); - } -} - -impl Observer for EventCounter { +impl Observer for Waker { fn on_events(&self, _events: &IoEvents) { - self.write(); + self.wake_up(); } } @@ -391,10 +367,10 @@ pub trait Pollable { return_errno_with_message!(Errno::ETIME, "the timeout expired"); } - // Wait until the event happens. - let mut poller = Poller::new(); + // Create the poller and register to wait for the events. + let mut poller = Poller::new(timeout); if self.poll(mask, Some(poller.as_handle_mut())).is_empty() { - poller.wait(timeout)?; + poller.wait()?; } loop { @@ -405,9 +381,7 @@ pub trait Pollable { }; // Wait until the next event happens. - // - // FIXME: We need to update `timeout` since we have waited for some time. - poller.wait(timeout)?; + poller.wait()?; } } } diff --git a/kernel/src/syscall/poll.rs b/kernel/src/syscall/poll.rs index 0eb3e553e..a689db2b6 100644 --- a/kernel/src/syscall/poll.rs +++ b/kernel/src/syscall/poll.rs @@ -68,28 +68,24 @@ pub fn do_poll(poll_fds: &[PollFd], timeout: Option<&Duration>, ctx: &Context) - PollFiles::new_owned(poll_fds, &file_table_locked) }; - let poller = match poll_files.register_poller() { + let poller = match poll_files.register_poller(timeout) { PollerResult::Registered(poller) => poller, PollerResult::FoundEvents(num_events) => return Ok(num_events), }; loop { - match poller.wait(timeout) { - Ok(_) => {} - Err(e) if e.error() == Errno::ETIME => { - // The return value is zero if the timeout expires - // before any file descriptors became ready - return Ok(0); - } - Err(e) => return Err(e), + match poller.wait() { + Ok(()) => (), + // We should return zero if the timeout expires + // before any file descriptors are ready. + Err(err) if err.error() == Errno::ETIME => return Ok(0), + Err(err) => return Err(err), }; let num_events = poll_files.count_events(); if num_events > 0 { return Ok(num_events); } - - // FIXME: We need to update `timeout` since we have waited for some time. } } @@ -136,8 +132,8 @@ enum PollerResult { impl PollFiles<'_> { /// Registers the files with a poller, or exits early if some events are detected. - fn register_poller(&self) -> PollerResult { - let mut poller = Poller::new(); + fn register_poller(&self, timeout: Option<&Duration>) -> PollerResult { + let mut poller = Poller::new(timeout); for (index, poll_fd) in self.poll_fds.iter().enumerate() { let events = if let Some(file) = self.file_at(index) { diff --git a/kernel/src/time/core/timer.rs b/kernel/src/time/core/timer.rs index 36d51e579..13d2f978e 100644 --- a/kernel/src/time/core/timer.rs +++ b/kernel/src/time/core/timer.rs @@ -160,6 +160,11 @@ impl TimerManager { }) } + /// Returns the clock associated with this timer manager. + pub fn clock(&self) -> &Arc { + &self.clock + } + /// Returns whether a given `timeout` is expired. pub fn is_expired_timeout(&self, timeout: &Timeout) -> bool { match timeout { diff --git a/kernel/src/time/wait.rs b/kernel/src/time/wait.rs index a246211fa..db5e1201c 100644 --- a/kernel/src/time/wait.rs +++ b/kernel/src/time/wait.rs @@ -78,6 +78,16 @@ impl<'a> TimeoutExt<'a> { TimeoutExt::Never => Ok(None), } } + + /// Freezes the expired time. + /// + /// This works in the same way as [`ManagedTimeout::freeze`]. + pub fn freeze(&mut self) { + match self { + Self::Never => (), + Self::At(timeout) => timeout.freeze(), + } + } } impl From<&Duration> for TimeoutExt<'_> { @@ -134,6 +144,18 @@ impl<'a> ManagedTimeout<'a> { self.manager.is_expired_timeout(&self.timeout) } + /// Freezes the expired time. + /// + /// If the timeout is specified as an instant after a period of time from the current time + /// (i.e., [`Timeout::After`]), this method will freeze the timeout by converting it to a fixed + /// instant (i.e., [`Timeout::When`]). + pub fn freeze(&mut self) { + self.timeout = match self.timeout { + Timeout::When(instant) => Timeout::When(instant), + Timeout::After(duration) => Timeout::When(self.manager.clock().read_time() + duration), + } + } + /// Creates a timer for the timeout. pub fn create_timer(&self, callback: F) -> Arc where