Support wait_interruptible with SigQueueObserver

This commit is contained in:
Jianfeng Jiang
2023-09-26 17:14:21 +08:00
committed by Tate, Hongliang Tian
parent ec857e5205
commit d2aa06cbe2
9 changed files with 270 additions and 68 deletions

View File

@ -1,4 +1,5 @@
use crate::{ use crate::{
events::Observer,
prelude::*, prelude::*,
process::{ process::{
do_exit_group, do_exit_group,
@ -10,7 +11,9 @@ use crate::{
}; };
use super::{ use super::{
signal::{sig_mask::SigMask, sig_queues::SigQueues, signals::Signal}, signal::{
sig_mask::SigMask, sig_queues::SigQueues, signals::Signal, SigEvents, SigEventsFilter,
},
Process, Process,
}; };
@ -71,10 +74,6 @@ impl PosixThread {
&self.sig_mask &self.sig_mask
} }
pub fn sig_queues(&self) -> &Mutex<SigQueues> {
&self.sig_queues
}
pub fn has_pending_signal(&self) -> bool { pub fn has_pending_signal(&self) -> bool {
self.sig_queues.lock().is_empty() self.sig_queues.lock().is_empty()
} }
@ -87,6 +86,18 @@ impl PosixThread {
self.sig_queues.lock().dequeue(mask) self.sig_queues.lock().dequeue(mask)
} }
pub fn register_sigqueue_observer(
&self,
observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter,
) {
self.sig_queues.lock().register_observer(observer, filter);
}
pub fn unregiser_sigqueue_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.sig_queues.lock().unregister_observer(observer);
}
pub fn sig_context(&self) -> &Mutex<Option<Vaddr>> { pub fn sig_context(&self) -> &Mutex<Option<Vaddr>> {
&self.sig_context &self.sig_context
} }

View File

