Replace the Pausers' usage with Waiter/WaitQueue

This commit is contained in:
Chen Chengjun
2024-09-14 10:12:31 +08:00
committed by Tate, Hongliang Tian
parent 822caf34f4
commit 42e28763c5
12 changed files with 110 additions and 91 deletions

View File

@ -2,6 +2,8 @@
use core::sync::atomic::{AtomicUsize, Ordering}; use core::sync::atomic::{AtomicUsize, Ordering};
use ostd::sync::WaitQueue;
use super::{ use super::{
connected::{combine_io_events, Connected}, connected::{combine_io_events, Connected},
init::Init, init::Init,
@ -15,7 +17,7 @@ use crate::{
SockShutdownCmd, SocketAddr, SockShutdownCmd, SocketAddr,
}, },
prelude::*, prelude::*,
process::signal::{Pauser, Pollee, Poller}, process::signal::{Pollee, Poller},
}; };
pub(super) struct Listener { pub(super) struct Listener {
@ -159,7 +161,7 @@ pub(super) struct Backlog {
pollee: Pollee, pollee: Pollee,
backlog: AtomicUsize, backlog: AtomicUsize,
incoming_conns: Mutex<Option<VecDeque<Connected>>>, incoming_conns: Mutex<Option<VecDeque<Connected>>>,
pauser: Arc<Pauser>, wait_queue: WaitQueue,
} }
impl Backlog { impl Backlog {
@ -175,7 +177,7 @@ impl Backlog {
pollee, pollee,
backlog: AtomicUsize::new(backlog), backlog: AtomicUsize::new(backlog),
incoming_conns: Mutex::new(incoming_sockets), incoming_conns: Mutex::new(incoming_sockets),
pauser: Pauser::new(), wait_queue: WaitQueue::new(),
} }
} }
@ -198,7 +200,7 @@ impl Backlog {
drop(locked_incoming_conns); drop(locked_incoming_conns);
if conn.is_some() { if conn.is_some() {
self.pauser.resume_one(); self.wait_queue.wake_one();
} }
conn.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available")) conn.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available"))
@ -208,7 +210,7 @@ impl Backlog {
let old_backlog = self.backlog.swap(backlog, Ordering::Relaxed); let old_backlog = self.backlog.swap(backlog, Ordering::Relaxed);
if old_backlog < backlog { if old_backlog < backlog {
self.pauser.resume_all(); self.wait_queue.wake_all();
} }
} }
@ -221,7 +223,7 @@ impl Backlog {
drop(incoming_conns); drop(incoming_conns);
self.pauser.resume_all(); self.wait_queue.wake_all();
} }
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents { fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
@ -284,7 +286,7 @@ impl Backlog {
where where
F: FnMut() -> Result<()>, F: FnMut() -> Result<()>,
{ {
self.pauser.pause_until(|| match cond() { self.wait_queue.pause_until(|| match cond() {
Err(err) if err.error() == Errno::EAGAIN => None, Err(err) if err.error() == Errno::EAGAIN => None,
result => Some(result), result => Some(result),
})? })?

View File

@ -59,7 +59,7 @@ pub fn do_exit_group(term_status: TermStatus) {
// Notify parent // Notify parent
let signal = KernelSignal::new(SIGCHLD); let signal = KernelSignal::new(SIGCHLD);
parent.enqueue_signal(signal); parent.enqueue_signal(signal);
parent.children_pauser().resume_all(); parent.children_wait_queue().wake_all();
} }
} }

View File

@ -2,13 +2,14 @@
#![allow(unused_variables)] #![allow(unused_variables)]
use ostd::sync::WaitQueue;
use crate::{ use crate::{
prelude::*, prelude::*,
process::{ process::{
signal::{ signal::{
constants::{SIGCONT, SIGHUP}, constants::{SIGCONT, SIGHUP},
signals::kernel::KernelSignal, signals::kernel::KernelSignal,
Pauser,
}, },
ProcessGroup, Session, ProcessGroup, Session,
}, },
@ -22,7 +23,7 @@ use crate::{
pub struct JobControl { pub struct JobControl {
foreground: SpinLock<Weak<ProcessGroup>>, foreground: SpinLock<Weak<ProcessGroup>>,
session: SpinLock<Weak<Session>>, session: SpinLock<Weak<Session>>,
pauser: Arc<Pauser>, wait_queue: WaitQueue,
} }
impl JobControl { impl JobControl {
@ -31,7 +32,7 @@ impl JobControl {
Self { Self {
foreground: SpinLock::new(Weak::new()), foreground: SpinLock::new(Weak::new()),
session: SpinLock::new(Weak::new()), session: SpinLock::new(Weak::new()),
pauser: Pauser::new(), wait_queue: WaitQueue::new(),
} }
} }
@ -73,7 +74,7 @@ impl JobControl {
let session = current.session().unwrap(); let session = current.session().unwrap();
*self.session.lock() = Arc::downgrade(&session); *self.session.lock() = Arc::downgrade(&session);
self.pauser.resume_all(); self.wait_queue.wake_all();
Ok(()) Ok(())
} }
@ -129,7 +130,7 @@ impl JobControl {
} }
*self.foreground.lock() = Arc::downgrade(process_group); *self.foreground.lock() = Arc::downgrade(process_group);
self.pauser.resume_all(); self.wait_queue.wake_all();
Ok(()) Ok(())
} }
@ -146,7 +147,7 @@ impl JobControl {
} }
// Slow path // Slow path
self.pauser.pause_until(|| { self.wait_queue.pause_until(|| {
if self.current_belongs_to_foreground() { if self.current_belongs_to_foreground() {
Some(()) Some(())
} else { } else {

View File

@ -9,11 +9,9 @@ use super::{
process_vm::{Heap, InitStackReader, ProcessVm}, process_vm::{Heap, InitStackReader, ProcessVm},
rlimit::ResourceLimits, rlimit::ResourceLimits,
signal::{ signal::{
constants::SIGCHLD,
sig_disposition::SigDispositions, sig_disposition::SigDispositions,
sig_num::{AtomicSigNum, SigNum}, sig_num::{AtomicSigNum, SigNum},
signals::Signal, signals::Signal,
Pauser,
}, },
status::ProcessStatus, status::ProcessStatus,
Credentials, TermStatus, Credentials, TermStatus,
@ -39,6 +37,7 @@ use aster_rights::Full;
use atomic::Atomic; use atomic::Atomic;
pub use builder::ProcessBuilder; pub use builder::ProcessBuilder;
pub use job_control::JobControl; pub use job_control::JobControl;
use ostd::sync::WaitQueue;
pub use process_group::ProcessGroup; pub use process_group::ProcessGroup;
pub use session::Session; pub use session::Session;
pub use terminal::Terminal; pub use terminal::Terminal;
@ -63,7 +62,7 @@ pub struct Process {
process_vm: ProcessVm, process_vm: ProcessVm,
/// Wait for child status changed /// Wait for child status changed
children_pauser: Arc<Pauser>, children_wait_queue: WaitQueue,
// Mutable Part // Mutable Part
/// The executable path. /// The executable path.
@ -189,7 +188,7 @@ impl Process {
) -> Arc<Self> { ) -> Arc<Self> {
// SIGCHID does not interrupt pauser. Child process will // SIGCHID does not interrupt pauser. Child process will
// resume paused parent when doing exit. // resume paused parent when doing exit.
let children_pauser = Pauser::new_with_mask(SIGCHLD.into()); let children_wait_queue = WaitQueue::new();
let prof_clock = ProfClock::new(); let prof_clock = ProfClock::new();
@ -198,7 +197,7 @@ impl Process {
threads: Mutex::new(threads), threads: Mutex::new(threads),
executable_path: RwLock::new(executable_path), executable_path: RwLock::new(executable_path),
process_vm, process_vm,
children_pauser, children_wait_queue,
status: ProcessStatus::new_uninit(), status: ProcessStatus::new_uninit(),
parent: ParentProcess::new(parent), parent: ParentProcess::new(parent),
children: Mutex::new(BTreeMap::new()), children: Mutex::new(BTreeMap::new()),
@ -343,8 +342,8 @@ impl Process {
self.children.lock().contains_key(pid) self.children.lock().contains_key(pid)
} }
pub fn children_pauser(&self) -> &Arc<Pauser> { pub fn children_wait_queue(&self) -> &WaitQueue {
&self.children_pauser &self.children_wait_queue
} }
// *********** Process group & Session*********** // *********** Process group & Session***********

View File

@ -5,10 +5,11 @@ use core::{
time::Duration, time::Duration,
}; };
use ostd::sync::{Waiter, Waker};
use crate::{ use crate::{
events::{IoEvents, Observer, Subject}, events::{IoEvents, Observer, Subject},
prelude::*, prelude::*,
process::signal::Pauser,
}; };
/// A pollee maintains a set of active events, which can be polled with /// A pollee maintains a set of active events, which can be polled with
@ -136,6 +137,8 @@ pub struct Poller {
event_counter: Arc<EventCounter>, event_counter: Arc<EventCounter>,
// All pollees that are interesting to this poller // All pollees that are interesting to this poller
pollees: Vec<Weak<PolleeInner>>, pollees: Vec<Weak<PolleeInner>>,
// A waiter used to pause the current thread.
waiter: Waiter,
} }
impl Default for Poller { impl Default for Poller {
@ -147,23 +150,25 @@ impl Default for Poller {
impl Poller { impl Poller {
/// Constructs a new `Poller`. /// Constructs a new `Poller`.
pub fn new() -> Self { pub fn new() -> Self {
let (waiter, waker) = Waiter::new_pair();
Self { Self {
event_counter: Arc::new(EventCounter::new()), event_counter: Arc::new(EventCounter::new(waker)),
pollees: Vec::new(), pollees: Vec::new(),
waiter,
} }
} }
/// Wait until there are any interesting events happen since last `wait`. The `wait` /// Wait until there are any interesting events happen since last `wait`. The `wait`
/// can be interrupted by signal. /// can be interrupted by signal.
pub fn wait(&self) -> Result<()> { pub fn wait(&self) -> Result<()> {
self.event_counter.read(None)?; self.event_counter.read(&self.waiter, None)?;
Ok(()) Ok(())
} }
/// Wait until there are any interesting events happen since last `wait` or a given timeout /// Wait until there are any interesting events happen since last `wait` or a given timeout
/// is expired. This method can be interrupted by signal. /// is expired. This method can be interrupted by signal.
pub fn wait_timeout(&self, timeout: &Duration) -> Result<()> { pub fn wait_timeout(&self, timeout: &Duration) -> Result<()> {
self.event_counter.read(Some(timeout))?; self.event_counter.read(&self.waiter, Some(timeout))?;
Ok(()) Ok(())
} }
@ -188,20 +193,18 @@ impl Drop for Poller {
/// A counter for wait and wakeup. /// A counter for wait and wakeup.
struct EventCounter { struct EventCounter {
counter: AtomicUsize, counter: AtomicUsize,
pauser: Arc<Pauser>, waker: Arc<Waker>,
} }
impl EventCounter { impl EventCounter {
pub fn new() -> Self { pub fn new(waker: Arc<Waker>) -> Self {
let pauser = Pauser::new();
Self { Self {
counter: AtomicUsize::new(0), counter: AtomicUsize::new(0),
pauser, waker,
} }
} }
pub fn read(&self, timeout: Option<&Duration>) -> Result<usize> { pub fn read(&self, waiter: &Waiter, timeout: Option<&Duration>) -> Result<usize> {
let cond = || { let cond = || {
let val = self.counter.swap(0, Ordering::Relaxed); let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 { if val > 0 {
@ -212,15 +215,15 @@ impl EventCounter {
}; };
if let Some(timeout) = timeout { if let Some(timeout) = timeout {
self.pauser.pause_until_or_timeout(cond, timeout) waiter.pause_until_or_timeout(cond, timeout)
} else { } else {
self.pauser.pause_until(cond) waiter.pause_until(cond)
} }
} }
pub fn write(&self) { pub fn write(&self) {
self.counter.fetch_add(1, Ordering::Relaxed); self.counter.fetch_add(1, Ordering::Relaxed);
self.pauser.resume_one(); self.waker.wake_up();
} }
} }

View File

@ -2,7 +2,11 @@
#![allow(dead_code)] #![allow(dead_code)]
use super::{process_filter::ProcessFilter, ExitCode, Pid, Process}; use super::{
process_filter::ProcessFilter,
signal::{constants::SIGCHLD, with_signal_blocked},
ExitCode, Pid, Process,
};
use crate::{prelude::*, process::process_table, thread::thread_table}; use crate::{prelude::*, process::process_table, thread::thread_table};
// The definition of WaitOptions is from Occlum // The definition of WaitOptions is from Occlum
@ -27,9 +31,11 @@ impl WaitOptions {
pub fn wait_child_exit( pub fn wait_child_exit(
child_filter: ProcessFilter, child_filter: ProcessFilter,
wait_options: WaitOptions, wait_options: WaitOptions,
ctx: &Context,
) -> Result<Option<Arc<Process>>> { ) -> Result<Option<Arc<Process>>> {
let current = current!(); let current = ctx.process;
let zombie_child = current.children_pauser().pause_until(|| { let zombie_child = with_signal_blocked(ctx, SIGCHLD.into(), || {
current.children_wait_queue().pause_until(|| {
let unwaited_children = current let unwaited_children = current
.children() .children()
.lock() .lock()
@ -58,7 +64,7 @@ pub fn wait_child_exit(
// does not reap child, directly return // does not reap child, directly return
return Some(Ok(Some(zombie_child.clone()))); return Some(Ok(Some(zombie_child.clone())));
} else { } else {
reap_zombie_child(&current, zombie_pid); reap_zombie_child(current, zombie_pid);
return Some(Ok(Some(zombie_child.clone()))); return Some(Ok(Some(zombie_child.clone())));
} }
} }
@ -69,6 +75,7 @@ pub fn wait_child_exit(
// wait // wait
None None
})
})??; })??;
Ok(zombie_child) Ok(zombie_child)

View File

@ -14,6 +14,8 @@
//! refer to the man 2 eventfd documentation. //! refer to the man 2 eventfd documentation.
//! //!
use ostd::sync::WaitQueue;
use super::SyscallReturn; use super::SyscallReturn;
use crate::{ use crate::{
events::{IoEvents, Observer}, events::{IoEvents, Observer},
@ -24,7 +26,7 @@ use crate::{
}, },
prelude::*, prelude::*,
process::{ process::{
signal::{Pauser, Pollable, Pollee, Poller}, signal::{Pollable, Pollee, Poller},
Gid, Uid, Gid, Uid,
}, },
time::clocks::RealTimeClock, time::clocks::RealTimeClock,
@ -75,7 +77,7 @@ struct EventFile {
counter: Mutex<u64>, counter: Mutex<u64>,
pollee: Pollee, pollee: Pollee,
flags: Mutex<Flags>, flags: Mutex<Flags>,
write_pauser: Arc<Pauser>, write_wait_queue: WaitQueue,
} }
impl EventFile { impl EventFile {
@ -84,12 +86,12 @@ impl EventFile {
fn new(init_val: u64, flags: Flags) -> Self { fn new(init_val: u64, flags: Flags) -> Self {
let counter = Mutex::new(init_val); let counter = Mutex::new(init_val);
let pollee = Pollee::new(IoEvents::OUT); let pollee = Pollee::new(IoEvents::OUT);
let write_pauser = Pauser::new(); let write_wait_queue = WaitQueue::new();
Self { Self {
counter, counter,
pollee, pollee,
flags: Mutex::new(flags), flags: Mutex::new(flags),
write_pauser, write_wait_queue,
} }
} }
@ -112,7 +114,7 @@ impl EventFile {
self.pollee.del_events(IoEvents::IN); self.pollee.del_events(IoEvents::IN);
} }
self.write_pauser.resume_all(); self.write_wait_queue.wake_all();
return; return;
} }
@ -215,7 +217,7 @@ impl FileLike for EventFile {
} }
// Wait until counter can be added val to // Wait until counter can be added val to
self.write_pauser self.write_wait_queue
.pause_until(|| self.add_counter_val(supplied_value).ok())?; .pause_until(|| self.add_counter_val(supplied_value).ok())?;
Ok(write_len) Ok(write_len)

View File

@ -2,10 +2,11 @@
use core::time::Duration; use core::time::Duration;
use ostd::sync::Waiter;
use super::{clock_gettime::read_clock, ClockId, SyscallReturn}; use super::{clock_gettime::read_clock, ClockId, SyscallReturn};
use crate::{ use crate::{
prelude::*, prelude::*,
process::signal::Pauser,
time::{clockid_t, timespec_t, TIMER_ABSTIME}, time::{clockid_t, timespec_t, TIMER_ABSTIME},
}; };
@ -81,9 +82,9 @@ fn do_clock_nanosleep(
// FIXME: sleeping thread can only be interrupted by signals that will call signal handler or terminate // FIXME: sleeping thread can only be interrupted by signals that will call signal handler or terminate
// current process. i.e., the signals that should be ignored will not interrupt sleeping thread. // current process. i.e., the signals that should be ignored will not interrupt sleeping thread.
let pauser = Pauser::new(); let waiter = Waiter::new_pair().0;
let res = pauser.pause_until_or_timeout(|| None, &timeout); let res = waiter.pause_until_or_timeout(|| None, &timeout);
match res { match res {
Err(e) if e.error() == Errno::ETIME => Ok(SyscallReturn::Return(0)), Err(e) if e.error() == Errno::ETIME => Ok(SyscallReturn::Return(0)),
Err(e) if e.error() == Errno::EINTR => { Err(e) if e.error() == Errno::EINTR => {

View File

@ -1,14 +1,16 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use ostd::sync::Waiter;
use super::SyscallReturn; use super::SyscallReturn;
use crate::{prelude::*, process::signal::Pauser}; use crate::prelude::*;
pub fn sys_pause(_ctx: &Context) -> Result<SyscallReturn> { pub fn sys_pause(_ctx: &Context) -> Result<SyscallReturn> {
// FIXME: like sleep, paused thread can only be interrupted by signals that will call signal // FIXME: like sleep, paused thread can only be interrupted by signals that will call signal
// handler or terminate current process // handler or terminate current process
let pauser = Pauser::new(); let waiter = Waiter::new_pair().0;
pauser.pause_until(|| None)?; waiter.pause_until(|| None)?;
unreachable!("[Internal Error] pause should always return EINTR"); unreachable!("[Internal Error] pause should always return EINTR");
} }

View File

@ -1,12 +1,14 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use ostd::sync::Waiter;
use super::SyscallReturn; use super::SyscallReturn;
use crate::{ use crate::{
prelude::*, prelude::*,
process::signal::{ process::signal::{
constants::{SIGKILL, SIGSTOP}, constants::{SIGKILL, SIGSTOP},
sig_mask::SigMask, sig_mask::SigMask,
Pauser, with_signal_blocked,
}, },
}; };
@ -34,9 +36,9 @@ pub fn sys_rt_sigsuspend(
mask mask
}; };
// Pause until receiving any signal // Wait until receiving any signal
let pauser = Pauser::new_with_mask(sigmask); let waiter = Waiter::new_pair().0;
pauser.pause_until(|| None::<()>)?; with_signal_blocked(ctx, sigmask, || waiter.pause_until(|| None::<()>))?;
// This syscall should always return `Err(EINTR)`. This path should never be reached. // This syscall should always return `Err(EINTR)`. This path should never be reached.
unreachable!("rt_sigsuspend always return EINTR"); unreachable!("rt_sigsuspend always return EINTR");

View File

@ -22,7 +22,7 @@ pub fn sys_wait4(
debug!("wait4 current pid = {}", ctx.process.pid()); debug!("wait4 current pid = {}", ctx.process.pid());
let process_filter = ProcessFilter::from_id(wait_pid as _); let process_filter = ProcessFilter::from_id(wait_pid as _);
let waited_process = wait_child_exit(process_filter, wait_options)?; let waited_process = wait_child_exit(process_filter, wait_options, ctx)?;
let Some(process) = waited_process else { let Some(process) = waited_process else {
return Ok(SyscallReturn::Return(0 as _)); return Ok(SyscallReturn::Return(0 as _));
}; };

View File

@ -12,13 +12,13 @@ pub fn sys_waitid(
_infoq_addr: u64, _infoq_addr: u64,
options: u64, options: u64,
_rusage_addr: u64, _rusage_addr: u64,
_ctx: &Context, ctx: &Context,
) -> Result<SyscallReturn> { ) -> Result<SyscallReturn> {
// FIXME: what does infoq and rusage use for? // FIXME: what does infoq and rusage use for?
let process_filter = ProcessFilter::from_which_and_id(which, upid)?; let process_filter = ProcessFilter::from_which_and_id(which, upid)?;
let wait_options = WaitOptions::from_bits(options as u32) let wait_options = WaitOptions::from_bits(options as u32)
.ok_or(Error::with_message(Errno::EINVAL, "invalid options"))?; .ok_or(Error::with_message(Errno::EINVAL, "invalid options"))?;
let waited_process = wait_child_exit(process_filter, wait_options)?; let waited_process = wait_child_exit(process_filter, wait_options, ctx)?;
let pid = waited_process.map_or(0, |process| process.pid()); let pid = waited_process.map_or(0, |process| process.pid());
Ok(SyscallReturn::Return(pid as _)) Ok(SyscallReturn::Return(pid as _))
} }