diff --git a/kernel/src/device/pty/pty.rs b/kernel/src/device/pty/pty.rs index 1ddbc83c4..2b310c176 100644 --- a/kernel/src/device/pty/pty.rs +++ b/kernel/src/device/pty/pty.rs @@ -138,7 +138,7 @@ impl FileIo for PtyMaster { } // TODO: deal with nonblocking and timeout - self.wait_events(IoEvents::IN, || self.try_read(writer)) + self.wait_events(IoEvents::IN, None, || self.try_read(writer)) } fn write(&self, reader: &mut VmReader) -> Result { diff --git a/kernel/src/device/tty/line_discipline.rs b/kernel/src/device/tty/line_discipline.rs index 8a7956975..d51d85afd 100644 --- a/kernel/src/device/tty/line_discipline.rs +++ b/kernel/src/device/tty/line_discipline.rs @@ -235,7 +235,7 @@ impl LineDiscipline { } pub fn read(&self, buf: &mut [u8]) -> Result { - self.wait_events(IoEvents::IN, || self.try_read(buf)) + self.wait_events(IoEvents::IN, None, || self.try_read(buf)) } /// Reads all bytes buffered to `dst`. diff --git a/kernel/src/fs/epoll/epoll_file.rs b/kernel/src/fs/epoll/epoll_file.rs index f391973c1..f95d9f12d 100644 --- a/kernel/src/fs/epoll/epoll_file.rs +++ b/kernel/src/fs/epoll/epoll_file.rs @@ -194,34 +194,21 @@ impl EpollFile { /// expires or a signal arrives. pub fn wait(&self, max_events: usize, timeout: Option<&Duration>) -> Result> { let mut ep_events = Vec::new(); - let mut poller = None; - loop { - // Try to pop some ready entries + + self.wait_events(IoEvents::IN, timeout, || { self.pop_multi_ready(max_events, &mut ep_events); - if !ep_events.is_empty() { - return Ok(ep_events); + + if ep_events.is_empty() { + return Err(Error::with_message( + Errno::EAGAIN, + "there are no available events", + )); } - // Return immediately if specifying a timeout of zero - if timeout.is_some() && timeout.as_ref().unwrap().is_zero() { - return Ok(ep_events); - } + Ok(()) + })?; - // If no ready entries for now, wait for them - if poller.is_none() { - poller = Some(Poller::new()); - let events = self.pollee.poll(IoEvents::IN, poller.as_mut()); - if !events.is_empty() { - continue; - } - } - - if let Some(timeout) = timeout { - poller.as_ref().unwrap().wait_timeout(timeout)?; - } else { - poller.as_ref().unwrap().wait()?; - } - } + Ok(ep_events) } fn push_ready(&self, entry: Arc) { diff --git a/kernel/src/fs/pipe.rs b/kernel/src/fs/pipe.rs index 0a82bd952..782479268 100644 --- a/kernel/src/fs/pipe.rs +++ b/kernel/src/fs/pipe.rs @@ -63,7 +63,7 @@ impl FileLike for PipeReader { let read_len = if self.status_flags().contains(StatusFlags::O_NONBLOCK) { self.consumer.try_read(writer)? } else { - self.wait_events(IoEvents::IN, || self.consumer.try_read(writer))? + self.wait_events(IoEvents::IN, None, || self.consumer.try_read(writer))? }; Ok(read_len) } @@ -148,7 +148,7 @@ impl FileLike for PipeWriter { if self.status_flags().contains(StatusFlags::O_NONBLOCK) { self.producer.try_write(reader) } else { - self.wait_events(IoEvents::OUT, || self.producer.try_write(reader)) + self.wait_events(IoEvents::OUT, None, || self.producer.try_write(reader)) } } diff --git a/kernel/src/net/socket/ip/datagram/mod.rs b/kernel/src/net/socket/ip/datagram/mod.rs index 913db8dc6..f4c385d1e 100644 --- a/kernel/src/net/socket/ip/datagram/mod.rs +++ b/kernel/src/net/socket/ip/datagram/mod.rs @@ -175,7 +175,7 @@ impl DatagramSocket { if self.is_nonblocking() { self.try_recv(writer, flags) } else { - self.wait_events(IoEvents::IN, || self.try_recv(writer, flags)) + self.wait_events(IoEvents::IN, None, || self.try_recv(writer, flags)) } } diff --git a/kernel/src/net/socket/ip/stream/mod.rs b/kernel/src/net/socket/ip/stream/mod.rs index 81257fa28..149f72175 100644 --- a/kernel/src/net/socket/ip/stream/mod.rs +++ b/kernel/src/net/socket/ip/stream/mod.rs @@ -318,7 +318,7 @@ impl StreamSocket { if self.is_nonblocking() { self.try_recv(writer, flags) } else { - self.wait_events(IoEvents::IN, || self.try_recv(writer, flags)) + self.wait_events(IoEvents::IN, None, || self.try_recv(writer, flags)) } } @@ -350,7 +350,7 @@ impl StreamSocket { if self.is_nonblocking() { self.try_send(reader, flags) } else { - self.wait_events(IoEvents::OUT, || self.try_send(reader, flags)) + self.wait_events(IoEvents::OUT, None, || self.try_send(reader, flags)) } } @@ -479,7 +479,7 @@ impl Socket for StreamSocket { return result; } - self.wait_events(IoEvents::OUT, || self.check_connect()) + self.wait_events(IoEvents::OUT, None, || self.check_connect()) } fn listen(&self, backlog: usize) -> Result<()> { @@ -518,7 +518,7 @@ impl Socket for StreamSocket { if self.is_nonblocking() { self.try_accept() } else { - self.wait_events(IoEvents::IN, || self.try_accept()) + self.wait_events(IoEvents::IN, None, || self.try_accept()) } } diff --git a/kernel/src/net/socket/unix/stream/socket.rs b/kernel/src/net/socket/unix/stream/socket.rs index 39c88ada8..d94bd1e29 100644 --- a/kernel/src/net/socket/unix/stream/socket.rs +++ b/kernel/src/net/socket/unix/stream/socket.rs @@ -69,7 +69,7 @@ impl UnixStreamSocket { if self.is_nonblocking() { self.try_send(reader, flags) } else { - self.wait_events(IoEvents::OUT, || self.try_send(reader, flags)) + self.wait_events(IoEvents::OUT, None, || self.try_send(reader, flags)) } } @@ -86,7 +86,7 @@ impl UnixStreamSocket { if self.is_nonblocking() { self.try_recv(writer, flags) } else { - self.wait_events(IoEvents::IN, || self.try_recv(writer, flags)) + self.wait_events(IoEvents::IN, None, || self.try_recv(writer, flags)) } } @@ -296,7 +296,7 @@ impl Socket for UnixStreamSocket { if self.is_nonblocking() { self.try_accept() } else { - self.wait_events(IoEvents::IN, || self.try_accept()) + self.wait_events(IoEvents::IN, None, || self.try_accept()) } } diff --git a/kernel/src/net/socket/vsock/stream/socket.rs b/kernel/src/net/socket/vsock/stream/socket.rs index bee778382..dc2ff7e43 100644 --- a/kernel/src/net/socket/vsock/stream/socket.rs +++ b/kernel/src/net/socket/vsock/stream/socket.rs @@ -125,7 +125,7 @@ impl VsockStreamSocket { if self.is_nonblocking() { self.try_recv(writer, flags) } else { - self.wait_events(IoEvents::IN, || self.try_recv(writer, flags)) + self.wait_events(IoEvents::IN, None, || self.try_recv(writer, flags)) } } } @@ -236,7 +236,7 @@ impl Socket for VsockStreamSocket { .poll(IoEvents::IN, Some(&mut poller)) .contains(IoEvents::IN) { - if let Err(e) = poller.wait() { + if let Err(e) = poller.wait(None) { vsockspace .remove_connecting_socket(&connecting.local_addr()) .unwrap(); @@ -285,7 +285,7 @@ impl Socket for VsockStreamSocket { if self.is_nonblocking() { self.try_accept() } else { - self.wait_events(IoEvents::IN, || self.try_accept()) + self.wait_events(IoEvents::IN, None, || self.try_accept()) } } diff --git a/kernel/src/process/signal/poll.rs b/kernel/src/process/signal/poll.rs index dc2957dea..a7cc8a1b7 100644 --- a/kernel/src/process/signal/poll.rs +++ b/kernel/src/process/signal/poll.rs @@ -158,17 +158,12 @@ impl Poller { } } - /// Wait until there are any interesting events happen since last `wait`. The `wait` - /// can be interrupted by signal. - pub fn wait(&self) -> Result<()> { - self.event_counter.read(&self.waiter, None)?; - Ok(()) - } - - /// Wait until there are any interesting events happen since last `wait` or a given timeout - /// is expired. This method can be interrupted by signal. - pub fn wait_timeout(&self, timeout: &Duration) -> Result<()> { - self.event_counter.read(&self.waiter, Some(timeout))?; + /// Waits until some interesting events happen since the last wait or until the timeout + /// expires. + /// + /// The waiting process can be interrupted by a signal. + pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { + self.event_counter.read(&self.waiter, timeout)?; Ok(()) } @@ -256,29 +251,52 @@ pub trait Pollable { /// will return whatever the call to `cond()` returns. Otherwise, the method will wait for some /// interesting events specified in `mask` to happen and try again. /// + /// This method will fail with `ETIME` if the timeout is specified and the event does not occur + /// before the timeout expires. + /// /// The user must ensure that a call to `cond()` does not fail with `EAGAIN` when the /// interesting events occur. However, it is allowed to have spurious `EAGAIN` failures due to /// race conditions where the events are consumed by another thread. - fn wait_events(&self, mask: IoEvents, mut cond: F) -> Result + fn wait_events( + &self, + mask: IoEvents, + timeout: Option<&Duration>, + mut cond: F, + ) -> Result where Self: Sized, F: FnMut() -> Result, { + // Fast path: Return immediately if the operation gives a result. + match cond() { + Err(err) if err.error() == Errno::EAGAIN => (), + result => return result, + } + + // Fast path: Return immediately if the timeout is zero. + if timeout.is_some_and(|duration| duration.is_zero()) { + return_errno_with_message!(Errno::ETIME, "the timeout expired"); + } + + // Wait until the event happens. let mut poller = Poller::new(); + if self.poll(mask, Some(&mut poller)).is_empty() { + poller.wait(timeout)?; + } loop { + // Try again after the event happens. match cond() { Err(err) if err.error() == Errno::EAGAIN => (), result => return result, }; - let events = self.poll(mask, Some(&mut poller)); - if !events.is_empty() { - continue; + // Wait until the next event happens. + // + // FIXME: We need to update `timeout` since we have waited for some time. + if self.poll(mask, Some(&mut poller)).is_empty() { + poller.wait(timeout)?; } - - // TODO: Support timeout - poller.wait()?; } } } diff --git a/kernel/src/syscall/eventfd.rs b/kernel/src/syscall/eventfd.rs index ad3d9ff70..f3693496a 100644 --- a/kernel/src/syscall/eventfd.rs +++ b/kernel/src/syscall/eventfd.rs @@ -190,7 +190,7 @@ impl FileLike for EventFile { if self.is_nonblocking() { self.try_read(writer)?; } else { - self.wait_events(IoEvents::IN, || self.try_read(writer))?; + self.wait_events(IoEvents::IN, None, || self.try_read(writer))?; } Ok(read_len) diff --git a/kernel/src/syscall/poll.rs b/kernel/src/syscall/poll.rs index 8dbe2f637..a6adbeafc 100644 --- a/kernel/src/syscall/poll.rs +++ b/kernel/src/syscall/poll.rs @@ -41,7 +41,7 @@ pub fn sys_poll(fds: Vaddr, nfds: u64, timeout: i32, ctx: &Context) -> Result Result, ctx: &Context) -> Result { +pub fn do_poll(poll_fds: &[PollFd], timeout: Option<&Duration>, ctx: &Context) -> Result { let (result, files) = hold_files(poll_fds, ctx); match result { FileResult::AllValid => (), @@ -74,24 +74,22 @@ pub fn do_poll(poll_fds: &[PollFd], timeout: Option, ctx: &Context) -> }; loop { - if let Some(timeout) = timeout.as_ref() { - match poller.wait_timeout(timeout) { - Ok(_) => {} - Err(e) if e.error() == Errno::ETIME => { - // The return value is zero if the timeout expires - // before any file descriptors became ready - return Ok(0); - } - Err(e) => return Err(e), - }; - } else { - poller.wait()?; - } + match poller.wait(timeout) { + Ok(_) => {} + Err(e) if e.error() == Errno::ETIME => { + // The return value is zero if the timeout expires + // before any file descriptors became ready + return Ok(0); + } + Err(e) => return Err(e), + }; let num_events = count_all_events(poll_fds, &files); if num_events > 0 { return Ok(num_events); } + + // FIXME: We need to update `timeout` since we have waited for some time. } } diff --git a/kernel/src/syscall/select.rs b/kernel/src/syscall/select.rs index 3ad3e1574..52be2b86d 100644 --- a/kernel/src/syscall/select.rs +++ b/kernel/src/syscall/select.rs @@ -72,7 +72,7 @@ pub fn do_sys_select( readfds.as_mut(), writefds.as_mut(), exceptfds.as_mut(), - timeout, + timeout.as_ref(), ctx, )?; @@ -100,7 +100,7 @@ fn do_select( mut readfds: Option<&mut FdSet>, mut writefds: Option<&mut FdSet>, mut exceptfds: Option<&mut FdSet>, - timeout: Option, + timeout: Option<&Duration>, ctx: &Context, ) -> Result { // Convert the FdSet to an array of PollFd