@ -9,9 +9,11 @@ use super::signal::sig_disposition::SigDispositions;
use super::signal::sig_mask::SigMask; use super::signal::sig_mask::SigMask;
use super::signal::sig_queues::SigQueues; use super::signal::sig_queues::SigQueues;
use super::signal::signals::Signal; use super::signal::signals::Signal;
use super::signal::{SigEvents, SigEventsFilter};
use super::status::ProcessStatus; use super::status::ProcessStatus;
use super::{process_table, TermStatus}; use super::{process_table, TermStatus};
use crate::device::tty::get_n_tty; use crate::device::tty::get_n_tty;
use crate::events::Observer;
use crate::fs::file_table::FileTable; use crate::fs::file_table::FileTable;
use crate::fs::fs_resolver::FsResolver; use crate::fs::fs_resolver::FsResolver;
use crate::fs::utils::FileCreationMask; use crate::fs::utils::FileCreationMask;
@ -263,6 +265,18 @@ impl Process {
self.sig_queues.lock().dequeue(mask) self.sig_queues.lock().dequeue(mask)
} }
pub fn register_sigqueue_observer(
&self,
observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter,
) {
self.sig_queues.lock().register_observer(observer, filter);
}
pub fn unregiser_sigqueue_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.sig_queues.lock().unregister_observer(observer);
}
// ******************* Status ******************** // ******************* Status ********************
fn set_runnable(&self) { fn set_runnable(&self) {

View File

@ -0,0 +1,127 @@
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;
use jinux_frame::sync::WaitQueue;
use crate::events::{Events, EventsFilter, Observer};
use crate::prelude::*;
use crate::process::posix_thread::PosixThreadExt;
use super::sig_mask::SigMask;
use super::sig_num::SigNum;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SigEvents(SigNum);
impl SigEvents {
pub fn new(sig_num: SigNum) -> Self {
Self(sig_num)
}
}
impl Events for SigEvents {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SigEventsFilter(SigMask);
impl SigEventsFilter {
pub fn new(mask: SigMask) -> Self {
Self(mask)
}
}
impl EventsFilter<SigEvents> for SigEventsFilter {
fn filter(&self, event: &SigEvents) -> bool {
self.0.contains(event.0)
}
}
pub struct SigQueueObserver {
wait_queue: WaitQueue,
mask: SigMask,
is_interrupted: AtomicBool,
}
impl SigQueueObserver {
pub fn new(mask: SigMask) -> Arc<Self> {
let wait_queue = WaitQueue::new();
Arc::new(Self {
wait_queue,
mask,
is_interrupted: AtomicBool::new(false),
})
}
/// Wait until cond() returns Some(_).
///
/// If some signal is caught before cond() returns Some(_), it will returns EINTR.
pub fn wait_until_interruptible<F, R>(
self: &Arc<Self>,
mut cond: F,
timeout: Option<&Duration>,
) -> Result<R>
where
F: FnMut() -> Option<R>,
{
self.is_interrupted.store(false, Ordering::Release);
// Register observers on sigqueues
let observer = Arc::downgrade(self) as Weak<dyn Observer<SigEvents>>;
let filter = SigEventsFilter::new(self.mask);
let current = current!();
current.register_sigqueue_observer(observer.clone(), filter);
let current_thread = current_thread!();
let posix_thread = current_thread.as_posix_thread().unwrap();
posix_thread.register_sigqueue_observer(observer.clone(), filter);
// Some signal may come before we register observer, so we do another check here.
if posix_thread.has_pending_signal() || current.has_pending_signal() {
self.is_interrupted.store(true, Ordering::Release);
}
enum Res<R> {
Ok(R),
Interrupted,
}
let res = self.wait_queue.wait_until(
|| {
if let Some(res) = cond() {
return Some(Res::Ok(res));
}
if self.is_interrupted.load(Ordering::Acquire) {
return Some(Res::Interrupted);
}
None
},
timeout,
)?;
current.unregiser_sigqueue_observer(&observer);
posix_thread.unregiser_sigqueue_observer(&observer);
match res {
Res::Ok(r) => Ok(r),
Res::Interrupted => return_errno_with_message!(Errno::EINTR, "interrupted by signal"),
}
}
pub fn wait_until_uninterruptible<F, R>(&self, cond: F, timeout: Option<&Duration>) -> Result<R>
where
F: FnMut() -> Option<R>,
{
Ok(self.wait_queue.wait_until(cond, timeout)?)
}
}
impl Observer<SigEvents> for SigQueueObserver {
fn on_events(&self, events: &SigEvents) {
self.is_interrupted.store(true, Ordering::Release);
self.wait_queue.wake_all();
}
}

View File

@ -1,5 +1,6 @@
pub mod c_types; pub mod c_types;
pub mod constants; pub mod constants;
mod events;
pub mod sig_action; pub mod sig_action;
pub mod sig_disposition; pub mod sig_disposition;
pub mod sig_mask; pub mod sig_mask;
@ -7,6 +8,8 @@ pub mod sig_num;
pub mod sig_queues; pub mod sig_queues;
pub mod signals; pub mod signals;
pub use events::{SigEvents, SigEventsFilter, SigQueueObserver};
use core::mem; use core::mem;
use align_ext::AlignExt; use align_ext::AlignExt;

View File

@ -1,4 +1,5 @@
use super::constants::*; use super::{constants::*, SigEvents, SigEventsFilter};
use crate::events::{Observer, Subject};
use crate::prelude::*; use crate::prelude::*;
use super::sig_mask::SigMask; use super::sig_mask::SigMask;
@ -9,6 +10,7 @@ pub struct SigQueues {
count: usize, count: usize,
std_queues: Vec<Option<Box<dyn Signal>>>, std_queues: Vec<Option<Box<dyn Signal>>>,
rt_queues: Vec<VecDeque<Box<dyn Signal>>>, rt_queues: Vec<VecDeque<Box<dyn Signal>>>,
subject: Subject<SigEvents, SigEventsFilter>,
} }
impl SigQueues { impl SigQueues {
@ -16,11 +18,12 @@ impl SigQueues {
let count = 0; let count = 0;
let std_queues = (0..COUNT_STD_SIGS).map(|_| None).collect(); let std_queues = (0..COUNT_STD_SIGS).map(|_| None).collect();
let rt_queues = (0..COUNT_RT_SIGS).map(|_| Default::default()).collect(); let rt_queues = (0..COUNT_RT_SIGS).map(|_| Default::default()).collect();
// let notifier = Notifier::new(); let subject = Subject::new();
SigQueues { SigQueues {
count, count,
std_queues, std_queues,
rt_queues, rt_queues,
subject,
} }
} }
@ -58,7 +61,7 @@ impl SigQueues {
self.count += 1; self.count += 1;
} }
// self.notifier.broadcast(&signum); self.subject.notify_observers(&SigEvents::new(signum));
} }
pub fn dequeue(&mut self, blocked: &SigMask) -> Option<Box<dyn Signal>> { pub fn dequeue(&mut self, blocked: &SigMask) -> Option<Box<dyn Signal>> {
@ -134,6 +137,18 @@ impl SigQueues {
let idx = (signum.as_u8() - MIN_RT_SIG_NUM) as usize; let idx = (signum.as_u8() - MIN_RT_SIG_NUM) as usize;
&mut self.rt_queues[idx] &mut self.rt_queues[idx]
} }
pub fn register_observer(
&self,
observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter,
) {
self.subject.register_observer(observer, filter);
}
pub fn unregister_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.subject.unregister_observer(observer);
}
} }
impl Default for SigQueues { impl Default for SigQueues {

View File

@ -1,13 +1,10 @@
use core::time::Duration;
use jinux_frame::timer::read_monotonic_milli_seconds;
use super::SyscallReturn; use super::SyscallReturn;
use super::SYS_CLOCK_GETTIME; use super::SYS_CLOCK_GETTIME;
use crate::time::now_as_duration;
use crate::{ use crate::{
log_syscall_entry, log_syscall_entry,
prelude::*, prelude::*,
time::{clockid_t, timespec_t, ClockID, SystemTime}, time::{clockid_t, timespec_t, ClockID},
util::write_val_to_user, util::write_val_to_user,
}; };
@ -16,26 +13,7 @@ pub fn sys_clock_gettime(clockid: clockid_t, timespec_addr: Vaddr) -> Result<Sys
let clock_id = ClockID::try_from(clockid)?; let clock_id = ClockID::try_from(clockid)?;
debug!("clockid = {:?}", clock_id); debug!("clockid = {:?}", clock_id);
let now = SystemTime::now(); let time_duration = now_as_duration(&clock_id)?;
let time_duration = match clock_id {
ClockID::CLOCK_REALTIME | ClockID::CLOCK_REALTIME_COARSE => {
now.duration_since(&SystemTime::UNIX_EPOCH)?
}
ClockID::CLOCK_MONOTONIC => {
let time_ms = read_monotonic_milli_seconds();
let secs = time_ms / 1000;
let nanos = (time_ms % 1000) * 1000;
Duration::new(secs, nanos as u32)
}
// TODO: Respect other type of clock_id
_ => {
warn!(
"unsupported clock_id: {:?}, treat it as CLOCK_REALTIME",
clock_id
);
now.duration_since(&SystemTime::UNIX_EPOCH)?
}
};
let timespec = timespec_t::from(time_duration); let timespec = timespec_t::from(time_duration);
write_val_to_user(timespec_addr, &timespec)?; write_val_to_user(timespec_addr, &timespec)?;

View File

@ -2,13 +2,12 @@ use core::time::Duration;
use super::SyscallReturn; use super::SyscallReturn;
use super::SYS_CLOCK_NANOSLEEP; use super::SYS_CLOCK_NANOSLEEP;
use crate::{ use crate::log_syscall_entry;
log_syscall_entry, use crate::prelude::*;
prelude::*, use crate::process::signal::sig_mask::SigMask;
thread::Thread, use crate::process::signal::SigQueueObserver;
time::{clockid_t, timespec_t, ClockID, TIMER_ABSTIME}, use crate::time::{clockid_t, now_as_duration, timespec_t, ClockID, TIMER_ABSTIME};
util::{read_val_from_user, write_val_to_user}, use crate::util::{read_val_from_user, write_val_to_user};
};
pub fn sys_clock_nanosleep( pub fn sys_clock_nanosleep(
clockid: clockid_t, clockid: clockid_t,
@ -25,20 +24,47 @@ pub fn sys_clock_nanosleep(
} else { } else {
unreachable!() unreachable!()
}; };
let request_timespec = read_val_from_user::<timespec_t>(request_timespec_addr)?;
let duration = {
let timespec = read_val_from_user::<timespec_t>(request_timespec_addr)?;
if abs_time {
todo!("deal with abs time");
}
Duration::from(timespec)
};
debug!( debug!(
"clockid = {:?}, abs_time = {}, request_timespec = {:?}, remain timespec addr = 0x{:x}", "clockid = {:?}, abs_time = {}, duration = {:?}, remain_timespec_addr = 0x{:x}",
clock_id, abs_time, request_timespec, remain_timespec_addr clock_id, abs_time, duration, remain_timespec_addr
); );
// FIXME: do real sleep. Here we simply yield the execution of current thread since we does not have timeout support now.
// If the sleep is interrupted by a signal, this syscall should return error.
Thread::yield_now();
if remain_timespec_addr != 0 {
let remain_duration = Duration::new(0, 0);
let remain_timespec = timespec_t::from(remain_duration);
write_val_to_user(remain_timespec_addr, &remain_timespec)?;
}
Ok(SyscallReturn::Return(0)) let start_time = now_as_duration(&clock_id)?;
let sigqueue_observer = {
// FIXME: sleeping thread can only be interrupted by signals that will call signal handler or terminate
// current process. i.e., the signals that should be ignored will not interrupt sleeping thread.
let sigmask = SigMask::new_full();
SigQueueObserver::new(sigmask)
};
let res = sigqueue_observer.wait_until_interruptible(|| None, Some(&duration));
match res {
Err(e) if e.error() == Errno::ETIME => Ok(SyscallReturn::Return(0)),
Err(e) if e.error() == Errno::EINTR => {
let end_time = now_as_duration(&clock_id)?;
if end_time >= start_time + duration {
return Ok(SyscallReturn::Return(0));
}
if remain_timespec_addr != 0 {
let remaining_duration = (start_time + duration) - end_time;
let remaining_timespec = timespec_t::from(remaining_duration);
write_val_to_user(remain_timespec_addr, &remaining_timespec)?;
}
return_errno_with_message!(Errno::EINTR, "sleep was interrupted");
}
Ok(()) | Err(_) => unreachable!(),
}
} }

View File

@ -1,20 +1,21 @@
use crate::{prelude::*, process::posix_thread::PosixThreadExt, thread::Thread}; use crate::log_syscall_entry;
use crate::prelude::*;
use crate::process::signal::sig_mask::SigMask;
use crate::process::signal::SigQueueObserver;
use super::SyscallReturn; use super::{SyscallReturn, SYS_PAUSE};
pub fn sys_pause() -> Result<SyscallReturn> { pub fn sys_pause() -> Result<SyscallReturn> {
loop { log_syscall_entry!(SYS_PAUSE);
let current_thread = current_thread!();
// check sig_queue of current thread and process, let sigqueue_observer = {
// if there's any pending signal, break loop // FIXME: like sleep, paused thread can only be interrupted by signals that will call signal
let posix_thread = current_thread.as_posix_thread().unwrap(); // handler or terminate current process
if posix_thread.has_pending_signal() || current!().has_pending_signal() { let sigmask = SigMask::new_full();
break; SigQueueObserver::new(sigmask)
} };
// there's no pending signal, yield execution
// FIXME: set current thread interruptible here sigqueue_observer.wait_until_interruptible(|| None, None)?;
Thread::yield_now();
} unreachable!("[Internal Error] pause should always return EINTR");
// handle signal before returning to user space
return_errno_with_message!(Errno::ERESTART, "catch signal")
} }

