Support wait_interruptible for Poller

This commit is contained in:
Jianfeng Jiang
2023-09-26 17:45:53 +08:00
committed by Tate, Hongliang Tian
parent d2aa06cbe2
commit 50761a5cc5
12 changed files with 56 additions and 22 deletions

View File

@ -106,7 +106,7 @@ impl FileLike for PtyMaster {
if events.is_empty() {
drop(input);
// FIXME: deal with pty read timeout
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
continue;
}

View File

@ -253,7 +253,7 @@ impl LineDiscipline {
let revents = self.pollee.poll(IoEvents::IN, need_poller);
if revents.is_empty() {
// FIXME: deal with ldisc read timeout
poller.as_ref().unwrap().wait(None)?;
poller.as_ref().unwrap().wait_interruptible(None)?;
}
}
}

View File

@ -191,7 +191,7 @@ impl EpollFile {
}
}
poller.as_ref().unwrap().wait(timeout)?;
poller.as_ref().unwrap().wait_interruptible(timeout)?;
}
}

View File

@ -153,7 +153,7 @@ impl<T: Copy> Producer<T> {
let events = self.poll(mask, Some(&poller));
if events.is_empty() {
// FIXME: should channel deal with timeout?
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
}
}
@ -242,7 +242,7 @@ impl<T: Copy> Consumer<T> {
let events = self.poll(mask, Some(&poller));
if events.is_empty() {
// FIXME: should channel have timeout?
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
}
}

View File

@ -1,10 +1,11 @@
use super::IoEvents;
use crate::events::{Observer, Subject};
use crate::prelude::*;
use crate::process::signal::sig_mask::SigMask;
use crate::process::signal::SigQueueObserver;
use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use core::time::Duration;
use jinux_frame::sync::WaitQueue;
use keyable_arc::KeyableWeak;
/// A pollee maintains a set of active events, which can be polled with
@ -157,9 +158,15 @@ impl Poller {
}
}
/// Wait until there are any interesting events happen since last `wait`.
pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
self.inner.event_counter.read(timeout)?;
/// Wait until there are any interesting events happen since last `wait`. The `wait`
/// can be interrupted by signal.
pub fn wait_interruptible(&self, timeout: Option<&Duration>) -> Result<()> {
self.inner.event_counter.read_interruptible(timeout)?;
Ok(())
}
pub fn wait_uninterruptible(&self, timeout: Option<&Duration>) -> Result<()> {
self.inner.event_counter.read_uninterruptible(timeout)?;
Ok(())
}
@ -193,19 +200,25 @@ impl Drop for Poller {
/// A counter for wait and wakeup.
struct EventCounter {
counter: AtomicUsize,
wait_queue: WaitQueue,
observer: Arc<SigQueueObserver>,
}
impl EventCounter {
pub fn new() -> Self {
let observer = {
// FIXME: choose the suitable mask
let mask = SigMask::new_full();
SigQueueObserver::new(mask)
};
Self {
counter: AtomicUsize::new(0),
wait_queue: WaitQueue::new(),
observer,
}
}
pub fn read(&self, timeout: Option<&Duration>) -> Result<usize> {
let val = self.wait_queue.wait_until(
pub fn read_interruptible(&self, timeout: Option<&Duration>) -> Result<usize> {
self.observer.wait_until_interruptible(
|| {
let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 {
@ -215,12 +228,25 @@ impl EventCounter {
}
},
timeout,
)?;
Ok(val)
)
}
pub fn read_uninterruptible(&self, timeout: Option<&Duration>) -> Result<usize> {
self.observer.wait_until_uninterruptible(
|| {
let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 {
Some(val)
} else {
None
}
},
timeout,
)
}
pub fn write(&self) {
self.counter.fetch_add(1, Ordering::Relaxed);
self.wait_queue.wake_one();
self.observer.wake_one();
}
}

View File

@ -259,7 +259,7 @@ impl Socket for DatagramSocket {
return_errno_with_message!(Errno::EAGAIN, "try to receive again");
}
// FIXME: deal with recvfrom timeout
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
}
}

View File

@ -58,7 +58,7 @@ impl ConnectedStream {
return_errno_with_message!(Errno::EAGAIN, "try to recv again");
}
// FIXME: deal with receive timeout
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
}
}

View File

@ -152,7 +152,7 @@ impl InitStream {
return_errno_with_message!(Errno::EAGAIN, "try connect again");
} else {
// FIXME: deal with connecting timeout
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
}
}

View File

@ -46,7 +46,7 @@ impl ListenStream {
return_errno_with_message!(Errno::EAGAIN, "try accept again");
}
// FIXME: deal with accept timeout
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
continue;
};

View File

@ -133,7 +133,7 @@ impl BacklogTable {
// FIXME: deal with accept timeout
if events.is_empty() {
poller.wait(None)?;
poller.wait_interruptible(None)?;
}
}
}

View File

@ -117,6 +117,14 @@ impl SigQueueObserver {
{
Ok(self.wait_queue.wait_until(cond, timeout)?)
}
pub fn wake_all(&self) {
self.wait_queue.wake_all();
}
pub fn wake_one(&self) {
self.wait_queue.wake_one();
}
}
impl Observer<SigEvents> for SigQueueObserver {

View File

@ -91,7 +91,7 @@ pub fn do_poll(poll_fds: &[PollFd], timeout: Option<Duration>) -> Result<usize>
return Ok(0);
}
poller.wait(timeout.as_ref())?;
poller.wait_interruptible(timeout.as_ref())?;
}
}