From 1873bb7a3f785ed062b4765d5dc2761994c163cb Mon Sep 17 00:00:00 2001 From: Chen Chengjun Date: Sat, 14 Sep 2024 10:13:14 +0800 Subject: [PATCH] Add wait_until_* functionality to Waiter --- kernel/src/net/iface/poll.rs | 2 +- kernel/src/process/sync/condvar.rs | 4 +- kernel/src/thread/work_queue/worker_pool.rs | 2 +- kernel/src/time/wait.rs | 113 ++++++++++++++++---- ostd/src/sync/wait.rs | 81 +++++++------- 5 files changed, 140 insertions(+), 62 deletions(-) diff --git a/kernel/src/net/iface/poll.rs b/kernel/src/net/iface/poll.rs index 0a57e9d4d..fdb2d7916 100644 --- a/kernel/src/net/iface/poll.rs +++ b/kernel/src/net/iface/poll.rs @@ -59,7 +59,7 @@ fn spawn_background_poll_thread(iface: Arc) { } let duration = Duration::from_millis(next_poll_at_ms - now_as_ms); - wait_queue.wait_until_or_timeout( + let _ = wait_queue.wait_until_or_timeout( // If `iface_ext.next_poll_at_ms()` changes to an earlier time, we will end the // waiting. || (iface_ext.next_poll_at_ms()? < next_poll_at_ms).then_some(()), diff --git a/kernel/src/process/sync/condvar.rs b/kernel/src/process/sync/condvar.rs index 944fe070f..b4cbde7af 100644 --- a/kernel/src/process/sync/condvar.rs +++ b/kernel/src/process/sync/condvar.rs @@ -164,8 +164,8 @@ impl Condvar { // Wait until the condition becomes true, we're explicitly woken up, or the timeout elapses. let res = self.waitqueue.wait_until_or_timeout(cond, &timeout); match res { - Some(_) => Ok((lock.lock(), false)), - None => { + Ok(()) => Ok((lock.lock(), false)), + Err(_) => { let mut counter = self.counter.lock(); counter.waiter_count -= 1; Err(LockErr::Timeout((lock.lock(), true))) diff --git a/kernel/src/thread/work_queue/worker_pool.rs b/kernel/src/thread/work_queue/worker_pool.rs index c12167b0e..c1a5b467d 100644 --- a/kernel/src/thread/work_queue/worker_pool.rs +++ b/kernel/src/thread/work_queue/worker_pool.rs @@ -264,7 +264,7 @@ impl Monitor { for local_pool in worker_pool.local_pools.iter() { local_pool.set_heartbeat(false); } - sleep_queue.wait_until_or_timeout(|| -> Option<()> { None }, &sleep_duration); + let _ = sleep_queue.wait_until_or_timeout(|| -> Option<()> { None }, &sleep_duration); } } } diff --git a/kernel/src/time/wait.rs b/kernel/src/time/wait.rs index efc830e07..691633dd8 100644 --- a/kernel/src/time/wait.rs +++ b/kernel/src/time/wait.rs @@ -5,48 +5,121 @@ use core::time::Duration; use ostd::sync::{WaitQueue, Waiter}; use super::{clocks::JIFFIES_TIMER_MANAGER, timer::Timeout}; +use crate::prelude::*; -/// A trait that provide the timeout related function for [`WaitQueue`]`. +/// A trait that provide the timeout related function for [`Waiter`] and [`WaitQueue`]`. pub trait WaitTimeout { - /// Wait until some condition returns `Some(_)`, or a given timeout is reached. If - /// the condition does not becomes `Some(_)` before the timeout is reached, the - /// function will return `None`. - fn wait_until_or_timeout(&self, cond: F, timeout: &Duration) -> Option - where - F: FnMut() -> Option; -} - -impl WaitTimeout for WaitQueue { - fn wait_until_or_timeout(&self, mut cond: F, timeout: &Duration) -> Option + /// Waits until some condition returns `Some(_)`, or a given timeout is reached. If + /// the condition does not becomes `Some(_)` before the timeout is reached, + /// this function will return `ETIME` error. + fn wait_until_or_timeout(&self, cond: F, timeout: &Duration) -> Result where F: FnMut() -> Option, + { + self.wait_until_or_timeout_cancelled(cond, || Ok(()), timeout) + } + + /// Waits until some condition returns `Some(_)`, or be cancelled due to + /// reaching the timeout or the inputted cancel condition. If the condition + /// does not becomes `Some(_)` before the timeout is reached or `cancel_cond` + /// returns `Err`, this function will return corresponding `Err`. + #[doc(hidden)] + fn wait_until_or_timeout_cancelled( + &self, + cond: F, + cancel_cond: FCancel, + timeout: &Duration, + ) -> Result + where + F: FnMut() -> Option, + FCancel: Fn() -> Result<()>; +} + +impl WaitTimeout for Waiter { + fn wait_until_or_timeout_cancelled( + &self, + mut cond: F, + cancel_cond: FCancel, + timeout: &Duration, + ) -> Result + where + F: FnMut() -> Option, + FCancel: Fn() -> Result<()>, { if *timeout == Duration::ZERO { - return cond(); + return cond() + .ok_or_else(|| Error::with_message(Errno::ETIME, "the time limit is reached")); } if let Some(res) = cond() { - return Some(res); + return Ok(res); } - let (waiter, waker) = Waiter::new_pair(); - + let waker = self.waker(); let jiffies_timer = JIFFIES_TIMER_MANAGER.get().unwrap().create_timer(move || { waker.wake_up(); }); jiffies_timer.set_timeout(Timeout::After(*timeout)); - let cancel_cond = { + let timeout_cond = { let jiffies_timer = jiffies_timer.clone(); - move || jiffies_timer.remain() == Duration::ZERO + move || { + if jiffies_timer.remain() != Duration::ZERO { + Ok(()) + } else { + Err(Error::with_message( + Errno::ETIME, + "the time limit is reached", + )) + } + } }; - let res = self.wait_until_or_cancelled(cond, waiter, cancel_cond); - // If res is `Some`, then the timeout may not have been expired. We cancel it manually. - if res.is_some() { + let cancel_cond = || { + timeout_cond()?; + cancel_cond() + }; + + let res = self.wait_until_or_cancelled(cond, cancel_cond); + + // If `res` is not `ETIME` error, then the timeout may not have been expired. + // We cancel it manually. + if !res + .as_ref() + .is_err_and(|e: &Error| e.error() == Errno::ETIME) + { jiffies_timer.cancel(); } res } } + +impl WaitTimeout for WaitQueue { + fn wait_until_or_timeout_cancelled( + &self, + mut cond: F, + cancel_cond: FCancel, + timeout: &Duration, + ) -> Result + where + F: FnMut() -> Option, + FCancel: Fn() -> Result<()>, + { + if *timeout == Duration::ZERO { + return cond() + .ok_or_else(|| Error::with_message(Errno::ETIME, "the time limit is reached")); + } + + if let Some(res) = cond() { + return Ok(res); + } + + let (waiter, _) = Waiter::new_pair(); + let cond = || { + self.enqueue(waiter.waker()); + cond() + }; + waiter.wait_until_or_timeout_cancelled(cond, cancel_cond, timeout) + } +} diff --git a/ostd/src/sync/wait.rs b/ostd/src/sync/wait.rs index 56ca700c0..37ccd7847 100644 --- a/ostd/src/sync/wait.rs +++ b/ostd/src/sync/wait.rs @@ -74,46 +74,15 @@ impl WaitQueue { } let (waiter, _) = Waiter::new_pair(); - - self.wait_until_or_cancelled(cond, waiter, || false) + let cond = || { + self.enqueue(waiter.waker()); + cond() + }; + waiter + .wait_until_or_cancelled(cond, || Ok::<(), ()>(())) .unwrap() } - /// Waits until some condition is met or the cancel condition becomes true. - /// - /// This method will return `Some(_)` if the condition returns `Some(_)`, and will return - /// the condition test result regardless what it is when the cancel condition becomes true. - #[doc(hidden)] - pub fn wait_until_or_cancelled( - &self, - mut cond: F, - waiter: Waiter, - cancel_cond: FCancel, - ) -> Option - where - F: FnMut() -> Option, - FCancel: Fn() -> bool, - { - let waker = waiter.waker(); - - loop { - // Enqueue the waker before checking `cond()` to avoid races - self.enqueue(waker.clone()); - - if let Some(res) = cond() { - return Some(res); - }; - - if cancel_cond() { - // Drop the waiter and check again to avoid missing a wake event. - drop(waiter); - return cond(); - } - - waiter.wait(); - } - } - /// Wakes up one waiting thread, if there is one at the point of time when this method is /// called, returning whether such a thread was woken up. pub fn wake_one(&self) -> bool { @@ -170,7 +139,9 @@ impl WaitQueue { self.num_wakers.fetch_add(0, Ordering::Release) == 0 } - fn enqueue(&self, waker: Arc) { + /// Enqueues the input [`Waker`] to the wait queue. + #[doc(hidden)] + pub fn enqueue(&self, waker: Arc) { let mut wakers = self.wakers.disable_irq().lock(); wakers.push_back(waker); self.num_wakers.fetch_add(1, Ordering::Acquire); @@ -227,10 +198,44 @@ impl Waiter { self.waker.do_wait(); } + /// Waits until some condition is met or the cancel condition becomes true. + /// + /// This method will return `Ok(_)` if the condition returns `Some(_)`, and will stop waiting + /// if the cancel condition returns `Err(_)`. In this situation, this method will return the `Err(_)` + /// generated by the cancel condition. + pub fn wait_until_or_cancelled( + &self, + mut cond: F, + cancel_cond: FCancel, + ) -> core::result::Result + where + F: FnMut() -> Option, + FCancel: Fn() -> core::result::Result<(), E>, + { + loop { + if let Some(res) = cond() { + return Ok(res); + }; + + if let Err(e) = cancel_cond() { + // Close the waker and check again to avoid missing a wake event. + self.waker.close(); + return cond().ok_or(e); + } + + self.wait(); + } + } + /// Gets the associated [`Waker`] of the current waiter. pub fn waker(&self) -> Arc { self.waker.clone() } + + /// Returns the task that the associated waker will attempt to wake up. + pub fn task(&self) -> &Arc { + &self.waker.task + } } impl Drop for Waiter {