View File

@ -4,6 +4,7 @@ use core::time::Duration;
use crate::prelude::*; use crate::prelude::*;
mod system_time; mod system_time;
use jinux_frame::timer::read_monotonic_milli_seconds;
pub use system_time::SystemTime; pub use system_time::SystemTime;
pub type clockid_t = i32; pub type clockid_t = i32;
@ -70,3 +71,29 @@ impl From<timeval_t> for Duration {
/// The various flags for setting POSIX.1b interval timers: /// The various flags for setting POSIX.1b interval timers:
pub const TIMER_ABSTIME: i32 = 0x01; pub const TIMER_ABSTIME: i32 = 0x01;
pub fn now_as_duration(clock_id: &ClockID) -> Result<Duration> {
match clock_id {
ClockID::CLOCK_MONOTONIC
| ClockID::CLOCK_MONOTONIC_COARSE
| ClockID::CLOCK_MONOTONIC_RAW => {
let time_ms = read_monotonic_milli_seconds();
let seconds = time_ms / 1000;
let nanos = time_ms % 1000 * 1_000_000;
Ok(Duration::new(seconds, nanos as u32))
}
ClockID::CLOCK_REALTIME | ClockID::CLOCK_REALTIME_COARSE => {
let now = SystemTime::now();
now.duration_since(&SystemTime::UNIX_EPOCH)
}
_ => {
warn!(
"unsupported clock_id: {:?}, treat it as CLOCK_REALTIME",
clock_id
);
let now = SystemTime::now();
now.duration_since(&SystemTime::UNIX_EPOCH)
}
}
}