diff --git a/kernel/aster-nix/src/process/signal/pauser.rs b/kernel/aster-nix/src/process/signal/pauser.rs index a1197056f..25c4f6f64 100644 --- a/kernel/aster-nix/src/process/signal/pauser.rs +++ b/kernel/aster-nix/src/process/signal/pauser.rs @@ -16,19 +16,21 @@ use crate::{ /// A `Pauser` allows pausing the execution of the current thread until certain conditions are reached. /// -/// Behind the scene, `Pauser` is implemented with `Waiter` and `WaiterQueue`. -/// But unlike its `Waiter` relatives, `Pauser` is aware of POSIX signals: +/// Behind the scene, `Pauser` is implemented with [`Waiter`] and [`WaitQueue`]. +/// But unlike its [`Waiter`] relatives, `Pauser` is aware of POSIX signals: /// if a thread paused by a `Pauser` receives a signal, then the thread will resume its execution. /// -/// Another key difference is that `Pauser` combines the two roles of `Waiter` and `WaiterQueue` +/// Another key difference is that `Pauser` combines the two roles of [`Waiter`] and [`WaitQueue`] /// into one. Both putting the current thread to sleep and waking it up can be done through the /// same `Pauser` object, using its `pause`- and `resume`-family methods. /// +/// [`Waiter`]: aster_frame::sync::Waiter +/// /// # Example /// /// Here is how the current thread can be put to sleep with a `Pauser`. /// -/// ```rust +/// ```no_run /// let pauser = Pauser::new(SigMask::new_full()); /// // Pause the execution of the current thread until a user-given condition is met /// // or the current thread is interrupted by a signal. @@ -54,36 +56,40 @@ use crate::{ /// Then, this second thread can resume the execution of the first thread /// even when `cond()` does not return `true`. /// -/// ``` +/// ```no_run /// pauser.resume_all(); /// ``` pub struct Pauser { wait_queue: WaitQueue, sig_mask: SigMask, - is_interrupted: AtomicBool, } impl Pauser { - /// Create a new `Pauser`. The `Pauser` can be interrupted by all signals except that - /// are blocked by current thread. + /// Creates a new `Pauser`. + /// + /// The `Pauser` can be interrupted by all signals + /// except that are blocked by current thread. pub fn new() -> Arc { Self::new_with_mask(SigMask::new_empty()) } - /// Create a new `Pauser`, the `Pauser` will ignore signals that are in `sig_mask` and - /// blocked by current thread. + /// Creates a new `Pauser` with specified `sig_mask`. + /// + /// The `Pauser` will ignore signals that are in `sig_mask` + /// or blocked by current thread. pub fn new_with_mask(sig_mask: SigMask) -> Arc { let wait_queue = WaitQueue::new(); Arc::new(Self { wait_queue, sig_mask, - is_interrupted: AtomicBool::new(false), }) } - /// Pause the execution of current thread until the `cond` is met ( i.e., `cond()` + /// Pauses the execution of current thread until the `cond` is met ( i.e., `cond()` /// returns `Some(_)` ), or some signal is received by current thread or process. /// + /// # Errors + /// /// If some signal is received before `cond` is met, this method will returns `Err(EINTR)`. pub fn pause_until(self: &Arc, cond: F) -> Result where @@ -92,12 +98,16 @@ impl Pauser { self.do_pause(cond, None) } - /// Pause the execution of current thread until the `cond` is met ( i.e., `cond()` returns + /// Pauses the execution of current thread until the `cond` is met ( i.e., `cond()` returns /// `Some(_)` ), or some signal is received by current thread or process, or the given /// `timeout` is expired. /// + /// # Errors + /// /// If `timeout` is expired before the `cond` is met or some signal is received, - /// it will returns `Err(ETIME)`. + /// it will returns [`ETIME`]. + /// + /// [`ETIME`]: crate::error::Errno::ETIME pub fn pause_until_or_timeout(self: &Arc, cond: F, timeout: &Duration) -> Result where F: FnMut() -> Option, @@ -109,8 +119,6 @@ impl Pauser { where F: FnMut() -> Option, { - self.is_interrupted.store(false, Ordering::Release); - let current_thread = current_thread!(); let posix_thread = current_thread.as_posix_thread().unwrap(); @@ -128,12 +136,13 @@ impl Pauser { }; // Register observer on sigqueue - let observer = Arc::downgrade(self) as Weak>; - posix_thread.register_sigqueue_observer(observer.clone(), filter); + let observer = SigQueueObserver::new(self.clone()); + let weak_observer = Arc::downgrade(&observer) as Weak>; + posix_thread.register_sigqueue_observer(weak_observer.clone(), filter); // Some signal may come before we register observer, so we do another check here. - if posix_thread.has_pending_signals() { - self.is_interrupted.store(true, Ordering::Release); + if posix_thread.has_pending() { + observer.set_interrupted(); } enum Res { @@ -141,16 +150,19 @@ impl Pauser { Interrupted, } - let cond = || { - if let Some(res) = cond() { - return Some(Res::Ok(res)); - } + let cond = { + let cloned_observer = observer.clone(); + move || { + if let Some(res) = cond() { + return Some(Res::Ok(res)); + } - if self.is_interrupted.load(Ordering::Acquire) { - return Some(Res::Interrupted); - } + if cloned_observer.is_interrupted() { + return Some(Res::Interrupted); + } - None + None + } }; let res = if let Some(timeout) = timeout { @@ -161,7 +173,8 @@ impl Pauser { Ok(self.wait_queue.wait_until(cond)) }; - posix_thread.unregiser_sigqueue_observer(&observer); + // Restore the state + posix_thread.unregiser_sigqueue_observer(&weak_observer); posix_thread.sig_mask().lock().set(old_mask.as_u64()); match res? { @@ -170,20 +183,42 @@ impl Pauser { } } - /// Resume all paused threads on this pauser. + /// Resumes all paused threads on this pauser. pub fn resume_all(&self) { self.wait_queue.wake_all(); } - /// Resume one paused thread on this pauser. + /// Resumes one paused thread on this pauser. pub fn resume_one(&self) { self.wait_queue.wake_one(); } } -impl Observer for Pauser { - fn on_events(&self, events: &SigEvents) { +struct SigQueueObserver { + is_interrupted: AtomicBool, + pauser: Arc, +} + +impl SigQueueObserver { + fn new(pauser: Arc) -> Arc { + Arc::new(Self { + is_interrupted: AtomicBool::new(false), + pauser, + }) + } + + fn is_interrupted(&self) -> bool { + self.is_interrupted.load(Ordering::Acquire) + } + + fn set_interrupted(&self) { self.is_interrupted.store(true, Ordering::Release); - self.wait_queue.wake_all(); + } +} + +impl Observer for SigQueueObserver { + fn on_events(&self, _: &SigEvents) { + self.set_interrupted(); + self.pauser.wait_queue.wake_all(); } }