Add timeout parameter for poller.wait

This commit is contained in:
Jianfeng Jiang
2023-09-21 10:36:49 +08:00
committed by Tate, Hongliang Tian
parent bd6a4d34ff
commit ec857e5205
11 changed files with 37 additions and 29 deletions

View File

@ -105,7 +105,8 @@ impl FileLike for PtyMaster {
if events.is_empty() { if events.is_empty() {
drop(input); drop(input);
poller.wait(); // FIXME: deal with pty read timeout
poller.wait(None)?;
} }
continue; continue;
} }

View File

@ -252,7 +252,8 @@ impl LineDiscipline {
}; };
let revents = self.pollee.poll(IoEvents::IN, need_poller); let revents = self.pollee.poll(IoEvents::IN, need_poller);
if revents.is_empty() { if revents.is_empty() {
poller.as_ref().unwrap().wait(); // FIXME: deal with ldisc read timeout
poller.as_ref().unwrap().wait(None)?;
} }
} }
} }

View File

@ -191,8 +191,7 @@ impl EpollFile {
} }
} }
// FIXME: respect timeout parameter poller.as_ref().unwrap().wait(timeout)?;
poller.as_ref().unwrap().wait();
} }
} }

View File

@ -152,7 +152,8 @@ impl<T: Copy> Producer<T> {
} }
let events = self.poll(mask, Some(&poller)); let events = self.poll(mask, Some(&poller));
if events.is_empty() { if events.is_empty() {
poller.wait(); // FIXME: should channel deal with timeout?
poller.wait(None)?;
} }
} }
} }
@ -240,7 +241,8 @@ impl<T: Copy> Consumer<T> {
} }
let events = self.poll(mask, Some(&poller)); let events = self.poll(mask, Some(&poller));
if events.is_empty() { if events.is_empty() {
poller.wait(); // FIXME: should channel have timeout?
poller.wait(None)?;
} }
} }
} }

View File

@ -3,6 +3,7 @@ use crate::events::{Observer, Subject};
use crate::prelude::*; use crate::prelude::*;
use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use core::time::Duration;
use jinux_frame::sync::WaitQueue; use jinux_frame::sync::WaitQueue;
use keyable_arc::KeyableWeak; use keyable_arc::KeyableWeak;
@ -157,8 +158,9 @@ impl Poller {
} }
/// Wait until there are any interesting events happen since last `wait`. /// Wait until there are any interesting events happen since last `wait`.
pub fn wait(&self) { pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
self.inner.event_counter.read(); self.inner.event_counter.read(timeout)?;
Ok(())
} }
fn observer(&self) -> Weak<dyn Observer<IoEvents>> { fn observer(&self) -> Weak<dyn Observer<IoEvents>> {
@ -202,20 +204,19 @@ impl EventCounter {
} }
} }
pub fn read(&self) -> usize { pub fn read(&self, timeout: Option<&Duration>) -> Result<usize> {
self.wait_queue let val = self.wait_queue.wait_until(
.wait_until( || {
|| { let val = self.counter.swap(0, Ordering::Relaxed);
let val = self.counter.swap(0, Ordering::Relaxed); if val > 0 {
if val > 0 { Some(val)
Some(val) } else {
} else { None
None }
} },
}, timeout,
None, )?;
) Ok(val)
.unwrap()
} }
pub fn write(&self) { pub fn write(&self) {

View File

@ -258,7 +258,8 @@ impl Socket for DatagramSocket {
if self.nonblocking() { if self.nonblocking() {
return_errno_with_message!(Errno::EAGAIN, "try to receive again"); return_errno_with_message!(Errno::EAGAIN, "try to receive again");
} }
poller.wait(); // FIXME: deal with recvfrom timeout
poller.wait(None)?;
} }
} }
} }

View File

@ -57,7 +57,8 @@ impl ConnectedStream {
if self.is_nonblocking() { if self.is_nonblocking() {
return_errno_with_message!(Errno::EAGAIN, "try to recv again"); return_errno_with_message!(Errno::EAGAIN, "try to recv again");
} }
poller.wait(); // FIXME: deal with receive timeout
poller.wait(None)?;
} }
} }
} }

View File

@ -151,7 +151,8 @@ impl InitStream {
} else if self.is_nonblocking() { } else if self.is_nonblocking() {
return_errno_with_message!(Errno::EAGAIN, "try connect again"); return_errno_with_message!(Errno::EAGAIN, "try connect again");
} else { } else {
poller.wait(); // FIXME: deal with connecting timeout
poller.wait(None)?;
} }
} }
} }

View File

@ -45,7 +45,8 @@ impl ListenStream {
if self.is_nonblocking() { if self.is_nonblocking() {
return_errno_with_message!(Errno::EAGAIN, "try accept again"); return_errno_with_message!(Errno::EAGAIN, "try accept again");
} }
poller.wait(); // FIXME: deal with accept timeout
poller.wait(None)?;
} }
continue; continue;
}; };

View File

@ -131,8 +131,9 @@ impl BacklogTable {
return_errno_with_message!(Errno::ECONNABORTED, "connection is aborted"); return_errno_with_message!(Errno::ECONNABORTED, "connection is aborted");
} }
// FIXME: deal with accept timeout
if events.is_empty() { if events.is_empty() {
poller.wait(); poller.wait(None)?;
} }
} }
} }

View File

@ -91,8 +91,7 @@ pub fn do_poll(poll_fds: &[PollFd], timeout: Option<Duration>) -> Result<usize>
return Ok(0); return Ok(0);
} }
// FIXME: respect timeout parameter poller.wait(timeout.as_ref())?;
poller.wait();
} }
} }