Remove the nonsense mutex in Poller

This commit is contained in:
Ruihan Li 2024-07-14 17:47:11 +08:00 committed by Tate, Hongliang Tian
parent 94eba6d85e
commit 9f125cd671
34 changed files with 83 additions and 95 deletions

View File

@ -27,7 +27,7 @@ impl FileIo for Null {
Ok(buf.len())
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}

View File

@ -68,12 +68,12 @@ impl PtyMaster {
self.update_state(&input);
}
pub(super) fn slave_poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub(super) fn slave_poll(&self, mask: IoEvents, mut poller: Option<&mut Poller>) -> IoEvents {
let mut poll_status = IoEvents::empty();
let poll_in_mask = mask & IoEvents::IN;
if !poll_in_mask.is_empty() {
let poll_in_status = self.output.poll(poll_in_mask, poller);
let poll_in_status = self.output.poll(poll_in_mask, poller.as_deref_mut());
poll_status |= poll_in_status;
}
@ -106,12 +106,12 @@ impl FileIo for PtyMaster {
return Ok(0);
}
let poller = Poller::new();
let mut poller = Poller::new();
loop {
let mut input = self.input.lock_irq_disabled();
if input.is_empty() {
let events = self.pollee.poll(IoEvents::IN, Some(&poller));
let events = self.pollee.poll(IoEvents::IN, Some(&mut poller));
if events.contains(IoEvents::ERR) {
return_errno_with_message!(Errno::EACCES, "unexpected err");
@ -245,12 +245,12 @@ impl FileIo for PtyMaster {
}
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, mut poller: Option<&mut Poller>) -> IoEvents {
let mut poll_status = IoEvents::empty();
let poll_in_mask = mask & IoEvents::IN;
if !poll_in_mask.is_empty() {
let poll_in_status = self.pollee.poll(poll_in_mask, poller);
let poll_in_status = self.pollee.poll(poll_in_mask, poller.as_deref_mut());
poll_status |= poll_in_status;
}
@ -348,7 +348,7 @@ impl FileIo for PtySlave {
Ok(buf.len())
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.master().slave_poll(mask, poller)
}

View File

@ -42,7 +42,7 @@ impl FileIo for Random {
Ok(buf.len())
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}

View File

@ -74,7 +74,7 @@ impl FileIo for TdxGuest {
}
}
fn poll(&self, mask: IoEvents, _poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, _poller: Option<&mut Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}

View File

@ -49,7 +49,7 @@ impl FileIo for TtyDevice {
return_errno_with_message!(Errno::EINVAL, "cannot write tty device");
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
IoEvents::empty()
}
}

View File

@ -250,9 +250,9 @@ impl LineDiscipline {
Ok(len) => return Ok(len),
Err(e) if e.error() != Errno::EAGAIN => return Err(e),
Err(_) => {
let poller = Some(Poller::new());
if self.poll(IoEvents::IN, poller.as_ref()).is_empty() {
poller.as_ref().unwrap().wait()?
let mut poller = Poller::new();
if self.poll(IoEvents::IN, Some(&mut poller)).is_empty() {
poller.wait()?
}
}
}
@ -288,7 +288,7 @@ impl LineDiscipline {
Ok(read_len)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}

View File

@ -87,7 +87,7 @@ impl FileIo for Tty {
Ok(buf.len())
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.ldisc.poll(mask, poller)
}

View File

@ -42,7 +42,7 @@ impl FileIo for Urandom {
Ok(buf.len())
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}

View File

@ -30,7 +30,7 @@ impl FileIo for Zero {
Ok(buf.len())
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}

View File

@ -186,7 +186,7 @@ impl FileIo for Inner {
return_errno_with_message!(Errno::EINVAL, "cannot write ptmx");
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
IoEvents::empty()
}
}

View File

@ -133,7 +133,7 @@ impl Inode for PtySlaveInode {
self.device.ioctl(cmd, arg)
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.device.poll(mask, poller)
}

View File

@ -187,7 +187,7 @@ impl EpollFile {
// If no ready entries for now, wait for them
if poller.is_none() {
poller = Some(Poller::new());
let events = self.pollee.poll(IoEvents::IN, poller.as_ref());
let events = self.pollee.poll(IoEvents::IN, poller.as_mut());
if !events.is_empty() {
continue;
}
@ -322,7 +322,7 @@ impl Drop for EpollFile {
}
impl Pollable for EpollFile {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}

View File

@ -1698,7 +1698,7 @@ impl Inode for ExfatInode {
Ok(())
}
fn poll(&self, mask: IoEvents, _poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, _poller: Option<&mut Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}

View File

@ -81,7 +81,7 @@ impl Clone for InodeHandle<Rights> {
#[inherit_methods(from = "self.0")]
impl Pollable for InodeHandle<Rights> {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents;
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents;
}
#[inherit_methods(from = "self.0")]

View File

@ -176,7 +176,7 @@ impl InodeHandle_ {
Ok(read_cnt)
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
if let Some(ref file_io) = self.file_io {
return file_io.poll(mask, poller);
}
@ -228,7 +228,7 @@ pub trait FileIo: Send + Sync + 'static {
fn write(&self, buf: &[u8]) -> Result<usize>;
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents;
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents;
fn ioctl(&self, cmd: IoctlCmd, arg: usize) -> Result<i32> {
return_errno_with_message!(Errno::EINVAL, "ioctl is not supported");

View File

@ -27,7 +27,7 @@ impl PipeReader {
}
impl Pollable for PipeReader {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.consumer.poll(mask, poller)
}
}
@ -96,7 +96,7 @@ impl PipeWriter {
}
impl Pollable for PipeWriter {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.producer.poll(mask, poller)
}
}

View File

@ -1092,7 +1092,7 @@ impl Inode for RamInode {
}
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
if let Some(device) = self.node.read().inner.as_device() {
device.poll(mask, poller)
} else {

View File

@ -82,7 +82,7 @@ macro_rules! impl_common_methods_for_channel {
.contains(StatusFlags::O_NONBLOCK)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.this_end().pollee.poll(mask, poller)
}
@ -140,7 +140,7 @@ impl<T> Producer<T> {
}
impl<T> Pollable for Producer<T> {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.poll(mask, poller)
}
}
@ -266,7 +266,7 @@ impl<T> Consumer<T> {
}
impl<T> Pollable for Consumer<T> {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.poll(mask, poller)
}
}

View File

@ -348,7 +348,7 @@ pub trait Inode: Any + Sync + Send {
Ok(())
}
fn poll(&self, mask: IoEvents, _poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, _poller: Option<&mut Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}

View File

@ -180,7 +180,7 @@ impl DatagramSocket {
}
impl Pollable for DatagramSocket {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}

View File

@ -338,7 +338,7 @@ impl StreamSocket {
}
impl Pollable for StreamSocket {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}

View File

@ -45,7 +45,7 @@ impl Connected {
self.local_endpoint.set_nonblocking(is_nonblocking).unwrap();
}
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.local_endpoint.poll(mask, poller)
}
}

View File

@ -106,10 +106,10 @@ impl Endpoint {
self.0.peer.upgrade().is_some()
}
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub(super) fn poll(&self, mask: IoEvents, mut poller: Option<&mut Poller>) -> IoEvents {
let mut events = IoEvents::empty();
// FIXME: should reader and writer use the same mask?
let reader_events = self.0.reader.poll(mask, poller);
let reader_events = self.0.reader.poll(mask, poller.as_deref_mut());
let writer_events = self.0.writer.poll(mask, poller);
if reader_events.contains(IoEvents::HUP) || self.0.reader.is_shutdown() {

View File

@ -79,7 +79,7 @@ impl Init {
self.is_nonblocking.store(is_nonblocking, Ordering::Release);
}
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}

View File

@ -65,7 +65,7 @@ impl Listener {
Ok((socket, peer_addr))
}
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let addr = self.addr();
let backlog = BACKLOG_TABLE.get_backlog(addr).unwrap();
backlog.poll(mask, poller)
@ -119,7 +119,7 @@ impl BacklogTable {
}
fn pop_incoming(&self, nonblocking: bool, addr: &UnixSocketAddrBound) -> Result<Arc<Endpoint>> {
let poller = Poller::new();
let mut poller = Poller::new();
loop {
let backlog = self.get_backlog(addr)?;
@ -133,7 +133,7 @@ impl BacklogTable {
let events = {
let mask = IoEvents::IN;
backlog.poll(mask, Some(&poller))
backlog.poll(mask, Some(&mut poller))
};
if events.contains(IoEvents::ERR) | events.contains(IoEvents::HUP) {
@ -202,7 +202,7 @@ impl Backlog {
endpoint
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
// Lock to avoid any events may change pollee state when we poll
let _lock = self.incoming_endpoints.lock();
self.pollee.poll(mask, poller)

View File

@ -104,7 +104,7 @@ impl UnixStreamSocket {
}
impl Pollable for UnixStreamSocket {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let inner = self.0.read();
match &*inner {
State::Init(init) => init.poll(mask, poller),

View File

@ -126,7 +126,7 @@ impl Connected {
.set_peer_requested_shutdown()
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}

View File

@ -45,7 +45,7 @@ impl Connecting {
self.info.lock_irq_disabled().update_for_event(event)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}

View File

@ -61,7 +61,7 @@ impl Init {
*self.bound_addr.lock()
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}

View File

@ -50,7 +50,7 @@ impl Listen {
Ok(connection)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}

View File

@ -123,7 +123,7 @@ impl VsockStreamSocket {
}
impl Pollable for VsockStreamSocket {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
match &*self.status.read() {
Status::Init(init) => init.poll(mask, poller),
Status::Listen(listen) => listen.poll(mask, poller),
@ -210,9 +210,9 @@ impl Socket for VsockStreamSocket {
vsockspace.request(&connecting.info()).unwrap();
// wait for response from driver
// TODO: Add timeout
let poller = Poller::new();
let mut poller = Poller::new();
if !connecting
.poll(IoEvents::IN, Some(&poller))
.poll(IoEvents::IN, Some(&mut poller))
.contains(IoEvents::IN)
{
if let Err(e) = poller.wait() {

View File

@ -5,8 +5,6 @@ use core::{
time::Duration,
};
use keyable_arc::KeyableWeak;
use crate::{
events::{IoEvents, Observer, Subject},
prelude::*,
@ -47,7 +45,7 @@ impl Pollee {
///
/// This operation is _atomic_ in the sense that either some interesting
/// events are returned or the poller is registered (if a poller is provided).
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let mask = mask | IoEvents::ALWAYS_POLL;
// Fast path: return events immediately
@ -63,12 +61,12 @@ impl Pollee {
self.events() & mask
}
fn register_poller(&self, poller: &Poller, mask: IoEvents) {
fn register_poller(&self, poller: &mut Poller, mask: IoEvents) {
self.inner
.subject
.register_observer(poller.observer(), mask);
let mut pollees = poller.inner.pollees.lock();
pollees.insert(Arc::downgrade(&self.inner).into(), ());
poller.pollees.push(Arc::downgrade(&self.inner));
}
/// Register an IoEvents observer.
@ -135,14 +133,10 @@ impl Pollee {
/// A poller gets notified when its associated pollees have interesting events.
pub struct Poller {
inner: Arc<PollerInner>,
}
struct PollerInner {
// Use event counter to wait or wake up a poller
event_counter: EventCounter,
event_counter: Arc<EventCounter>,
// All pollees that are interesting to this poller
pollees: Mutex<BTreeMap<KeyableWeak<PolleeInner>, ()>>,
pollees: Vec<Weak<PolleeInner>>,
}
impl Default for Poller {
@ -154,53 +148,41 @@ impl Default for Poller {
impl Poller {
/// Constructs a new `Poller`.
pub fn new() -> Self {
let inner = PollerInner {
event_counter: EventCounter::new(),
pollees: Mutex::new(BTreeMap::new()),
};
Self {
inner: Arc::new(inner),
event_counter: Arc::new(EventCounter::new()),
pollees: Vec::new(),
}
}
/// Wait until there are any interesting events happen since last `wait`. The `wait`
/// can be interrupted by signal.
pub fn wait(&self) -> Result<()> {
self.inner.event_counter.read(None)?;
self.event_counter.read(None)?;
Ok(())
}
/// Wait until there are any interesting events happen since last `wait` or a given timeout
/// is expired. This method can be interrupted by signal.
pub fn wait_timeout(&self, timeout: &Duration) -> Result<()> {
self.inner.event_counter.read(Some(timeout))?;
self.event_counter.read(Some(timeout))?;
Ok(())
}
fn observer(&self) -> Weak<dyn Observer<IoEvents>> {
Arc::downgrade(&self.inner) as _
}
}
impl Observer<IoEvents> for PollerInner {
fn on_events(&self, _events: &IoEvents) {
self.event_counter.write();
Arc::downgrade(&self.event_counter) as _
}
}
impl Drop for Poller {
fn drop(&mut self) {
let mut pollees = self.inner.pollees.lock();
if pollees.len() == 0 {
return;
}
let observer = self.observer();
let self_observer = self.observer();
for (weak_pollee, _) in pollees.extract_if(|_, _| true) {
if let Some(pollee) = weak_pollee.upgrade() {
pollee.subject.unregister_observer(&self_observer);
}
}
self.pollees
.iter()
.filter_map(Weak::upgrade)
.for_each(|pollee| {
pollee.subject.unregister_observer(&observer);
});
}
}
@ -243,6 +225,12 @@ impl EventCounter {
}
}
impl Observer<IoEvents> for EventCounter {
fn on_events(&self, _events: &IoEvents) {
self.write();
}
}
/// The `Pollable` trait allows for waiting for events and performing event-based operations.
///
/// Implementors are required to provide a method, [`Pollable::poll`], which is usually implemented
@ -258,7 +246,7 @@ pub trait Pollable {
/// none.
///
/// This method has the same semantics as [`Pollee::poll`].
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents;
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents;
/// Waits for events and performs event-based operations.
///
@ -274,7 +262,7 @@ pub trait Pollable {
Self: Sized,
F: FnMut() -> Result<R>,
{
let poller = Poller::new();
let mut poller = Poller::new();
loop {
match cond() {
@ -282,7 +270,7 @@ pub trait Pollable {
result => return result,
};
let events = self.poll(mask, Some(&poller));
let events = self.poll(mask, Some(&mut poller));
if !events.is_empty() {
continue;
}

View File

@ -151,7 +151,7 @@ impl EventFile {
}
impl Pollable for EventFile {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}
@ -175,8 +175,8 @@ impl FileLike for EventFile {
self.update_io_state(&counter);
drop(counter);
let poller = Poller::new();
if self.pollee.poll(IoEvents::IN, Some(&poller)).is_empty() {
let mut poller = Poller::new();
if self.pollee.poll(IoEvents::IN, Some(&mut poller)).is_empty() {
poller.wait()?;
}
continue;

View File

@ -61,7 +61,7 @@ pub fn sys_poll(fds: Vaddr, nfds: u64, timeout: i32) -> Result<SyscallReturn> {
pub fn do_poll(poll_fds: &[PollFd], timeout: Option<Duration>) -> Result<usize> {
// The main loop of polling
let poller = Poller::new();
let mut poller = Poller::new();
loop {
let mut num_revents = 0;
@ -79,7 +79,7 @@ pub fn do_poll(poll_fds: &[PollFd], timeout: Option<Duration>) -> Result<usize>
file_table.get_file(fd)?.clone()
};
let need_poller = if num_revents == 0 {
Some(&poller)
Some(&mut poller)
} else {
None
};