diff --git a/kernel/src/net/socket/vsock/stream/socket.rs b/kernel/src/net/socket/vsock/stream/socket.rs index 549098ba..29a828d1 100644 --- a/kernel/src/net/socket/vsock/stream/socket.rs +++ b/kernel/src/net/socket/vsock/stream/socket.rs @@ -14,7 +14,7 @@ use crate::{ MessageHeader, SendRecvFlags, SockShutdownCmd, Socket, SocketAddr, }, prelude::*, - process::signal::{PollHandle, Pollable}, + process::signal::{PollHandle, Pollable, Poller}, util::{MultiRead, MultiWrite}, }; @@ -231,9 +231,9 @@ impl Socket for VsockStreamSocket { vsockspace.request(&connecting.info()).unwrap(); // wait for response from driver // TODO: Add timeout - let mut poller = PollHandle::new(); + let mut poller = Poller::new(); if !connecting - .poll(IoEvents::IN, Some(&mut poller)) + .poll(IoEvents::IN, Some(poller.as_handle_mut())) .contains(IoEvents::IN) { if let Err(e) = poller.wait(None) { diff --git a/kernel/src/process/signal/mod.rs b/kernel/src/process/signal/mod.rs index 903d4bb2..8099a46c 100644 --- a/kernel/src/process/signal/mod.rs +++ b/kernel/src/process/signal/mod.rs @@ -20,7 +20,7 @@ use c_types::{siginfo_t, ucontext_t}; pub use events::{SigEvents, SigEventsFilter}; use ostd::{cpu::UserContext, user::UserContextApi}; pub use pause::{with_signal_blocked, Pause}; -pub use poll::{PollHandle, Pollable, Pollee}; +pub use poll::{PollHandle, Pollable, Pollee, Poller}; use sig_action::{SigAction, SigActionFlags, SigDefaultAction}; use sig_mask::SigMask; use sig_num::SigNum; diff --git a/kernel/src/process/signal/poll.rs b/kernel/src/process/signal/poll.rs index 02a04f83..923d51b8 100644 --- a/kernel/src/process/signal/poll.rs +++ b/kernel/src/process/signal/poll.rs @@ -59,7 +59,7 @@ impl Pollee { fn register_poller(&self, poller: &mut PollHandle, mask: IoEvents) { self.inner .subject - .register_observer(poller.observer(), mask); + .register_observer(poller.observer.clone(), mask); poller.pollees.push(Arc::downgrade(&self.inner)); } @@ -126,75 +126,139 @@ impl Pollee { } } -/// A poller gets notified when its associated pollees have interesting events. +/// An opaque handle that can be used as an argument of the [`Pollee::poll`] method. +/// +/// This type can represent an entity of [`PollAdaptor`] or [`Poller`], which is done via the +/// [`PollAdaptor::as_handle_mut`] and [`Poller::as_handle_mut`] methods. +/// +/// When this handle is dropped or reset (via [`PollHandle::reset`]), the entity will no longer be +/// notified of the events from the pollee. pub struct PollHandle { - // Use event counter to wait or wake up a poller - event_counter: Arc, - // All pollees that are interesting to this poller + // The event observer. + observer: Weak>, + // The associated pollees. pollees: Vec>, - // A waiter used to pause the current thread. - waiter: Waiter, -} - -impl Default for PollHandle { - fn default() -> Self { - Self::new() - } } impl PollHandle { - /// Constructs a new `PollHandle`. - pub fn new() -> Self { - let (waiter, waker) = Waiter::new_pair(); + /// Constructs a new handle with the observer. + /// + /// Note: It is a *logic error* to construct the multiple handles with the same observer (where + /// "same" means [`Weak::ptr_eq`]). If possible, consider using [`PollAdaptor::with_observer`] + /// instead. + pub fn new(observer: Weak>) -> Self { Self { - event_counter: Arc::new(EventCounter::new(waker)), + observer, pollees: Vec::new(), + } + } + + /// Resets the handle. + /// + /// The observer will be unregistered and will no longer receive events. + pub fn reset(&mut self) { + let observer = &self.observer; + + self.pollees + .iter() + .filter_map(Weak::upgrade) + .for_each(|pollee| { + pollee.subject.unregister_observer(observer); + }); + } +} + +impl Drop for PollHandle { + fn drop(&mut self) { + self.reset(); + } +} + +/// An adaptor to make an [`Observer`] usable for [`Pollee::poll`]. +/// +/// Normally, [`Pollee::poll`] accepts a [`Poller`] which is used to wait for events. By using this +/// adaptor, it is possible to use any [`Observer`] with [`Pollee::poll`]. The observer will be +/// notified whenever there are new events. +pub struct PollAdaptor { + // The event observer. + observer: Arc, + // The inner with observer type erased. + inner: PollHandle, +} + +impl + 'static> PollAdaptor { + /// Constructs a new adaptor with the specified observer. + pub fn with_observer(observer: O) -> Self { + let observer = Arc::new(observer); + let inner = PollHandle::new(Arc::downgrade(&observer) as _); + + Self { observer, inner } + } +} + +impl PollAdaptor { + /// Gets a reference to the observer. + pub fn observer(&self) -> &Arc { + &self.observer + } + + /// Returns a mutable reference of [`PollHandle`]. + pub fn as_handle_mut(&mut self) -> &mut PollHandle { + &mut self.inner + } +} + +/// A poller that can be used to wait for some events. +pub struct Poller { + poller: PollAdaptor, + waiter: Waiter, +} + +impl Poller { + /// Constructs a new poller to wait for interesting events. + pub fn new() -> Self { + let (waiter, event_counter) = EventCounter::new_pair(); + + Self { + poller: PollAdaptor::with_observer(event_counter), waiter, } } + /// Returns a mutable reference of [`PollHandle`]. + pub fn as_handle_mut(&mut self) -> &mut PollHandle { + self.poller.as_handle_mut() + } + /// Waits until some interesting events happen since the last wait or until the timeout /// expires. /// /// The waiting process can be interrupted by a signal. pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { - self.event_counter.read(&self.waiter, timeout)?; + self.poller.observer().read(&self.waiter, timeout)?; Ok(()) } - - fn observer(&self) -> Weak> { - Arc::downgrade(&self.event_counter) as _ - } } -impl Drop for PollHandle { - fn drop(&mut self) { - let observer = self.observer(); - - self.pollees - .iter() - .filter_map(Weak::upgrade) - .for_each(|pollee| { - pollee.subject.unregister_observer(&observer); - }); - } -} - -/// A counter for wait and wakeup. struct EventCounter { counter: AtomicUsize, waker: Arc, } impl EventCounter { - pub fn new(waker: Arc) -> Self { - Self { - counter: AtomicUsize::new(0), - waker, - } + fn new_pair() -> (Waiter, Self) { + let (waiter, waker) = Waiter::new_pair(); + + ( + waiter, + Self { + counter: AtomicUsize::new(0), + waker, + }, + ) } - pub fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result { + fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result { let cond = || { let val = self.counter.swap(0, Ordering::Relaxed); if val > 0 { @@ -207,7 +271,7 @@ impl EventCounter { waiter.pause_until_or_timeout(cond, timeout) } - pub fn write(&self) { + fn write(&self) { self.counter.fetch_add(1, Ordering::Relaxed); self.waker.wake_up(); } @@ -270,8 +334,8 @@ pub trait Pollable { } // Wait until the event happens. - let mut poller = PollHandle::new(); - if self.poll(mask, Some(&mut poller)).is_empty() { + let mut poller = Poller::new(); + if self.poll(mask, Some(poller.as_handle_mut())).is_empty() { poller.wait(timeout)?; } diff --git a/kernel/src/syscall/poll.rs b/kernel/src/syscall/poll.rs index 3936fa50..df62a234 100644 --- a/kernel/src/syscall/poll.rs +++ b/kernel/src/syscall/poll.rs @@ -7,7 +7,7 @@ use crate::{ events::IoEvents, fs::{file_handle::FileLike, file_table::FileDesc}, prelude::*, - process::signal::PollHandle, + process::signal::Poller, }; pub fn sys_poll(fds: Vaddr, nfds: u64, timeout: i32, ctx: &Context) -> Result { @@ -126,20 +126,20 @@ fn hold_files(poll_fds: &[PollFd], ctx: &Context) -> (FileResult, Vec>]) -> PollerResult { - let mut poller = PollHandle::new(); + let mut poller = Poller::new(); for (i, (poll_fd, file)) in poll_fds.iter().zip(files.iter()).enumerate() { let Some(file) = file else { continue; }; - let events = file.poll(poll_fd.events(), Some(&mut poller)); + let events = file.poll(poll_fd.events(), Some(poller.as_handle_mut())); if events.is_empty() { continue; }