Use SpinLock on Subject

This commit is contained in:
Shaowei Song
2024-09-13 03:35:50 +00:00
committed by Tate, Hongliang Tian
parent 8b1c69987f
commit 4a43e317b2

View File

@ -10,7 +10,7 @@ use crate::prelude::*;
/// A Subject notifies interesting events to registered observers. /// A Subject notifies interesting events to registered observers.
pub struct Subject<E: Events, F: EventsFilter<E> = ()> { pub struct Subject<E: Events, F: EventsFilter<E> = ()> {
// A table that maintains all interesting observers. // A table that maintains all interesting observers.
observers: Mutex<BTreeMap<KeyableWeak<dyn Observer<E>>, F>>, observers: SpinLock<BTreeMap<KeyableWeak<dyn Observer<E>>, F>>,
// To reduce lock contentions, we maintain a counter for the size of the table // To reduce lock contentions, we maintain a counter for the size of the table
num_observers: AtomicUsize, num_observers: AtomicUsize,
} }
@ -18,7 +18,7 @@ pub struct Subject<E: Events, F: EventsFilter<E> = ()> {
impl<E: Events, F: EventsFilter<E>> Subject<E, F> { impl<E: Events, F: EventsFilter<E>> Subject<E, F> {
pub const fn new() -> Self { pub const fn new() -> Self {
Self { Self {
observers: Mutex::new(BTreeMap::new()), observers: SpinLock::new(BTreeMap::new()),
num_observers: AtomicUsize::new(0), num_observers: AtomicUsize::new(0),
} }
} }
@ -70,19 +70,29 @@ impl<E: Events, F: EventsFilter<E>> Subject<E, F> {
} }
// Slow path: broadcast the new events to all observers. // Slow path: broadcast the new events to all observers.
let mut active_observers = Vec::new();
let mut num_freed = 0;
let mut observers = self.observers.lock(); let mut observers = self.observers.lock();
observers.retain(|observer, filter| { observers.retain(|observer, filter| {
if let Some(observer) = observer.upgrade() { if let Some(observer) = observer.upgrade() {
if !filter.filter(events) { if filter.filter(events) {
return true; // XXX: Mind the performance impact when there comes many active observers
active_observers.push(observer.clone());
} }
observer.on_events(events);
true true
} else { } else {
self.num_observers.fetch_sub(1, Ordering::Relaxed); num_freed += 1;
false false
} }
}); });
if num_freed > 0 {
self.num_observers.fetch_sub(num_freed, Ordering::Relaxed);
}
drop(observers);
for observer in active_observers {
observer.on_events(events);
}
} }
} }