diff --git a/services/libs/jinux-std/src/events/events.rs b/services/libs/jinux-std/src/events/events.rs index 002197d48..9488b0033 100644 --- a/services/libs/jinux-std/src/events/events.rs +++ b/services/libs/jinux-std/src/events/events.rs @@ -1,2 +1,34 @@ /// A trait to represent any events. pub trait Events: Copy + Clone + Send + Sync + 'static {} + +/// A trait to filter events. +/// +/// # The no-op event filter +/// +/// The unit type `()` can serve as a no-op event filter. +/// It implements `EventsFilter` for any events type `E`, +/// with a `filter` method that always returns `true`. +/// If the `F` type of `Subject` is not specified explicitly, +/// then the unit type `()` is chosen as the event filter. +/// +/// # Per-object event filter +/// +/// Any `Option` is also an event filter thanks to +/// the blanket implementations the `EventsFilter` trait. +/// By using `Option`, we can decide, on a per-observer basis, +/// if an observer needs an event filter. +pub trait EventsFilter: Send + Sync + 'static { + fn filter(&self, event: &E) -> bool; +} + +impl EventsFilter for () { + fn filter(&self, _events: &E) -> bool { + true + } +} + +impl> EventsFilter for Option { + fn filter(&self, events: &E) -> bool { + self.as_ref().map_or(true, |f| f.filter(events)) + } +} diff --git a/services/libs/jinux-std/src/events/mod.rs b/services/libs/jinux-std/src/events/mod.rs index 18e154e93..10f4421e5 100644 --- a/services/libs/jinux-std/src/events/mod.rs +++ b/services/libs/jinux-std/src/events/mod.rs @@ -2,6 +2,6 @@ mod events; mod observer; mod subject; -pub use self::events::Events; +pub use self::events::{Events, EventsFilter}; pub use self::observer::Observer; pub use self::subject::Subject; diff --git a/services/libs/jinux-std/src/events/subject.rs b/services/libs/jinux-std/src/events/subject.rs index 56ab957e7..54dfc2d22 100644 --- a/services/libs/jinux-std/src/events/subject.rs +++ b/services/libs/jinux-std/src/events/subject.rs @@ -1,43 +1,86 @@ use crate::prelude::*; -use super::{Events, Observer}; +use core::sync::atomic::{AtomicUsize, Ordering}; +use keyable_arc::KeyableWeak; -/// A Subject notify interesting events to registered observers. -pub struct Subject { - observers: Mutex>>>, +use super::{Events, EventsFilter, Observer}; + +/// A Subject notifies interesting events to registered observers. +pub struct Subject = ()> { + // A table that maintains all interesting observers. + observers: Mutex>, F>>, + // To reduce lock contentions, we maintain a counter for the size of the table + num_observers: AtomicUsize, } -impl Subject { +impl> Subject { pub fn new() -> Self { Self { - observers: Mutex::new(Vec::new()), + observers: Mutex::new(BTreeMap::new()), + num_observers: AtomicUsize::new(0), } } /// Register an observer. - pub fn register_observer(&self, observer: Weak>) { + /// + /// A registered observer will get notified through its `on_events` method. + /// If events `filter` is provided, only filtered events will notify the observer. + /// + /// If the given observer has already been registered, then its registered events + /// filter will be updated. + pub fn register_observer(&self, observer: Weak>, filter: F) { let mut observers = self.observers.lock(); - observers.push(observer); + let is_new = { + let observer: KeyableWeak> = observer.into(); + observers.insert(observer, filter).is_none() + }; + if is_new { + self.num_observers.fetch_add(1, Ordering::Release); + } } /// Unregister an observer. - pub fn unregister_observer(&self, observer: &Weak>) { + /// + /// If such an observer is found, then the registered observer will be + /// removed from the subject and returned as the return value. Otherwise, + /// a `None` will be returned. + pub fn unregister_observer( + &self, + observer: &Weak>, + ) -> Option>> { + let observer: KeyableWeak> = observer.clone().into(); let mut observers = self.observers.lock(); - observers.retain(|e| !Weak::ptr_eq(&e, observer)); + let observer = observers + .remove_entry(&observer) + .map(|(observer, _)| observer.into()); + if observer.is_some() { + self.num_observers.fetch_sub(1, Ordering::Relaxed); + } + observer } /// Notify events to all registered observers. + /// /// It will remove the observers which have been freed. pub fn notify_observers(&self, events: &E) { - let mut observers = self.observers.lock(); - let mut idx = 0; - while idx < observers.len() { - if let Some(observer) = observers[idx].upgrade() { - observer.on_events(events); - idx += 1; - } else { - observers.remove(idx); - } + // Fast path. + if self.num_observers.load(Ordering::Relaxed) == 0 { + return; } + + // Slow path: broadcast the new events to all observers. + let mut observers = self.observers.lock(); + observers.retain(|observer, filter| { + if let Some(observer) = observer.upgrade() { + if !filter.filter(events) { + return true; + } + observer.on_events(events); + true + } else { + self.num_observers.fetch_sub(1, Ordering::Relaxed); + false + } + }); } } diff --git a/services/libs/jinux-std/src/fs/epoll/epoll_file.rs b/services/libs/jinux-std/src/fs/epoll/epoll_file.rs index a3d0279d8..6df757ae9 100644 --- a/services/libs/jinux-std/src/fs/epoll/epoll_file.rs +++ b/services/libs/jinux-std/src/fs/epoll/epoll_file.rs @@ -76,7 +76,7 @@ impl EpollFile { if interest.contains_key(&fd) { return_errno_with_message!(Errno::EEXIST, "the fd has been added"); } - file.register_observer(entry.clone(), IoEvents::all())?; + file.register_observer(entry.self_weak() as _, IoEvents::all())?; interest.insert(fd, entry.clone()); // Register self to the file table entry file_table_entry.register_observer(self.weak_self.clone() as _); @@ -113,7 +113,7 @@ impl EpollFile { None => return Ok(()), }; - file.unregister_observer(&(entry as _)).unwrap(); + file.unregister_observer(&(entry.self_weak() as _)).unwrap(); Ok(()) } @@ -294,7 +294,7 @@ impl Drop for EpollFile { .map(|(fd, entry)| { entry.set_deleted(); if let Some(file) = entry.file() { - let _ = file.unregister_observer(&(entry as _)); + let _ = file.unregister_observer(&(entry.self_weak() as _)); } fd }) @@ -326,7 +326,7 @@ impl FileLike for EpollFile { fn register_observer( &self, - observer: Arc>, + observer: Weak>, mask: IoEvents, ) -> Result<()> { self.pollee.register_observer(observer, mask); @@ -335,8 +335,8 @@ impl FileLike for EpollFile { fn unregister_observer( &self, - observer: &Arc>, - ) -> Result>> { + observer: &Weak>, + ) -> Result>> { self.pollee .unregister_observer(observer) .ok_or_else(|| Error::with_message(Errno::ENOENT, "observer is not registered")) @@ -398,6 +398,11 @@ impl EpollEntry { self.weak_self.upgrade().unwrap() } + /// Get an instance of `Weak` that refers to this epoll entry. + pub fn self_weak(&self) -> Weak { + self.weak_self.clone() + } + /// Get the file associated with this epoll entry. /// /// Since an epoll entry only holds a weak reference to the file, diff --git a/services/libs/jinux-std/src/fs/file_handle.rs b/services/libs/jinux-std/src/fs/file_handle.rs index 6f3709ab2..e08841e7c 100644 --- a/services/libs/jinux-std/src/fs/file_handle.rs +++ b/services/libs/jinux-std/src/fs/file_handle.rs @@ -51,7 +51,7 @@ pub trait FileLike: Send + Sync + Any { fn register_observer( &self, - observer: Arc>, + observer: Weak>, mask: IoEvents, ) -> Result<()> { return_errno_with_message!(Errno::EINVAL, "register_observer is not supported") @@ -59,8 +59,8 @@ pub trait FileLike: Send + Sync + Any { fn unregister_observer( &self, - observer: &Arc>, - ) -> Result>> { + observer: &Weak>, + ) -> Result>> { return_errno_with_message!(Errno::EINVAL, "unregister_observer is not supported") } diff --git a/services/libs/jinux-std/src/fs/file_table.rs b/services/libs/jinux-std/src/fs/file_table.rs index d52be7575..8964cfb5f 100644 --- a/services/libs/jinux-std/src/fs/file_table.rs +++ b/services/libs/jinux-std/src/fs/file_table.rs @@ -114,7 +114,7 @@ impl FileTable { } pub fn register_observer(&self, observer: Weak>) { - self.subject.register_observer(observer); + self.subject.register_observer(observer, ()); } pub fn unregister_observer(&self, observer: &Weak>) { @@ -170,7 +170,7 @@ impl FileTableEntry { } pub fn register_observer(&self, epoll: Weak>) { - self.subject.register_observer(epoll); + self.subject.register_observer(epoll, ()); } pub fn unregister_observer(&self, epoll: &Weak>) { diff --git a/services/libs/jinux-std/src/fs/pipe.rs b/services/libs/jinux-std/src/fs/pipe.rs index 4e0aa345d..feedfc8cb 100644 --- a/services/libs/jinux-std/src/fs/pipe.rs +++ b/services/libs/jinux-std/src/fs/pipe.rs @@ -45,7 +45,7 @@ impl FileLike for PipeReader { fn register_observer( &self, - observer: Arc>, + observer: Weak>, mask: IoEvents, ) -> Result<()> { self.consumer.register_observer(observer, mask) @@ -53,8 +53,8 @@ impl FileLike for PipeReader { fn unregister_observer( &self, - observer: &Arc>, - ) -> Result>> { + observer: &Weak>, + ) -> Result>> { self.consumer.unregister_observer(observer) } @@ -104,7 +104,7 @@ impl FileLike for PipeWriter { fn register_observer( &self, - observer: Arc>, + observer: Weak>, mask: IoEvents, ) -> Result<()> { self.producer.register_observer(observer, mask) @@ -112,8 +112,8 @@ impl FileLike for PipeWriter { fn unregister_observer( &self, - observer: &Arc>, - ) -> Result>> { + observer: &Weak>, + ) -> Result>> { self.producer.unregister_observer(observer) } diff --git a/services/libs/jinux-std/src/fs/utils/channel.rs b/services/libs/jinux-std/src/fs/utils/channel.rs index 02deda2b9..ed8448b0b 100644 --- a/services/libs/jinux-std/src/fs/utils/channel.rs +++ b/services/libs/jinux-std/src/fs/utils/channel.rs @@ -83,7 +83,7 @@ macro_rules! impl_common_methods_for_channel { pub fn register_observer( &self, - observer: Arc>, + observer: Weak>, mask: IoEvents, ) -> Result<()> { self.this_end().pollee.register_observer(observer, mask); @@ -92,8 +92,8 @@ macro_rules! impl_common_methods_for_channel { pub fn unregister_observer( &self, - observer: &Arc>, - ) -> Result>> { + observer: &Weak>, + ) -> Result>> { self.this_end() .pollee .unregister_observer(observer) diff --git a/services/libs/jinux-std/src/fs/utils/io_events.rs b/services/libs/jinux-std/src/fs/utils/io_events.rs index 4a3cbe41a..7d5ed3c43 100644 --- a/services/libs/jinux-std/src/fs/utils/io_events.rs +++ b/services/libs/jinux-std/src/fs/utils/io_events.rs @@ -1,4 +1,4 @@ -use crate::events::Events; +use crate::events::{Events, EventsFilter}; crate::bitflags! { pub struct IoEvents: u32 { @@ -15,3 +15,9 @@ crate::bitflags! { } impl Events for IoEvents {} + +impl EventsFilter for IoEvents { + fn filter(&self, events: &IoEvents) -> bool { + self.intersects(*events) + } +} diff --git a/services/libs/jinux-std/src/fs/utils/poll.rs b/services/libs/jinux-std/src/fs/utils/poll.rs index 905e0a3be..9a1a37f2d 100644 --- a/services/libs/jinux-std/src/fs/utils/poll.rs +++ b/services/libs/jinux-std/src/fs/utils/poll.rs @@ -1,10 +1,10 @@ use super::IoEvents; -use crate::events::Observer; +use crate::events::{Observer, Subject}; use crate::prelude::*; use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use jinux_frame::sync::WaitQueue; -use keyable_arc::KeyableArc; +use keyable_arc::KeyableWeak; /// A pollee maintains a set of active events, which can be polled with /// pollers or be monitored with observers. @@ -13,21 +13,18 @@ pub struct Pollee { } struct PolleeInner { - // A table that maintains all interesting pollers - pollers: Mutex>, IoEvents>>, - // For efficient manipulation, we use AtomicU32 instead of RwLock + // A subject which is monitored with pollers. + subject: Subject, + // For efficient manipulation, we use AtomicU32 instead of RwLock. events: AtomicU32, - // To reduce lock contentions, we maintain a counter for the size of the table - num_pollers: AtomicUsize, } impl Pollee { /// Creates a new instance of pollee. pub fn new(init_events: IoEvents) -> Self { let inner = PolleeInner { - pollers: Mutex::new(BTreeMap::new()), + subject: Subject::new(), events: AtomicU32::new(init_events.bits()), - num_pollers: AtomicUsize::new(0), }; Self { inner: Arc::new(inner), @@ -51,7 +48,7 @@ impl Pollee { return revents; } - // Slow path: register the provided poller + // Register the provided poller. self.register_poller(poller.unwrap(), mask); // It is important to check events again to handle race conditions @@ -60,17 +57,11 @@ impl Pollee { } fn register_poller(&self, poller: &Poller, mask: IoEvents) { - let mut pollers = self.inner.pollers.lock(); - let is_new = { - let observer = poller.observer(); - pollers.insert(observer, mask).is_none() - }; - if is_new { - let mut pollees = poller.inner.pollees.lock(); - pollees.push(Arc::downgrade(&self.inner)); - - self.inner.num_pollers.fetch_add(1, Ordering::Release); - } + self.inner + .subject + .register_observer(poller.observer(), mask); + let mut pollees = poller.inner.pollees.lock(); + pollees.insert(Arc::downgrade(&self.inner).into(), ()); } /// Register an IoEvents observer. @@ -84,23 +75,9 @@ impl Pollee { /// /// Note that the observer will always get notified of the events in /// `IoEvents::ALWAYS_POLL` regardless of the value of `mask`. - /// - /// # Memory leakage - /// - /// Since an `Arc` for each observer is kept internally by a pollee, - /// it is important for the user to call the `unregister_observer` method - /// when the observer is no longer interested in the pollee. Otherwise, - /// the observer will not be dropped. - pub fn register_observer(&self, observer: Arc>, mask: IoEvents) { - let mut pollers = self.inner.pollers.lock(); - let is_new = { - let observer: KeyableArc> = observer.into(); - let mask = mask | IoEvents::ALWAYS_POLL; - pollers.insert(observer, mask).is_none() - }; - if is_new { - self.inner.num_pollers.fetch_add(1, Ordering::Release); - } + pub fn register_observer(&self, observer: Weak>, mask: IoEvents) { + let mask = mask | IoEvents::ALWAYS_POLL; + self.inner.subject.register_observer(observer, mask); } /// Unregister an IoEvents observer. @@ -110,17 +87,9 @@ impl Pollee { /// a `None` will be returned. pub fn unregister_observer( &self, - observer: &Arc>, - ) -> Option>> { - let observer: KeyableArc> = observer.clone().into(); - let mut pollers = self.inner.pollers.lock(); - let observer = pollers - .remove_entry(&observer) - .map(|(observer, _)| observer.into()); - if observer.is_some() { - self.inner.num_pollers.fetch_sub(1, Ordering::Relaxed); - } - observer + observer: &Weak>, + ) -> Option>> { + self.inner.subject.unregister_observer(observer) } /// Add some events to the pollee's state. @@ -129,18 +98,7 @@ impl Pollee { /// the added events. pub fn add_events(&self, events: IoEvents) { self.inner.events.fetch_or(events.bits(), Ordering::Release); - - // Fast path - if self.inner.num_pollers.load(Ordering::Relaxed) == 0 { - return; - } - - // Slow path: broadcast the new events to all pollers - let pollers = self.inner.pollers.lock(); - pollers - .iter() - .filter(|(_, mask)| mask.intersects(events)) - .for_each(|(poller, mask)| poller.on_events(&(events & *mask))); + self.inner.subject.notify_observers(&events); } /// Remove some events from the pollee's state. @@ -170,14 +128,14 @@ impl Pollee { /// A poller gets notified when its associated pollees have interesting events. pub struct Poller { - inner: KeyableArc, + inner: Arc, } struct PollerInner { // Use event counter to wait or wake up a poller event_counter: EventCounter, // All pollees that are interesting to this poller - pollees: Mutex>>, + pollees: Mutex, ()>>, } impl Poller { @@ -185,10 +143,10 @@ impl Poller { pub fn new() -> Self { let inner = PollerInner { event_counter: EventCounter::new(), - pollees: Mutex::new(Vec::with_capacity(1)), + pollees: Mutex::new(BTreeMap::new()), }; Self { - inner: KeyableArc::new(inner), + inner: Arc::new(inner), } } @@ -197,8 +155,8 @@ impl Poller { self.inner.event_counter.read(); } - fn observer(&self) -> KeyableArc> { - self.inner.clone() as KeyableArc> + fn observer(&self) -> Weak> { + Arc::downgrade(&self.inner) as _ } } @@ -216,14 +174,9 @@ impl Drop for Poller { } let self_observer = self.observer(); - for weak_pollee in pollees.drain(..) { + for (weak_pollee, _) in pollees.drain_filter(|_, _| true) { if let Some(pollee) = weak_pollee.upgrade() { - let mut pollers = pollee.pollers.lock(); - let res = pollers.remove(&self_observer); - assert!(res.is_some()); - drop(pollers); - - pollee.num_pollers.fetch_sub(1, Ordering::Relaxed); + pollee.subject.unregister_observer(&self_observer); } } } diff --git a/services/libs/jinux-std/src/process/process_table.rs b/services/libs/jinux-std/src/process/process_table.rs index 69c4400db..50a693a8e 100644 --- a/services/libs/jinux-std/src/process/process_table.rs +++ b/services/libs/jinux-std/src/process/process_table.rs @@ -65,7 +65,7 @@ pub fn pgid_to_process_group(pgid: Pgid) -> Option> { } pub fn register_observer(observer: Weak>) { - PROCESS_TABLE_SUBJECT.register_observer(observer); + PROCESS_TABLE_SUBJECT.register_observer(observer, ()); } pub fn unregister_observer(observer: &Weak>) {