Make each pausing thread has a seperate is_interruped

This commit is contained in:
Jianfeng Jiang 2024-05-14 03:20:10 +00:00 committed by Tate, Hongliang Tian
parent 0dd90cad3c
commit c7cda1df28

View File

@ -16,19 +16,21 @@ use crate::{
/// A `Pauser` allows pausing the execution of the current thread until certain conditions are reached.
///
/// Behind the scene, `Pauser` is implemented with `Waiter` and `WaiterQueue`.
/// But unlike its `Waiter` relatives, `Pauser` is aware of POSIX signals:
/// Behind the scene, `Pauser` is implemented with [`Waiter`] and [`WaitQueue`].
/// But unlike its [`Waiter`] relatives, `Pauser` is aware of POSIX signals:
/// if a thread paused by a `Pauser` receives a signal, then the thread will resume its execution.
///
/// Another key difference is that `Pauser` combines the two roles of `Waiter` and `WaiterQueue`
/// Another key difference is that `Pauser` combines the two roles of [`Waiter`] and [`WaitQueue`]
/// into one. Both putting the current thread to sleep and waking it up can be done through the
/// same `Pauser` object, using its `pause`- and `resume`-family methods.
///
/// [`Waiter`]: aster_frame::sync::Waiter
///
/// # Example
///
/// Here is how the current thread can be put to sleep with a `Pauser`.
///
/// ```rust
/// ```no_run
/// let pauser = Pauser::new(SigMask::new_full());
/// // Pause the execution of the current thread until a user-given condition is met
/// // or the current thread is interrupted by a signal.
@ -54,36 +56,40 @@ use crate::{
/// Then, this second thread can resume the execution of the first thread
/// even when `cond()` does not return `true`.
///
/// ```
/// ```no_run
/// pauser.resume_all();
/// ```
pub struct Pauser {
wait_queue: WaitQueue,
sig_mask: SigMask,
is_interrupted: AtomicBool,
}
impl Pauser {
/// Create a new `Pauser`. The `Pauser` can be interrupted by all signals except that
/// are blocked by current thread.
/// Creates a new `Pauser`.
///
/// The `Pauser` can be interrupted by all signals
/// except that are blocked by current thread.
pub fn new() -> Arc<Self> {
Self::new_with_mask(SigMask::new_empty())
}
/// Create a new `Pauser`, the `Pauser` will ignore signals that are in `sig_mask` and
/// blocked by current thread.
/// Creates a new `Pauser` with specified `sig_mask`.
///
/// The `Pauser` will ignore signals that are in `sig_mask`
/// or blocked by current thread.
pub fn new_with_mask(sig_mask: SigMask) -> Arc<Self> {
let wait_queue = WaitQueue::new();
Arc::new(Self {
wait_queue,
sig_mask,
is_interrupted: AtomicBool::new(false),
})
}
/// Pause the execution of current thread until the `cond` is met ( i.e., `cond()`
/// Pauses the execution of current thread until the `cond` is met ( i.e., `cond()`
/// returns `Some(_)` ), or some signal is received by current thread or process.
///
/// # Errors
///
/// If some signal is received before `cond` is met, this method will returns `Err(EINTR)`.
pub fn pause_until<F, R>(self: &Arc<Self>, cond: F) -> Result<R>
where
@ -92,12 +98,16 @@ impl Pauser {
self.do_pause(cond, None)
}
/// Pause the execution of current thread until the `cond` is met ( i.e., `cond()` returns
/// Pauses the execution of current thread until the `cond` is met ( i.e., `cond()` returns
/// `Some(_)` ), or some signal is received by current thread or process, or the given
/// `timeout` is expired.
///
/// # Errors
///
/// If `timeout` is expired before the `cond` is met or some signal is received,
/// it will returns `Err(ETIME)`.
/// it will returns [`ETIME`].
///
/// [`ETIME`]: crate::error::Errno::ETIME
pub fn pause_until_or_timeout<F, R>(self: &Arc<Self>, cond: F, timeout: &Duration) -> Result<R>
where
F: FnMut() -> Option<R>,
@ -109,8 +119,6 @@ impl Pauser {
where
F: FnMut() -> Option<R>,
{
self.is_interrupted.store(false, Ordering::Release);
let current_thread = current_thread!();
let posix_thread = current_thread.as_posix_thread().unwrap();
@ -128,12 +136,13 @@ impl Pauser {
};
// Register observer on sigqueue
let observer = Arc::downgrade(self) as Weak<dyn Observer<SigEvents>>;
posix_thread.register_sigqueue_observer(observer.clone(), filter);
let observer = SigQueueObserver::new(self.clone());
let weak_observer = Arc::downgrade(&observer) as Weak<dyn Observer<SigEvents>>;
posix_thread.register_sigqueue_observer(weak_observer.clone(), filter);
// Some signal may come before we register observer, so we do another check here.
if posix_thread.has_pending_signals() {
self.is_interrupted.store(true, Ordering::Release);
if posix_thread.has_pending() {
observer.set_interrupted();
}
enum Res<R> {
@ -141,16 +150,19 @@ impl Pauser {
Interrupted,
}
let cond = || {
if let Some(res) = cond() {
return Some(Res::Ok(res));
}
let cond = {
let cloned_observer = observer.clone();
move || {
if let Some(res) = cond() {
return Some(Res::Ok(res));
}
if self.is_interrupted.load(Ordering::Acquire) {
return Some(Res::Interrupted);
}
if cloned_observer.is_interrupted() {
return Some(Res::Interrupted);
}
None
None
}
};
let res = if let Some(timeout) = timeout {
@ -161,7 +173,8 @@ impl Pauser {
Ok(self.wait_queue.wait_until(cond))
};
posix_thread.unregiser_sigqueue_observer(&observer);
// Restore the state
posix_thread.unregiser_sigqueue_observer(&weak_observer);
posix_thread.sig_mask().lock().set(old_mask.as_u64());
match res? {
@ -170,20 +183,42 @@ impl Pauser {
}
}
/// Resume all paused threads on this pauser.
/// Resumes all paused threads on this pauser.
pub fn resume_all(&self) {
self.wait_queue.wake_all();
}
/// Resume one paused thread on this pauser.
/// Resumes one paused thread on this pauser.
pub fn resume_one(&self) {
self.wait_queue.wake_one();
}
}
impl Observer<SigEvents> for Pauser {
fn on_events(&self, events: &SigEvents) {
struct SigQueueObserver {
is_interrupted: AtomicBool,
pauser: Arc<Pauser>,
}
impl SigQueueObserver {
fn new(pauser: Arc<Pauser>) -> Arc<Self> {
Arc::new(Self {
is_interrupted: AtomicBool::new(false),
pauser,
})
}
fn is_interrupted(&self) -> bool {
self.is_interrupted.load(Ordering::Acquire)
}
fn set_interrupted(&self) {
self.is_interrupted.store(true, Ordering::Release);
self.wait_queue.wake_all();
}
}
impl Observer<SigEvents> for SigQueueObserver {
fn on_events(&self, _: &SigEvents) {
self.set_interrupted();
self.pauser.wait_queue.wake_all();
}
}