Optimize the SigQueues to return early without lock

This commit is contained in:
LI Qing
2024-05-09 16:03:06 +08:00
committed by Tate, Hongliang Tian
parent 8bcadee540
commit 0eaa6e637d
3 changed files with 82 additions and 51 deletions

View File

@ -120,7 +120,7 @@ impl PosixThreadBuilder {
credentials, credentials,
real_timer: Mutex::new(real_timer), real_timer: Mutex::new(real_timer),
sig_mask: Mutex::new(sig_mask), sig_mask: Mutex::new(sig_mask),
sig_queues: Mutex::new(sig_queues), sig_queues,
sig_context: Mutex::new(None), sig_context: Mutex::new(None),
sig_stack: Mutex::new(None), sig_stack: Mutex::new(None),
robust_list: Mutex::new(None), robust_list: Mutex::new(None),

View File

@ -59,7 +59,7 @@ pub struct PosixThread {
/// Blocked signals /// Blocked signals
sig_mask: Mutex<SigMask>, sig_mask: Mutex<SigMask>,
/// Thread-directed sigqueue /// Thread-directed sigqueue
sig_queues: Mutex<SigQueues>, sig_queues: SigQueues,
/// Signal handler ucontext address /// Signal handler ucontext address
/// FIXME: This field may be removed. For glibc applications with RESTORER flag set, the sig_context is always equals with rsp. /// FIXME: This field may be removed. For glibc applications with RESTORER flag set, the sig_context is always equals with rsp.
sig_context: Mutex<Option<Vaddr>>, sig_context: Mutex<Option<Vaddr>>,
@ -88,7 +88,7 @@ impl PosixThread {
} }
pub fn has_pending_signal(&self) -> bool { pub fn has_pending_signal(&self) -> bool {
!self.sig_queues.lock().is_empty() !self.sig_queues.is_empty()
} }
/// Returns whether the signal is blocked by the thread. /// Returns whether the signal is blocked by the thread.
@ -151,11 +151,11 @@ impl PosixThread {
} }
pub(in crate::process) fn enqueue_signal(&self, signal: Box<dyn Signal>) { pub(in crate::process) fn enqueue_signal(&self, signal: Box<dyn Signal>) {
self.sig_queues.lock().enqueue(signal); self.sig_queues.enqueue(signal);
} }
pub fn dequeue_signal(&self, mask: &SigMask) -> Option<Box<dyn Signal>> { pub fn dequeue_signal(&self, mask: &SigMask) -> Option<Box<dyn Signal>> {
self.sig_queues.lock().dequeue(mask) self.sig_queues.dequeue(mask)
} }
pub fn register_sigqueue_observer( pub fn register_sigqueue_observer(
@ -163,11 +163,11 @@ impl PosixThread {
observer: Weak<dyn Observer<SigEvents>>, observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter, filter: SigEventsFilter,
) { ) {
self.sig_queues.lock().register_observer(observer, filter); self.sig_queues.register_observer(observer, filter);
} }
pub fn unregiser_sigqueue_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) { pub fn unregiser_sigqueue_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.sig_queues.lock().unregister_observer(observer); self.sig_queues.unregister_observer(observer);
} }
pub fn sig_context(&self) -> &Mutex<Option<Vaddr>> { pub fn sig_context(&self) -> &Mutex<Option<Vaddr>> {

View File

@ -1,5 +1,7 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use core::sync::atomic::{AtomicUsize, Ordering};
use super::{ use super::{
constants::*, sig_mask::SigMask, sig_num::SigNum, signals::Signal, SigEvents, SigEventsFilter, constants::*, sig_mask::SigMask, sig_num::SigNum, signals::Signal, SigEvents, SigEventsFilter,
}; };
@ -9,31 +11,87 @@ use crate::{
}; };
pub struct SigQueues { pub struct SigQueues {
count: usize, // The number of pending signals.
std_queues: Vec<Option<Box<dyn Signal>>>, // Useful for quickly determining if any signals are pending without locking `queues`.
rt_queues: Vec<VecDeque<Box<dyn Signal>>>, count: AtomicUsize,
queues: Mutex<Queues>,
subject: Subject<SigEvents, SigEventsFilter>, subject: Subject<SigEvents, SigEventsFilter>,
} }
impl SigQueues { impl SigQueues {
pub fn new() -> Self { pub fn new() -> Self {
let count = 0; Self {
let std_queues = (0..COUNT_STD_SIGS).map(|_| None).collect(); count: AtomicUsize::new(0),
let rt_queues = (0..COUNT_RT_SIGS).map(|_| Default::default()).collect(); queues: Mutex::new(Queues::new()),
let subject = Subject::new(); subject: Subject::new(),
SigQueues {
count,
std_queues,
rt_queues,
subject,
} }
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.count == 0 self.count.load(Ordering::Relaxed) == 0
} }
pub fn enqueue(&mut self, signal: Box<dyn Signal>) { pub fn enqueue(&self, signal: Box<dyn Signal>) {
let signum = signal.num();
let mut queues = self.queues.lock();
if queues.enqueue(signal) {
self.count.fetch_add(1, Ordering::Relaxed);
// Avoid holding lock when notifying observers
drop(queues);
self.subject.notify_observers(&SigEvents::new(signum));
}
}
pub fn dequeue(&self, blocked: &SigMask) -> Option<Box<dyn Signal>> {
// Fast path for the common case of no pending signals
if self.is_empty() {
return None;
}
let mut queues = self.queues.lock();
let signal = queues.dequeue(blocked);
if signal.is_some() {
self.count.fetch_sub(1, Ordering::Relaxed);
}
signal
}
pub fn register_observer(
&self,
observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter,
) {
self.subject.register_observer(observer, filter);
}
pub fn unregister_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.subject.unregister_observer(observer);
}
}
impl Default for SigQueues {
fn default() -> Self {
Self::new()
}
}
struct Queues {
std_queues: Vec<Option<Box<dyn Signal>>>,
rt_queues: Vec<VecDeque<Box<dyn Signal>>>,
}
impl Queues {
fn new() -> Self {
let std_queues = (0..COUNT_STD_SIGS).map(|_| None).collect();
let rt_queues = (0..COUNT_RT_SIGS).map(|_| Default::default()).collect();
Self {
std_queues,
rt_queues,
}
}
fn enqueue(&mut self, signal: Box<dyn Signal>) -> bool {
let signum = signal.num(); let signum = signal.num();
if signum.is_std() { if signum.is_std() {
// Standard signals // Standard signals
@ -52,26 +110,19 @@ impl SigQueues {
let queue = self.get_std_queue_mut(signum); let queue = self.get_std_queue_mut(signum);
if queue.is_some() { if queue.is_some() {
// If there is already a signal pending, just ignore all subsequent signals // If there is already a signal pending, just ignore all subsequent signals
return; return false;
} }
*queue = Some(signal); *queue = Some(signal);
self.count += 1;
} else { } else {
// Real-time signals // Real-time signals
let queue = self.get_rt_queue_mut(signum); let queue = self.get_rt_queue_mut(signum);
queue.push_back(signal); queue.push_back(signal);
self.count += 1;
} }
self.subject.notify_observers(&SigEvents::new(signum)); true
} }
pub fn dequeue(&mut self, blocked: &SigMask) -> Option<Box<dyn Signal>> { fn dequeue(&mut self, blocked: &SigMask) -> Option<Box<dyn Signal>> {
// Fast path for the common case of no pending signals
if self.is_empty() {
return None;
}
// Deliver standard signals. // Deliver standard signals.
// //
// According to signal(7): // According to signal(7):
@ -100,7 +151,6 @@ impl SigQueues {
let queue = self.get_std_queue_mut(signum); let queue = self.get_std_queue_mut(signum);
let signal = queue.take(); let signal = queue.take();
if signal.is_some() { if signal.is_some() {
self.count -= 1;
return signal; return signal;
} }
} }
@ -122,7 +172,6 @@ impl SigQueues {
let queue = self.get_rt_queue_mut(signum); let queue = self.get_rt_queue_mut(signum);
let signal = queue.pop_front(); let signal = queue.pop_front();
if signal.is_some() { if signal.is_some() {
self.count -= 1;
return signal; return signal;
} }
} }
@ -142,22 +191,4 @@ impl SigQueues {
let idx = (signum.as_u8() - MIN_RT_SIG_NUM) as usize; let idx = (signum.as_u8() - MIN_RT_SIG_NUM) as usize;
&mut self.rt_queues[idx] &mut self.rt_queues[idx]
} }
pub fn register_observer(
&self,
observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter,
) {
self.subject.register_observer(observer, filter);
}
pub fn unregister_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.subject.unregister_observer(observer);
}
}
impl Default for SigQueues {
fn default() -> Self {
Self::new()
}
} }