From 50761a5cc55655ff479d99adc4516f027deaae1d Mon Sep 17 00:00:00 2001 From: Jianfeng Jiang Date: Tue, 26 Sep 2023 17:45:53 +0800 Subject: [PATCH] Support wait_interruptible for Poller --- services/libs/jinux-std/src/device/pty/pty.rs | 2 +- .../src/device/tty/line_discipline.rs | 2 +- .../libs/jinux-std/src/fs/epoll/epoll_file.rs | 2 +- .../libs/jinux-std/src/fs/utils/channel.rs | 4 +- services/libs/jinux-std/src/fs/utils/poll.rs | 48 ++++++++++++++----- .../jinux-std/src/net/socket/ip/datagram.rs | 2 +- .../src/net/socket/ip/stream/connected.rs | 2 +- .../src/net/socket/ip/stream/init.rs | 2 +- .../src/net/socket/ip/stream/listen.rs | 2 +- .../src/net/socket/unix/stream/listener.rs | 2 +- .../jinux-std/src/process/signal/events.rs | 8 ++++ services/libs/jinux-std/src/syscall/poll.rs | 2 +- 12 files changed, 56 insertions(+), 22 deletions(-) diff --git a/services/libs/jinux-std/src/device/pty/pty.rs b/services/libs/jinux-std/src/device/pty/pty.rs index 292333b23..3a47dd7bb 100644 --- a/services/libs/jinux-std/src/device/pty/pty.rs +++ b/services/libs/jinux-std/src/device/pty/pty.rs @@ -106,7 +106,7 @@ impl FileLike for PtyMaster { if events.is_empty() { drop(input); // FIXME: deal with pty read timeout - poller.wait(None)?; + poller.wait_interruptible(None)?; } continue; } diff --git a/services/libs/jinux-std/src/device/tty/line_discipline.rs b/services/libs/jinux-std/src/device/tty/line_discipline.rs index 81d9f997a..b8b516825 100644 --- a/services/libs/jinux-std/src/device/tty/line_discipline.rs +++ b/services/libs/jinux-std/src/device/tty/line_discipline.rs @@ -253,7 +253,7 @@ impl LineDiscipline { let revents = self.pollee.poll(IoEvents::IN, need_poller); if revents.is_empty() { // FIXME: deal with ldisc read timeout - poller.as_ref().unwrap().wait(None)?; + poller.as_ref().unwrap().wait_interruptible(None)?; } } } diff --git a/services/libs/jinux-std/src/fs/epoll/epoll_file.rs b/services/libs/jinux-std/src/fs/epoll/epoll_file.rs index ca4b712fa..5b51879aa 100644 --- a/services/libs/jinux-std/src/fs/epoll/epoll_file.rs +++ b/services/libs/jinux-std/src/fs/epoll/epoll_file.rs @@ -191,7 +191,7 @@ impl EpollFile { } } - poller.as_ref().unwrap().wait(timeout)?; + poller.as_ref().unwrap().wait_interruptible(timeout)?; } } diff --git a/services/libs/jinux-std/src/fs/utils/channel.rs b/services/libs/jinux-std/src/fs/utils/channel.rs index 7d1d0e759..b482b3f2a 100644 --- a/services/libs/jinux-std/src/fs/utils/channel.rs +++ b/services/libs/jinux-std/src/fs/utils/channel.rs @@ -153,7 +153,7 @@ impl Producer { let events = self.poll(mask, Some(&poller)); if events.is_empty() { // FIXME: should channel deal with timeout? - poller.wait(None)?; + poller.wait_interruptible(None)?; } } } @@ -242,7 +242,7 @@ impl Consumer { let events = self.poll(mask, Some(&poller)); if events.is_empty() { // FIXME: should channel have timeout? - poller.wait(None)?; + poller.wait_interruptible(None)?; } } } diff --git a/services/libs/jinux-std/src/fs/utils/poll.rs b/services/libs/jinux-std/src/fs/utils/poll.rs index 639b9a763..34a8d358a 100644 --- a/services/libs/jinux-std/src/fs/utils/poll.rs +++ b/services/libs/jinux-std/src/fs/utils/poll.rs @@ -1,10 +1,11 @@ use super::IoEvents; use crate::events::{Observer, Subject}; use crate::prelude::*; +use crate::process::signal::sig_mask::SigMask; +use crate::process::signal::SigQueueObserver; use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use core::time::Duration; -use jinux_frame::sync::WaitQueue; use keyable_arc::KeyableWeak; /// A pollee maintains a set of active events, which can be polled with @@ -157,9 +158,15 @@ impl Poller { } } - /// Wait until there are any interesting events happen since last `wait`. - pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { - self.inner.event_counter.read(timeout)?; + /// Wait until there are any interesting events happen since last `wait`. The `wait` + /// can be interrupted by signal. + pub fn wait_interruptible(&self, timeout: Option<&Duration>) -> Result<()> { + self.inner.event_counter.read_interruptible(timeout)?; + Ok(()) + } + + pub fn wait_uninterruptible(&self, timeout: Option<&Duration>) -> Result<()> { + self.inner.event_counter.read_uninterruptible(timeout)?; Ok(()) } @@ -193,19 +200,25 @@ impl Drop for Poller { /// A counter for wait and wakeup. struct EventCounter { counter: AtomicUsize, - wait_queue: WaitQueue, + observer: Arc, } impl EventCounter { pub fn new() -> Self { + let observer = { + // FIXME: choose the suitable mask + let mask = SigMask::new_full(); + SigQueueObserver::new(mask) + }; + Self { counter: AtomicUsize::new(0), - wait_queue: WaitQueue::new(), + observer, } } - pub fn read(&self, timeout: Option<&Duration>) -> Result { - let val = self.wait_queue.wait_until( + pub fn read_interruptible(&self, timeout: Option<&Duration>) -> Result { + self.observer.wait_until_interruptible( || { let val = self.counter.swap(0, Ordering::Relaxed); if val > 0 { @@ -215,12 +228,25 @@ impl EventCounter { } }, timeout, - )?; - Ok(val) + ) + } + + pub fn read_uninterruptible(&self, timeout: Option<&Duration>) -> Result { + self.observer.wait_until_uninterruptible( + || { + let val = self.counter.swap(0, Ordering::Relaxed); + if val > 0 { + Some(val) + } else { + None + } + }, + timeout, + ) } pub fn write(&self) { self.counter.fetch_add(1, Ordering::Relaxed); - self.wait_queue.wake_one(); + self.observer.wake_one(); } } diff --git a/services/libs/jinux-std/src/net/socket/ip/datagram.rs b/services/libs/jinux-std/src/net/socket/ip/datagram.rs index 73f4fa11d..c0fadab75 100644 --- a/services/libs/jinux-std/src/net/socket/ip/datagram.rs +++ b/services/libs/jinux-std/src/net/socket/ip/datagram.rs @@ -259,7 +259,7 @@ impl Socket for DatagramSocket { return_errno_with_message!(Errno::EAGAIN, "try to receive again"); } // FIXME: deal with recvfrom timeout - poller.wait(None)?; + poller.wait_interruptible(None)?; } } } diff --git a/services/libs/jinux-std/src/net/socket/ip/stream/connected.rs b/services/libs/jinux-std/src/net/socket/ip/stream/connected.rs index 3a17de07a..f53f146f0 100644 --- a/services/libs/jinux-std/src/net/socket/ip/stream/connected.rs +++ b/services/libs/jinux-std/src/net/socket/ip/stream/connected.rs @@ -58,7 +58,7 @@ impl ConnectedStream { return_errno_with_message!(Errno::EAGAIN, "try to recv again"); } // FIXME: deal with receive timeout - poller.wait(None)?; + poller.wait_interruptible(None)?; } } } diff --git a/services/libs/jinux-std/src/net/socket/ip/stream/init.rs b/services/libs/jinux-std/src/net/socket/ip/stream/init.rs index 447c1e6ec..b8a2a7d73 100644 --- a/services/libs/jinux-std/src/net/socket/ip/stream/init.rs +++ b/services/libs/jinux-std/src/net/socket/ip/stream/init.rs @@ -152,7 +152,7 @@ impl InitStream { return_errno_with_message!(Errno::EAGAIN, "try connect again"); } else { // FIXME: deal with connecting timeout - poller.wait(None)?; + poller.wait_interruptible(None)?; } } } diff --git a/services/libs/jinux-std/src/net/socket/ip/stream/listen.rs b/services/libs/jinux-std/src/net/socket/ip/stream/listen.rs index e4633a9af..94c4588ec 100644 --- a/services/libs/jinux-std/src/net/socket/ip/stream/listen.rs +++ b/services/libs/jinux-std/src/net/socket/ip/stream/listen.rs @@ -46,7 +46,7 @@ impl ListenStream { return_errno_with_message!(Errno::EAGAIN, "try accept again"); } // FIXME: deal with accept timeout - poller.wait(None)?; + poller.wait_interruptible(None)?; } continue; }; diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/listener.rs b/services/libs/jinux-std/src/net/socket/unix/stream/listener.rs index 991a37efe..752f70655 100644 --- a/services/libs/jinux-std/src/net/socket/unix/stream/listener.rs +++ b/services/libs/jinux-std/src/net/socket/unix/stream/listener.rs @@ -133,7 +133,7 @@ impl BacklogTable { // FIXME: deal with accept timeout if events.is_empty() { - poller.wait(None)?; + poller.wait_interruptible(None)?; } } } diff --git a/services/libs/jinux-std/src/process/signal/events.rs b/services/libs/jinux-std/src/process/signal/events.rs index f9e1bedaf..c7da9352f 100644 --- a/services/libs/jinux-std/src/process/signal/events.rs +++ b/services/libs/jinux-std/src/process/signal/events.rs @@ -117,6 +117,14 @@ impl SigQueueObserver { { Ok(self.wait_queue.wait_until(cond, timeout)?) } + + pub fn wake_all(&self) { + self.wait_queue.wake_all(); + } + + pub fn wake_one(&self) { + self.wait_queue.wake_one(); + } } impl Observer for SigQueueObserver { diff --git a/services/libs/jinux-std/src/syscall/poll.rs b/services/libs/jinux-std/src/syscall/poll.rs index 5a719c1cc..42351fe0f 100644 --- a/services/libs/jinux-std/src/syscall/poll.rs +++ b/services/libs/jinux-std/src/syscall/poll.rs @@ -91,7 +91,7 @@ pub fn do_poll(poll_fds: &[PollFd], timeout: Option) -> Result return Ok(0); } - poller.wait(timeout.as_ref())?; + poller.wait_interruptible(timeout.as_ref())?; } }