Introduce PollAdaptor and Poller

This commit is contained in:
Ruihan Li
2024-08-31 23:52:52 +08:00
committed by Tate, Hongliang Tian
parent f12e502ab0
commit e32fb2f91b
4 changed files with 117 additions and 53 deletions

View File

@ -14,7 +14,7 @@ use crate::{
MessageHeader, SendRecvFlags, SockShutdownCmd, Socket, SocketAddr, MessageHeader, SendRecvFlags, SockShutdownCmd, Socket, SocketAddr,
}, },
prelude::*, prelude::*,
process::signal::{PollHandle, Pollable}, process::signal::{PollHandle, Pollable, Poller},
util::{MultiRead, MultiWrite}, util::{MultiRead, MultiWrite},
}; };
@ -231,9 +231,9 @@ impl Socket for VsockStreamSocket {
vsockspace.request(&connecting.info()).unwrap(); vsockspace.request(&connecting.info()).unwrap();
// wait for response from driver // wait for response from driver
// TODO: Add timeout // TODO: Add timeout
let mut poller = PollHandle::new(); let mut poller = Poller::new();
if !connecting if !connecting
.poll(IoEvents::IN, Some(&mut poller)) .poll(IoEvents::IN, Some(poller.as_handle_mut()))
.contains(IoEvents::IN) .contains(IoEvents::IN)
{ {
if let Err(e) = poller.wait(None) { if let Err(e) = poller.wait(None) {

View File

@ -20,7 +20,7 @@ use c_types::{siginfo_t, ucontext_t};
pub use events::{SigEvents, SigEventsFilter}; pub use events::{SigEvents, SigEventsFilter};
use ostd::{cpu::UserContext, user::UserContextApi}; use ostd::{cpu::UserContext, user::UserContextApi};
pub use pause::{with_signal_blocked, Pause}; pub use pause::{with_signal_blocked, Pause};
pub use poll::{PollHandle, Pollable, Pollee}; pub use poll::{PollHandle, Pollable, Pollee, Poller};
use sig_action::{SigAction, SigActionFlags, SigDefaultAction}; use sig_action::{SigAction, SigActionFlags, SigDefaultAction};
use sig_mask::SigMask; use sig_mask::SigMask;
use sig_num::SigNum; use sig_num::SigNum;

View File

@ -59,7 +59,7 @@ impl Pollee {
fn register_poller(&self, poller: &mut PollHandle, mask: IoEvents) { fn register_poller(&self, poller: &mut PollHandle, mask: IoEvents) {
self.inner self.inner
.subject .subject
.register_observer(poller.observer(), mask); .register_observer(poller.observer.clone(), mask);
poller.pollees.push(Arc::downgrade(&self.inner)); poller.pollees.push(Arc::downgrade(&self.inner));
} }
@ -126,75 +126,139 @@ impl Pollee {
} }
} }
/// A poller gets notified when its associated pollees have interesting events. /// An opaque handle that can be used as an argument of the [`Pollee::poll`] method.
///
/// This type can represent an entity of [`PollAdaptor`] or [`Poller`], which is done via the
/// [`PollAdaptor::as_handle_mut`] and [`Poller::as_handle_mut`] methods.
///
/// When this handle is dropped or reset (via [`PollHandle::reset`]), the entity will no longer be
/// notified of the events from the pollee.
pub struct PollHandle { pub struct PollHandle {
// Use event counter to wait or wake up a poller // The event observer.
event_counter: Arc<EventCounter>, observer: Weak<dyn Observer<IoEvents>>,
// All pollees that are interesting to this poller // The associated pollees.
pollees: Vec<Weak<PolleeInner>>, pollees: Vec<Weak<PolleeInner>>,
// A waiter used to pause the current thread.
waiter: Waiter,
}
impl Default for PollHandle {
fn default() -> Self {
Self::new()
}
} }
impl PollHandle { impl PollHandle {
/// Constructs a new `PollHandle`. /// Constructs a new handle with the observer.
pub fn new() -> Self { ///
let (waiter, waker) = Waiter::new_pair(); /// Note: It is a *logic error* to construct the multiple handles with the same observer (where
/// "same" means [`Weak::ptr_eq`]). If possible, consider using [`PollAdaptor::with_observer`]
/// instead.
pub fn new(observer: Weak<dyn Observer<IoEvents>>) -> Self {
Self { Self {
event_counter: Arc::new(EventCounter::new(waker)), observer,
pollees: Vec::new(), pollees: Vec::new(),
}
}
/// Resets the handle.
///
/// The observer will be unregistered and will no longer receive events.
pub fn reset(&mut self) {
let observer = &self.observer;
self.pollees
.iter()
.filter_map(Weak::upgrade)
.for_each(|pollee| {
pollee.subject.unregister_observer(observer);
});
}
}
impl Drop for PollHandle {
fn drop(&mut self) {
self.reset();
}
}
/// An adaptor to make an [`Observer`] usable for [`Pollee::poll`].
///
/// Normally, [`Pollee::poll`] accepts a [`Poller`] which is used to wait for events. By using this
/// adaptor, it is possible to use any [`Observer`] with [`Pollee::poll`]. The observer will be
/// notified whenever there are new events.
pub struct PollAdaptor<O> {
// The event observer.
observer: Arc<O>,
// The inner with observer type erased.
inner: PollHandle,
}
impl<O: Observer<IoEvents> + 'static> PollAdaptor<O> {
/// Constructs a new adaptor with the specified observer.
pub fn with_observer(observer: O) -> Self {
let observer = Arc::new(observer);
let inner = PollHandle::new(Arc::downgrade(&observer) as _);
Self { observer, inner }
}
}
impl<O> PollAdaptor<O> {
/// Gets a reference to the observer.
pub fn observer(&self) -> &Arc<O> {
&self.observer
}
/// Returns a mutable reference of [`PollHandle`].
pub fn as_handle_mut(&mut self) -> &mut PollHandle {
&mut self.inner
}
}
/// A poller that can be used to wait for some events.
pub struct Poller {
poller: PollAdaptor<EventCounter>,
waiter: Waiter,
}
impl Poller {
/// Constructs a new poller to wait for interesting events.
pub fn new() -> Self {
let (waiter, event_counter) = EventCounter::new_pair();
Self {
poller: PollAdaptor::with_observer(event_counter),
waiter, waiter,
} }
} }
/// Returns a mutable reference of [`PollHandle`].
pub fn as_handle_mut(&mut self) -> &mut PollHandle {
self.poller.as_handle_mut()
}
/// Waits until some interesting events happen since the last wait or until the timeout /// Waits until some interesting events happen since the last wait or until the timeout
/// expires. /// expires.
/// ///
/// The waiting process can be interrupted by a signal. /// The waiting process can be interrupted by a signal.
pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
self.event_counter.read(&self.waiter, timeout)?; self.poller.observer().read(&self.waiter, timeout)?;
Ok(()) Ok(())
} }
fn observer(&self) -> Weak<dyn Observer<IoEvents>> {
Arc::downgrade(&self.event_counter) as _
}
} }
impl Drop for PollHandle {
fn drop(&mut self) {
let observer = self.observer();
self.pollees
.iter()
.filter_map(Weak::upgrade)
.for_each(|pollee| {
pollee.subject.unregister_observer(&observer);
});
}
}
/// A counter for wait and wakeup.
struct EventCounter { struct EventCounter {
counter: AtomicUsize, counter: AtomicUsize,
waker: Arc<Waker>, waker: Arc<Waker>,
} }
impl EventCounter { impl EventCounter {
pub fn new(waker: Arc<Waker>) -> Self { fn new_pair() -> (Waiter, Self) {
Self { let (waiter, waker) = Waiter::new_pair();
counter: AtomicUsize::new(0),
waker, (
} waiter,
Self {
counter: AtomicUsize::new(0),
waker,
},
)
} }
pub fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result<usize> { fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result<usize> {
let cond = || { let cond = || {
let val = self.counter.swap(0, Ordering::Relaxed); let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 { if val > 0 {
@ -207,7 +271,7 @@ impl EventCounter {
waiter.pause_until_or_timeout(cond, timeout) waiter.pause_until_or_timeout(cond, timeout)
} }
pub fn write(&self) { fn write(&self) {
self.counter.fetch_add(1, Ordering::Relaxed); self.counter.fetch_add(1, Ordering::Relaxed);
self.waker.wake_up(); self.waker.wake_up();
} }
@ -270,8 +334,8 @@ pub trait Pollable {
} }
// Wait until the event happens. // Wait until the event happens.
let mut poller = PollHandle::new(); let mut poller = Poller::new();
if self.poll(mask, Some(&mut poller)).is_empty() { if self.poll(mask, Some(poller.as_handle_mut())).is_empty() {
poller.wait(timeout)?; poller.wait(timeout)?;
} }

View File

@ -7,7 +7,7 @@ use crate::{
events::IoEvents, events::IoEvents,
fs::{file_handle::FileLike, file_table::FileDesc}, fs::{file_handle::FileLike, file_table::FileDesc},
prelude::*, prelude::*,
process::signal::PollHandle, process::signal::Poller,
}; };
pub fn sys_poll(fds: Vaddr, nfds: u64, timeout: i32, ctx: &Context) -> Result<SyscallReturn> { pub fn sys_poll(fds: Vaddr, nfds: u64, timeout: i32, ctx: &Context) -> Result<SyscallReturn> {
@ -126,20 +126,20 @@ fn hold_files(poll_fds: &[PollFd], ctx: &Context) -> (FileResult, Vec<Option<Arc
} }
enum PollerResult { enum PollerResult {
AllRegistered(PollHandle), AllRegistered(Poller),
EventFoundAt(usize), EventFoundAt(usize),
} }
/// Registers the files with a poller, or exits early if some events are detected. /// Registers the files with a poller, or exits early if some events are detected.
fn register_poller(poll_fds: &[PollFd], files: &[Option<Arc<dyn FileLike>>]) -> PollerResult { fn register_poller(poll_fds: &[PollFd], files: &[Option<Arc<dyn FileLike>>]) -> PollerResult {
let mut poller = PollHandle::new(); let mut poller = Poller::new();
for (i, (poll_fd, file)) in poll_fds.iter().zip(files.iter()).enumerate() { for (i, (poll_fd, file)) in poll_fds.iter().zip(files.iter()).enumerate() {
let Some(file) = file else { let Some(file) = file else {
continue; continue;
}; };
let events = file.poll(poll_fd.events(), Some(&mut poller)); let events = file.poll(poll_fd.events(), Some(poller.as_handle_mut()));
if events.is_empty() { if events.is_empty() {
continue; continue;
} }