diff --git a/kernel/src/prelude.rs b/kernel/src/prelude.rs index fc3c22e55..356e07e48 100644 --- a/kernel/src/prelude.rs +++ b/kernel/src/prelude.rs @@ -54,6 +54,7 @@ pub(crate) use crate::{ current, current_thread, error::{Errno, Error}, print, println, + process::signal::Pause, time::{wait::WaitTimeout, Clock}, }; pub(crate) type Result = core::result::Result; diff --git a/kernel/src/process/posix_thread/builder.rs b/kernel/src/process/posix_thread/builder.rs index 54002ab1c..b7ae0d676 100644 --- a/kernel/src/process/posix_thread/builder.rs +++ b/kernel/src/process/posix_thread/builder.rs @@ -103,6 +103,7 @@ impl PosixThreadBuilder { sig_queues, sig_context: Mutex::new(None), sig_stack: Mutex::new(None), + signalled_waker: SpinLock::new(None), robust_list: Mutex::new(None), prof_clock, virtual_timer_manager, diff --git a/kernel/src/process/posix_thread/mod.rs b/kernel/src/process/posix_thread/mod.rs index 17a7d2f84..49f70f55f 100644 --- a/kernel/src/process/posix_thread/mod.rs +++ b/kernel/src/process/posix_thread/mod.rs @@ -5,6 +5,7 @@ use core::sync::atomic::Ordering; use aster_rights::{ReadOp, WriteOp}; +use ostd::sync::Waker; use super::{ kill::SignalSenderIds, @@ -41,7 +42,6 @@ pub use robust_list::RobustListHead; pub struct PosixThread { // Immutable part process: Weak, - // Mutable part name: Mutex>, @@ -64,6 +64,9 @@ pub struct PosixThread { /// FIXME: This field may be removed. For glibc applications with RESTORER flag set, the sig_context is always equals with rsp. sig_context: Mutex>, sig_stack: Mutex>, + /// The per-thread signal [`Waker`], which will be used to wake up the thread + /// when enqueuing a signal. + signalled_waker: SpinLock>>, /// A profiling clock measures the user CPU time and kernel CPU time in the thread. prof_clock: Arc, @@ -171,10 +174,33 @@ impl PosixThread { return_errno_with_message!(Errno::EPERM, "sending signal to the thread is not allowed."); } + /// Sets the input [`Waker`] as the signalled waker of this thread. + /// + /// This approach can collaborate with signal-aware wait methods. + /// Once a signalled waker is set for a thread, it cannot be reset until it is cleared. + /// + /// # Panics + /// + /// If setting a new waker before clearing the current thread's signalled waker + /// this method will panic. + pub fn set_signalled_waker(&self, waker: Arc) { + let mut signalled_waker = self.signalled_waker.lock(); + assert!(signalled_waker.is_none()); + *signalled_waker = Some(waker); + } + + /// Clears the signalled waker of this thread. + pub fn clear_signalled_waker(&self) { + *self.signalled_waker.lock() = None; + } + /// Enqueues a thread-directed signal. This method should only be used for enqueue kernel /// signal and fault signal. pub fn enqueue_signal(&self, signal: Box) { self.sig_queues.enqueue(signal); + if let Some(waker) = &*self.signalled_waker.lock() { + waker.wake_up(); + } } /// Returns a reference to the profiling clock of the current thread. @@ -216,7 +242,7 @@ impl PosixThread { self.sig_queues.register_observer(observer, filter); } - pub fn unregiser_sigqueue_observer(&self, observer: &Weak>) { + pub fn unregister_sigqueue_observer(&self, observer: &Weak>) { self.sig_queues.unregister_observer(observer); } diff --git a/kernel/src/process/signal/mod.rs b/kernel/src/process/signal/mod.rs index 40406b1fe..29bf2b5cc 100644 --- a/kernel/src/process/signal/mod.rs +++ b/kernel/src/process/signal/mod.rs @@ -3,7 +3,7 @@ pub mod c_types; pub mod constants; mod events; -mod pauser; +mod pause; mod poll; pub mod sig_action; pub mod sig_disposition; @@ -19,7 +19,7 @@ use align_ext::AlignExt; use c_types::{siginfo_t, ucontext_t}; pub use events::{SigEvents, SigEventsFilter}; use ostd::{cpu::UserContext, user::UserContextApi}; -pub use pauser::Pauser; +pub use pause::{with_signal_blocked, Pause}; pub use poll::{Pollable, Pollee, Poller}; use sig_action::{SigAction, SigActionFlags, SigDefaultAction}; use sig_mask::SigMask; diff --git a/kernel/src/process/signal/pause.rs b/kernel/src/process/signal/pause.rs new file mode 100644 index 000000000..f00375263 --- /dev/null +++ b/kernel/src/process/signal/pause.rs @@ -0,0 +1,193 @@ +// SPDX-License-Identifier: MPL-2.0 + +use core::{sync::atomic::Ordering, time::Duration}; + +use ostd::sync::{WaitQueue, Waiter}; + +use super::sig_mask::SigMask; +use crate::{ + prelude::*, process::posix_thread::PosixThreadExt, thread::Thread, time::wait::WaitTimeout, +}; + +/// `Pause` is an extension trait to make [`Waiter`] and [`WaitQueue`] signal aware. +/// +/// The original methods of `Waiter` and `WaitQueue` only allow a thread +/// to wait (via `wait`) until it is woken up (via `wake_up`, `wake_one`, or `wake_all`) +/// or a condition is met (via `wait_until`). +/// The `WaitTimeout` extension trait grants the extra ability +/// to wait until a timeout (via `wait_until_or_timeout`). +/// On top of `WaitTimeout`, this `Pause` trait provides the `pause`-family methods, +/// which are similar to the `wait`-family methods except that the methods also return +/// when the waiting thread is interrupted by a POSIX signal. +/// When this happens, the `pause`-family methods return `Err(EINTR)`. +pub trait Pause: WaitTimeout { + /// Pauses the execution of the current thread until the `cond` is met ( i.e., `cond()` + /// returns `Some(_)` ), or some signals are received by the current thread or process. + /// + /// # Errors + /// + /// If some signals are received before `cond` is met, this method will return `Err(EINTR)`. + /// + /// [`EINTR`]: crate::error::Errno::EINTR + fn pause_until(&self, cond: F) -> Result + where + F: FnMut() -> Option, + { + self.pause_until_or_timeout_opt(cond, None) + } + + /// Pauses the execution of the current thread until the `cond` is met ( i.e., `cond()` + /// returns `Some(_)` ), or some signals are received by the current thread or process, + /// or the given `timeout` is expired. + /// + /// # Errors + /// + /// If `timeout` is expired before the `cond` is met or some signals are received, + /// this method will return `Err(ETIME)`. If the pausing is interrupted by some signals, + /// this method will return `Err(EINTR)` + /// + /// [`ETIME`]: crate::error::Errno::ETIME + /// [`EINTR`]: crate::error::Errno::EINTR + fn pause_until_or_timeout(&self, mut cond: F, timeout: &Duration) -> Result + where + F: FnMut() -> Option, + { + if *timeout == Duration::ZERO { + return cond() + .ok_or_else(|| Error::with_message(Errno::ETIME, "the time limit is reached")); + } + self.pause_until_or_timeout_opt(cond, Some(timeout)) + } + + /// Pauses the execution of the current thread until the `cond` is met ( i.e., `cond()` + /// returns `Some(_)` ), or some signals are received by the current thread or process. + /// If the input `timeout` is set, the pausing will finish when the `timeout` is expired. + /// + /// # Errors + /// + /// If `timeout` is expired before the `cond` is met or some signals are received, + /// this method will return `Err(ETIME)`. If the pausing is interrupted by some signals, + /// this method will return `Err(EINTR)` + /// + /// [`ETIME`]: crate::error::Errno::ETIME + /// [`EINTR`]: crate::error::Errno::EINTR + #[doc(hidden)] + fn pause_until_or_timeout_opt(&self, cond: F, timeout: Option<&Duration>) -> Result + where + F: FnMut() -> Option; +} + +impl Pause for Waiter { + fn pause_until_or_timeout_opt(&self, mut cond: F, timeout: Option<&Duration>) -> Result + where + F: FnMut() -> Option, + { + if let Some(res) = cond() { + return Ok(res); + } + + let current_thread = self + .task() + .data() + .downcast_ref::>() + .and_then(|thread| thread.upgrade()); + + let Some(posix_thread) = current_thread + .as_ref() + .and_then(|thread| thread.as_posix_thread()) + else { + if let Some(timeout) = timeout { + return self.wait_until_or_timeout(cond, timeout); + } else { + return self.wait_until_or_cancelled(cond, || Ok(())); + } + }; + + let cancel_cond = || { + if posix_thread.has_pending() { + return Err(Error::with_message( + Errno::EINTR, + "the current thread is interrupted by a signal", + )); + } + Ok(()) + }; + + posix_thread.set_signalled_waker(self.waker()); + let res = if let Some(timeout) = timeout { + self.wait_until_or_timeout_cancelled(cond, cancel_cond, timeout) + } else { + self.wait_until_or_cancelled(cond, cancel_cond) + }; + posix_thread.clear_signalled_waker(); + res + } +} + +impl Pause for WaitQueue { + fn pause_until_or_timeout_opt(&self, mut cond: F, timeout: Option<&Duration>) -> Result + where + F: FnMut() -> Option, + { + if let Some(res) = cond() { + return Ok(res); + } + + let (waiter, _) = Waiter::new_pair(); + let cond = || { + self.enqueue(waiter.waker()); + cond() + }; + waiter.pause_until_or_timeout_opt(cond, timeout) + } +} + +/// Executes a closure while temporarily blocking some signals for the current POSIX thread. +pub fn with_signal_blocked(ctx: &Context, mask: SigMask, operate: impl FnOnce() -> R) -> R { + let posix_thread = ctx.posix_thread; + let sig_mask = posix_thread.sig_mask(); + + let old_mask = sig_mask.load(Ordering::Relaxed); + sig_mask.store(old_mask + mask, Ordering::Relaxed); + + let res = operate(); + + sig_mask.store(old_mask, Ordering::Relaxed); + + res +} + +#[cfg(ktest)] +mod test { + use core::sync::atomic::AtomicBool; + + use ostd::prelude::*; + + use super::*; + use crate::thread::{ + kernel_thread::{KernelThreadExt, ThreadOptions}, + Thread, + }; + + #[ktest] + fn test_waiter_pause() { + let wait_queue = Arc::new(WaitQueue::new()); + let wait_queue_cloned = wait_queue.clone(); + + let boolean = Arc::new(AtomicBool::new(false)); + let boolean_cloned = boolean.clone(); + + let thread = Thread::spawn_kernel_thread(ThreadOptions::new(move || { + Thread::yield_now(); + + boolean_cloned.store(true, Ordering::Relaxed); + wait_queue_cloned.wake_all(); + })); + + wait_queue + .pause_until(|| boolean.load(Ordering::Relaxed).then_some(())) + .unwrap(); + + thread.join(); + } +} diff --git a/kernel/src/process/signal/pauser.rs b/kernel/src/process/signal/pauser.rs deleted file mode 100644 index c200dc465..000000000 --- a/kernel/src/process/signal/pauser.rs +++ /dev/null @@ -1,297 +0,0 @@ -// SPDX-License-Identifier: MPL-2.0 - -#![allow(unused_variables)] - -use core::{ - sync::atomic::{AtomicBool, Ordering}, - time::Duration, -}; - -use ostd::sync::WaitQueue; - -use super::{sig_mask::SigMask, SigEvents, SigEventsFilter}; -use crate::{ - events::Observer, - prelude::*, - process::posix_thread::{PosixThread, PosixThreadExt}, - thread::Thread, - time::wait::WaitTimeout, -}; - -/// A `Pauser` allows pausing the execution of the current thread until certain conditions are reached. -/// -/// Behind the scene, `Pauser` is implemented with [`Waiter`] and [`WaitQueue`]. -/// But unlike its [`Waiter`] relatives, `Pauser` is aware of POSIX signals: -/// if a thread paused by a `Pauser` receives a signal, then the thread will resume its execution. -/// -/// Another key difference is that `Pauser` combines the two roles of [`Waiter`] and [`WaitQueue`] -/// into one. Both putting the current thread to sleep and waking it up can be done through the -/// same `Pauser` object, using its `pause`- and `resume`-family methods. -/// -/// [`Waiter`]: ostd::sync::Waiter -/// -/// # Example -/// -/// Here is how the current thread can be put to sleep with a `Pauser`. -/// -/// ```no_run -/// let pauser = Pauser::new(SigMask::new_full()); -/// // Pause the execution of the current thread until a user-given condition is met -/// // or the current thread is interrupted by a signal. -/// let res = pauser.pause_until(|| { -/// if cond() { -/// Some(()) -/// } else { -/// None -/// } -/// }); -/// match res { -/// Ok(_) => { -/// // The user-given condition is met... -/// } -/// Err(EINTR) => { -/// // A signal is received... -/// } -/// _ => unreachable!() -/// } -/// ``` -/// -/// Let's assume that another thread has access to the same object of `Arc`. -/// Then, this second thread can resume the execution of the first thread -/// even when `cond()` does not return `true`. -/// -/// ```no_run -/// pauser.resume_all(); -/// ``` -pub struct Pauser { - wait_queue: WaitQueue, - sig_mask: SigMask, -} - -impl Pauser { - /// Creates a new `Pauser`. - /// - /// The `Pauser` can be interrupted by all signals - /// except that are blocked by current thread. - pub fn new() -> Arc { - Self::new_with_mask(SigMask::new_empty()) - } - - /// Creates a new `Pauser` with specified `sig_mask`. - /// - /// The `Pauser` will ignore signals that are in `sig_mask` - /// or blocked by current thread. - pub fn new_with_mask(sig_mask: SigMask) -> Arc { - let wait_queue = WaitQueue::new(); - Arc::new(Self { - wait_queue, - sig_mask, - }) - } - - /// Pauses the execution of current thread until the `cond` is met ( i.e., `cond()` - /// returns `Some(_)` ), or some signal is received by current thread or process. - /// - /// # Errors - /// - /// If some signal is received before `cond` is met, this method will returns `Err(EINTR)`. - pub fn pause_until(self: &Arc, cond: F) -> Result - where - F: FnMut() -> Option, - { - self.do_pause(cond, None) - } - - /// Pauses the execution of current thread until the `cond` is met ( i.e., `cond()` returns - /// `Some(_)` ), or some signal is received by current thread or process, or the given - /// `timeout` is expired. - /// - /// # Errors - /// - /// If `timeout` is expired before the `cond` is met or some signal is received, - /// it will returns [`ETIME`]. - /// - /// [`ETIME`]: crate::error::Errno::ETIME - pub fn pause_until_or_timeout(self: &Arc, cond: F, timeout: &Duration) -> Result - where - F: FnMut() -> Option, - { - self.do_pause(cond, Some(timeout)) - } - - fn do_pause(self: &Arc, mut cond: F, timeout: Option<&Duration>) -> Result - where - F: FnMut() -> Option, - { - let current_thread = Thread::current(); - let sig_queue_waiter = - SigObserverRegistrar::new(current_thread.as_ref(), self.sig_mask, self.clone()); - - let cond = || { - if let Some(res) = cond() { - return Some(Ok(res)); - } - - if sig_queue_waiter.is_interrupted() { - return Some(Err(Error::with_message( - Errno::EINTR, - "the current thread is interrupted by a signal", - ))); - } - - None - }; - - if let Some(timeout) = timeout { - self.wait_queue - .wait_until_or_timeout(cond, timeout) - .ok_or_else(|| Error::with_message(Errno::ETIME, "the time limit is reached"))? - } else { - self.wait_queue.wait_until(cond) - } - } - - /// Resumes all paused threads on this pauser. - pub fn resume_all(&self) { - self.wait_queue.wake_all(); - } - - /// Resumes one paused thread on this pauser. - pub fn resume_one(&self) { - self.wait_queue.wake_one(); - } -} - -enum SigObserverRegistrar<'a> { - // A POSIX thread may be interrupted by a signal if the signal is not masked. - PosixThread { - thread: &'a PosixThread, - old_mask: SigMask, - observer: Arc, - }, - // A kernel thread ignores all signals. It is not necessary to wait for them. - KernelThread, -} - -impl<'a> SigObserverRegistrar<'a> { - fn new( - current_thread: Option<&'a Arc>, - sig_mask: SigMask, - pauser: Arc, - ) -> Self { - let Some(thread) = current_thread.and_then(|thread| thread.as_posix_thread()) else { - return Self::KernelThread; - }; - - // Block `sig_mask`. - let (old_mask, filter) = { - let old_mask = thread.sig_mask().load(Ordering::Relaxed); - let new_mask = old_mask + sig_mask; - thread.sig_mask().store(new_mask, Ordering::Relaxed); - - (old_mask, SigEventsFilter::new(new_mask)) - }; - - // Register `SigQueueObserver`. - let observer = SigQueueObserver::new(pauser); - thread.register_sigqueue_observer(Arc::downgrade(&observer) as _, filter); - - // Check pending signals after registering the observer to avoid race conditions. - if thread.has_pending() { - observer.set_interrupted(); - } - - Self::PosixThread { - thread, - old_mask, - observer, - } - } - - fn is_interrupted(&self) -> bool { - match self { - Self::PosixThread { observer, .. } => observer.is_interrupted(), - Self::KernelThread => false, - } - } -} - -impl<'a> Drop for SigObserverRegistrar<'a> { - fn drop(&mut self) { - let Self::PosixThread { - thread, - old_mask, - observer, - } = self - else { - return; - }; - - // Restore the state, assuming no one else can modify the current thread's signal mask - // during the pause. - thread.unregiser_sigqueue_observer(&(Arc::downgrade(observer) as _)); - thread.sig_mask().store(*old_mask, Ordering::Relaxed); - } -} - -struct SigQueueObserver { - is_interrupted: AtomicBool, - pauser: Arc, -} - -impl SigQueueObserver { - fn new(pauser: Arc) -> Arc { - Arc::new(Self { - is_interrupted: AtomicBool::new(false), - pauser, - }) - } - - fn is_interrupted(&self) -> bool { - self.is_interrupted.load(Ordering::Acquire) - } - - fn set_interrupted(&self) { - self.is_interrupted.store(true, Ordering::Release); - } -} - -impl Observer for SigQueueObserver { - fn on_events(&self, _: &SigEvents) { - self.set_interrupted(); - self.pauser.wait_queue.wake_all(); - } -} - -#[cfg(ktest)] -mod test { - use ostd::prelude::*; - - use super::*; - use crate::thread::{ - kernel_thread::{KernelThreadExt, ThreadOptions}, - Thread, - }; - - #[ktest] - fn test_pauser() { - let pauser = Pauser::new(); - let pauser_cloned = pauser.clone(); - - let boolean = Arc::new(AtomicBool::new(false)); - let boolean_cloned = boolean.clone(); - - let thread = Thread::spawn_kernel_thread(ThreadOptions::new(move || { - Thread::yield_now(); - - boolean_cloned.store(true, Ordering::Relaxed); - pauser_cloned.resume_all(); - })); - - pauser - .pause_until(|| boolean.load(Ordering::Relaxed).then_some(())) - .unwrap(); - - thread.join(); - } -}