diff --git a/kernel/src/net/socket/unix/stream/listener.rs b/kernel/src/net/socket/unix/stream/listener.rs index c7f296fec..9c5682e0c 100644 --- a/kernel/src/net/socket/unix/stream/listener.rs +++ b/kernel/src/net/socket/unix/stream/listener.rs @@ -2,6 +2,8 @@ use core::sync::atomic::{AtomicUsize, Ordering}; +use ostd::sync::WaitQueue; + use super::{ connected::{combine_io_events, Connected}, init::Init, @@ -15,7 +17,7 @@ use crate::{ SockShutdownCmd, SocketAddr, }, prelude::*, - process::signal::{Pauser, Pollee, Poller}, + process::signal::{Pollee, Poller}, }; pub(super) struct Listener { @@ -159,7 +161,7 @@ pub(super) struct Backlog { pollee: Pollee, backlog: AtomicUsize, incoming_conns: Mutex>>, - pauser: Arc, + wait_queue: WaitQueue, } impl Backlog { @@ -175,7 +177,7 @@ impl Backlog { pollee, backlog: AtomicUsize::new(backlog), incoming_conns: Mutex::new(incoming_sockets), - pauser: Pauser::new(), + wait_queue: WaitQueue::new(), } } @@ -198,7 +200,7 @@ impl Backlog { drop(locked_incoming_conns); if conn.is_some() { - self.pauser.resume_one(); + self.wait_queue.wake_one(); } conn.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available")) @@ -208,7 +210,7 @@ impl Backlog { let old_backlog = self.backlog.swap(backlog, Ordering::Relaxed); if old_backlog < backlog { - self.pauser.resume_all(); + self.wait_queue.wake_all(); } } @@ -221,7 +223,7 @@ impl Backlog { drop(incoming_conns); - self.pauser.resume_all(); + self.wait_queue.wake_all(); } fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents { @@ -284,7 +286,7 @@ impl Backlog { where F: FnMut() -> Result<()>, { - self.pauser.pause_until(|| match cond() { + self.wait_queue.pause_until(|| match cond() { Err(err) if err.error() == Errno::EAGAIN => None, result => Some(result), })? diff --git a/kernel/src/process/exit.rs b/kernel/src/process/exit.rs index ed716b720..8580e49fc 100644 --- a/kernel/src/process/exit.rs +++ b/kernel/src/process/exit.rs @@ -59,7 +59,7 @@ pub fn do_exit_group(term_status: TermStatus) { // Notify parent let signal = KernelSignal::new(SIGCHLD); parent.enqueue_signal(signal); - parent.children_pauser().resume_all(); + parent.children_wait_queue().wake_all(); } } diff --git a/kernel/src/process/process/job_control.rs b/kernel/src/process/process/job_control.rs index 2b6679074..5226cb5c0 100644 --- a/kernel/src/process/process/job_control.rs +++ b/kernel/src/process/process/job_control.rs @@ -2,13 +2,14 @@ #![allow(unused_variables)] +use ostd::sync::WaitQueue; + use crate::{ prelude::*, process::{ signal::{ constants::{SIGCONT, SIGHUP}, signals::kernel::KernelSignal, - Pauser, }, ProcessGroup, Session, }, @@ -22,7 +23,7 @@ use crate::{ pub struct JobControl { foreground: SpinLock>, session: SpinLock>, - pauser: Arc, + wait_queue: WaitQueue, } impl JobControl { @@ -31,7 +32,7 @@ impl JobControl { Self { foreground: SpinLock::new(Weak::new()), session: SpinLock::new(Weak::new()), - pauser: Pauser::new(), + wait_queue: WaitQueue::new(), } } @@ -73,7 +74,7 @@ impl JobControl { let session = current.session().unwrap(); *self.session.lock() = Arc::downgrade(&session); - self.pauser.resume_all(); + self.wait_queue.wake_all(); Ok(()) } @@ -129,7 +130,7 @@ impl JobControl { } *self.foreground.lock() = Arc::downgrade(process_group); - self.pauser.resume_all(); + self.wait_queue.wake_all(); Ok(()) } @@ -146,7 +147,7 @@ impl JobControl { } // Slow path - self.pauser.pause_until(|| { + self.wait_queue.pause_until(|| { if self.current_belongs_to_foreground() { Some(()) } else { diff --git a/kernel/src/process/process/mod.rs b/kernel/src/process/process/mod.rs index 688e250cf..68381cf6c 100644 --- a/kernel/src/process/process/mod.rs +++ b/kernel/src/process/process/mod.rs @@ -9,11 +9,9 @@ use super::{ process_vm::{Heap, InitStackReader, ProcessVm}, rlimit::ResourceLimits, signal::{ - constants::SIGCHLD, sig_disposition::SigDispositions, sig_num::{AtomicSigNum, SigNum}, signals::Signal, - Pauser, }, status::ProcessStatus, Credentials, TermStatus, @@ -39,6 +37,7 @@ use aster_rights::Full; use atomic::Atomic; pub use builder::ProcessBuilder; pub use job_control::JobControl; +use ostd::sync::WaitQueue; pub use process_group::ProcessGroup; pub use session::Session; pub use terminal::Terminal; @@ -63,7 +62,7 @@ pub struct Process { process_vm: ProcessVm, /// Wait for child status changed - children_pauser: Arc, + children_wait_queue: WaitQueue, // Mutable Part /// The executable path. @@ -189,7 +188,7 @@ impl Process { ) -> Arc { // SIGCHID does not interrupt pauser. Child process will // resume paused parent when doing exit. - let children_pauser = Pauser::new_with_mask(SIGCHLD.into()); + let children_wait_queue = WaitQueue::new(); let prof_clock = ProfClock::new(); @@ -198,7 +197,7 @@ impl Process { threads: Mutex::new(threads), executable_path: RwLock::new(executable_path), process_vm, - children_pauser, + children_wait_queue, status: ProcessStatus::new_uninit(), parent: ParentProcess::new(parent), children: Mutex::new(BTreeMap::new()), @@ -343,8 +342,8 @@ impl Process { self.children.lock().contains_key(pid) } - pub fn children_pauser(&self) -> &Arc { - &self.children_pauser + pub fn children_wait_queue(&self) -> &WaitQueue { + &self.children_wait_queue } // *********** Process group & Session*********** diff --git a/kernel/src/process/signal/poll.rs b/kernel/src/process/signal/poll.rs index 9a3f5b46c..dc2957dea 100644 --- a/kernel/src/process/signal/poll.rs +++ b/kernel/src/process/signal/poll.rs @@ -5,10 +5,11 @@ use core::{ time::Duration, }; +use ostd::sync::{Waiter, Waker}; + use crate::{ events::{IoEvents, Observer, Subject}, prelude::*, - process::signal::Pauser, }; /// A pollee maintains a set of active events, which can be polled with @@ -136,6 +137,8 @@ pub struct Poller { event_counter: Arc, // All pollees that are interesting to this poller pollees: Vec>, + // A waiter used to pause the current thread. + waiter: Waiter, } impl Default for Poller { @@ -147,23 +150,25 @@ impl Default for Poller { impl Poller { /// Constructs a new `Poller`. pub fn new() -> Self { + let (waiter, waker) = Waiter::new_pair(); Self { - event_counter: Arc::new(EventCounter::new()), + event_counter: Arc::new(EventCounter::new(waker)), pollees: Vec::new(), + waiter, } } /// Wait until there are any interesting events happen since last `wait`. The `wait` /// can be interrupted by signal. pub fn wait(&self) -> Result<()> { - self.event_counter.read(None)?; + self.event_counter.read(&self.waiter, None)?; Ok(()) } /// Wait until there are any interesting events happen since last `wait` or a given timeout /// is expired. This method can be interrupted by signal. pub fn wait_timeout(&self, timeout: &Duration) -> Result<()> { - self.event_counter.read(Some(timeout))?; + self.event_counter.read(&self.waiter, Some(timeout))?; Ok(()) } @@ -188,20 +193,18 @@ impl Drop for Poller { /// A counter for wait and wakeup. struct EventCounter { counter: AtomicUsize, - pauser: Arc, + waker: Arc, } impl EventCounter { - pub fn new() -> Self { - let pauser = Pauser::new(); - + pub fn new(waker: Arc) -> Self { Self { counter: AtomicUsize::new(0), - pauser, + waker, } } - pub fn read(&self, timeout: Option<&Duration>) -> Result { + pub fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result { let cond = || { let val = self.counter.swap(0, Ordering::Relaxed); if val > 0 { @@ -212,15 +215,15 @@ impl EventCounter { }; if let Some(timeout) = timeout { - self.pauser.pause_until_or_timeout(cond, timeout) + waiter.pause_until_or_timeout(cond, timeout) } else { - self.pauser.pause_until(cond) + waiter.pause_until(cond) } } pub fn write(&self) { self.counter.fetch_add(1, Ordering::Relaxed); - self.pauser.resume_one(); + self.waker.wake_up(); } } diff --git a/kernel/src/process/wait.rs b/kernel/src/process/wait.rs index e7b3c0a08..668c340bc 100644 --- a/kernel/src/process/wait.rs +++ b/kernel/src/process/wait.rs @@ -2,7 +2,11 @@ #![allow(dead_code)] -use super::{process_filter::ProcessFilter, ExitCode, Pid, Process}; +use super::{ + process_filter::ProcessFilter, + signal::{constants::SIGCHLD, with_signal_blocked}, + ExitCode, Pid, Process, +}; use crate::{prelude::*, process::process_table, thread::thread_table}; // The definition of WaitOptions is from Occlum @@ -27,48 +31,51 @@ impl WaitOptions { pub fn wait_child_exit( child_filter: ProcessFilter, wait_options: WaitOptions, + ctx: &Context, ) -> Result>> { - let current = current!(); - let zombie_child = current.children_pauser().pause_until(|| { - let unwaited_children = current - .children() - .lock() - .values() - .filter(|child| match child_filter { - ProcessFilter::Any => true, - ProcessFilter::WithPid(pid) => child.pid() == pid, - ProcessFilter::WithPgid(pgid) => child.pgid() == pgid, - }) - .cloned() - .collect::>(); + let current = ctx.process; + let zombie_child = with_signal_blocked(ctx, SIGCHLD.into(), || { + current.children_wait_queue().pause_until(|| { + let unwaited_children = current + .children() + .lock() + .values() + .filter(|child| match child_filter { + ProcessFilter::Any => true, + ProcessFilter::WithPid(pid) => child.pid() == pid, + ProcessFilter::WithPgid(pgid) => child.pgid() == pgid, + }) + .cloned() + .collect::>(); - if unwaited_children.is_empty() { - return Some(Err(Error::with_message( - Errno::ECHILD, - "the process has no child to wait", - ))); - } - - // return immediately if we find a zombie child - let zombie_child = unwaited_children.iter().find(|child| child.is_zombie()); - - if let Some(zombie_child) = zombie_child { - let zombie_pid = zombie_child.pid(); - if wait_options.contains(WaitOptions::WNOWAIT) { - // does not reap child, directly return - return Some(Ok(Some(zombie_child.clone()))); - } else { - reap_zombie_child(¤t, zombie_pid); - return Some(Ok(Some(zombie_child.clone()))); + if unwaited_children.is_empty() { + return Some(Err(Error::with_message( + Errno::ECHILD, + "the process has no child to wait", + ))); } - } - if wait_options.contains(WaitOptions::WNOHANG) { - return Some(Ok(None)); - } + // return immediately if we find a zombie child + let zombie_child = unwaited_children.iter().find(|child| child.is_zombie()); - // wait - None + if let Some(zombie_child) = zombie_child { + let zombie_pid = zombie_child.pid(); + if wait_options.contains(WaitOptions::WNOWAIT) { + // does not reap child, directly return + return Some(Ok(Some(zombie_child.clone()))); + } else { + reap_zombie_child(current, zombie_pid); + return Some(Ok(Some(zombie_child.clone()))); + } + } + + if wait_options.contains(WaitOptions::WNOHANG) { + return Some(Ok(None)); + } + + // wait + None + }) })??; Ok(zombie_child) diff --git a/kernel/src/syscall/eventfd.rs b/kernel/src/syscall/eventfd.rs index 0e3b93714..e26628a03 100644 --- a/kernel/src/syscall/eventfd.rs +++ b/kernel/src/syscall/eventfd.rs @@ -14,6 +14,8 @@ //! refer to the man 2 eventfd documentation. //! +use ostd::sync::WaitQueue; + use super::SyscallReturn; use crate::{ events::{IoEvents, Observer}, @@ -24,7 +26,7 @@ use crate::{ }, prelude::*, process::{ - signal::{Pauser, Pollable, Pollee, Poller}, + signal::{Pollable, Pollee, Poller}, Gid, Uid, }, time::clocks::RealTimeClock, @@ -75,7 +77,7 @@ struct EventFile { counter: Mutex, pollee: Pollee, flags: Mutex, - write_pauser: Arc, + write_wait_queue: WaitQueue, } impl EventFile { @@ -84,12 +86,12 @@ impl EventFile { fn new(init_val: u64, flags: Flags) -> Self { let counter = Mutex::new(init_val); let pollee = Pollee::new(IoEvents::OUT); - let write_pauser = Pauser::new(); + let write_wait_queue = WaitQueue::new(); Self { counter, pollee, flags: Mutex::new(flags), - write_pauser, + write_wait_queue, } } @@ -112,7 +114,7 @@ impl EventFile { self.pollee.del_events(IoEvents::IN); } - self.write_pauser.resume_all(); + self.write_wait_queue.wake_all(); return; } @@ -215,7 +217,7 @@ impl FileLike for EventFile { } // Wait until counter can be added val to - self.write_pauser + self.write_wait_queue .pause_until(|| self.add_counter_val(supplied_value).ok())?; Ok(write_len) diff --git a/kernel/src/syscall/nanosleep.rs b/kernel/src/syscall/nanosleep.rs index 241b94420..bd579df0b 100644 --- a/kernel/src/syscall/nanosleep.rs +++ b/kernel/src/syscall/nanosleep.rs @@ -2,10 +2,11 @@ use core::time::Duration; +use ostd::sync::Waiter; + use super::{clock_gettime::read_clock, ClockId, SyscallReturn}; use crate::{ prelude::*, - process::signal::Pauser, time::{clockid_t, timespec_t, TIMER_ABSTIME}, }; @@ -81,9 +82,9 @@ fn do_clock_nanosleep( // FIXME: sleeping thread can only be interrupted by signals that will call signal handler or terminate // current process. i.e., the signals that should be ignored will not interrupt sleeping thread. - let pauser = Pauser::new(); + let waiter = Waiter::new_pair().0; - let res = pauser.pause_until_or_timeout(|| None, &timeout); + let res = waiter.pause_until_or_timeout(|| None, &timeout); match res { Err(e) if e.error() == Errno::ETIME => Ok(SyscallReturn::Return(0)), Err(e) if e.error() == Errno::EINTR => { diff --git a/kernel/src/syscall/pause.rs b/kernel/src/syscall/pause.rs index 04f5d5087..03463dd8d 100644 --- a/kernel/src/syscall/pause.rs +++ b/kernel/src/syscall/pause.rs @@ -1,14 +1,16 @@ // SPDX-License-Identifier: MPL-2.0 +use ostd::sync::Waiter; + use super::SyscallReturn; -use crate::{prelude::*, process::signal::Pauser}; +use crate::prelude::*; pub fn sys_pause(_ctx: &Context) -> Result { // FIXME: like sleep, paused thread can only be interrupted by signals that will call signal // handler or terminate current process - let pauser = Pauser::new(); + let waiter = Waiter::new_pair().0; - pauser.pause_until(|| None)?; + waiter.pause_until(|| None)?; unreachable!("[Internal Error] pause should always return EINTR"); } diff --git a/kernel/src/syscall/rt_sigsuspend.rs b/kernel/src/syscall/rt_sigsuspend.rs index 29cd75373..10a34089a 100644 --- a/kernel/src/syscall/rt_sigsuspend.rs +++ b/kernel/src/syscall/rt_sigsuspend.rs @@ -1,12 +1,14 @@ // SPDX-License-Identifier: MPL-2.0 +use ostd::sync::Waiter; + use super::SyscallReturn; use crate::{ prelude::*, process::signal::{ constants::{SIGKILL, SIGSTOP}, sig_mask::SigMask, - Pauser, + with_signal_blocked, }, }; @@ -34,9 +36,9 @@ pub fn sys_rt_sigsuspend( mask }; - // Pause until receiving any signal - let pauser = Pauser::new_with_mask(sigmask); - pauser.pause_until(|| None::<()>)?; + // Wait until receiving any signal + let waiter = Waiter::new_pair().0; + with_signal_blocked(ctx, sigmask, || waiter.pause_until(|| None::<()>))?; // This syscall should always return `Err(EINTR)`. This path should never be reached. unreachable!("rt_sigsuspend always return EINTR"); diff --git a/kernel/src/syscall/wait4.rs b/kernel/src/syscall/wait4.rs index e3982adc4..b815f225e 100644 --- a/kernel/src/syscall/wait4.rs +++ b/kernel/src/syscall/wait4.rs @@ -22,7 +22,7 @@ pub fn sys_wait4( debug!("wait4 current pid = {}", ctx.process.pid()); let process_filter = ProcessFilter::from_id(wait_pid as _); - let waited_process = wait_child_exit(process_filter, wait_options)?; + let waited_process = wait_child_exit(process_filter, wait_options, ctx)?; let Some(process) = waited_process else { return Ok(SyscallReturn::Return(0 as _)); }; diff --git a/kernel/src/syscall/waitid.rs b/kernel/src/syscall/waitid.rs index 3e78f4a3c..d54c98796 100644 --- a/kernel/src/syscall/waitid.rs +++ b/kernel/src/syscall/waitid.rs @@ -12,13 +12,13 @@ pub fn sys_waitid( _infoq_addr: u64, options: u64, _rusage_addr: u64, - _ctx: &Context, + ctx: &Context, ) -> Result { // FIXME: what does infoq and rusage use for? let process_filter = ProcessFilter::from_which_and_id(which, upid)?; let wait_options = WaitOptions::from_bits(options as u32) .ok_or(Error::with_message(Errno::EINVAL, "invalid options"))?; - let waited_process = wait_child_exit(process_filter, wait_options)?; + let waited_process = wait_child_exit(process_filter, wait_options, ctx)?; let pid = waited_process.map_or(0, |process| process.pid()); Ok(SyscallReturn::Return(pid as _)) }