Support timeout for WaitQueue

This commit is contained in:
Jianfeng Jiang 2023-09-21 10:16:33 +08:00 committed by Tate, Hongliang Tian
parent d28f0db419
commit bd6a4d34ff
11 changed files with 142 additions and 71 deletions

View File

@ -35,11 +35,14 @@ pub fn init() {
}
fn timer_callback(trap_frame: &TrapFrame) {
let current_ms = TICK.fetch_add(1, Ordering::SeqCst);
let current_ticks = TICK.fetch_add(1, Ordering::SeqCst);
let mut timeout_list = TIMEOUT_LIST.get().unwrap().lock();
let mut callbacks: Vec<Arc<TimerCallback>> = Vec::new();
while let Some(t) = timeout_list.peek() {
if t.expire_ms <= current_ms && t.is_enable() {
if t.is_cancelled() {
// Just ignore the cancelled callback
timeout_list.pop();
} else if t.expire_ticks <= current_ticks && t.is_enable() {
callbacks.push(timeout_list.pop().unwrap());
} else {
break;
@ -57,23 +60,25 @@ fn timer_callback(trap_frame: &TrapFrame) {
static TIMEOUT_LIST: Once<SpinLock<BinaryHeap<Arc<TimerCallback>>>> = Once::new();
pub struct TimerCallback {
expire_ms: u64,
expire_ticks: u64,
data: Arc<dyn Any + Send + Sync>,
callback: Box<dyn Fn(&TimerCallback) + Send + Sync>,
enable: AtomicBool,
is_cancelled: AtomicBool,
}
impl TimerCallback {
fn new(
timeout_ms: u64,
timeout_ticks: u64,
data: Arc<dyn Any + Send + Sync>,
callback: Box<dyn Fn(&TimerCallback) + Send + Sync>,
) -> Self {
Self {
expire_ms: timeout_ms,
expire_ticks: timeout_ticks,
data,
callback,
enable: AtomicBool::new(true),
is_cancelled: AtomicBool::new(false),
}
}
@ -94,11 +99,28 @@ impl TimerCallback {
pub fn is_enable(&self) -> bool {
self.enable.load(Ordering::Acquire)
}
/// Whether the set timeout is reached
pub fn is_expired(&self) -> bool {
let current_tick = TICK.load(Ordering::Acquire);
self.expire_ticks <= current_tick
}
/// Cancel a timer callback. If the callback function has not been called,
/// it will never be called again.
pub fn cancel(&self) {
self.is_cancelled.store(true, Ordering::Release);
}
// Whether the timer callback is cancelled.
fn is_cancelled(&self) -> bool {
self.is_cancelled.load(Ordering::Acquire)
}
}
impl PartialEq for TimerCallback {
fn eq(&self, other: &Self) -> bool {
self.expire_ms == other.expire_ms
self.expire_ticks == other.expire_ticks
}
}
@ -112,7 +134,7 @@ impl PartialOrd for TimerCallback {
impl Ord for TimerCallback {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
self.expire_ms.cmp(&other.expire_ms).reverse()
self.expire_ticks.cmp(&other.expire_ticks).reverse()
}
}

View File

@ -6,7 +6,6 @@ pub enum Error {
PageFault,
AccessDenied,
IoError,
InvalidVmpermBits,
NotEnoughResources,
NoChild,
TimeOut,
}

View File

@ -14,6 +14,7 @@
#![feature(generators)]
#![feature(iter_from_generator)]
#![feature(const_mut_refs)]
#![feature(let_chains)]
extern crate alloc;

View File

@ -26,7 +26,7 @@ impl<T> Mutex<T> {
///
/// This method runs in a block way until the mutex can be acquired.
pub fn lock(&self) -> MutexGuard<T> {
self.queue.wait_until(|| self.try_lock())
self.queue.wait_until(|| self.try_lock(), None).unwrap()
}
/// Try Acquire the mutex immedidately.

View File

@ -35,12 +35,12 @@ impl<T> RwMutex<T> {
/// Acquire a read mutex, and if there is a writer, this thread will sleep in the wait queue.
pub fn read(&self) -> RwMutexReadGuard<T> {
self.queue.wait_until(|| self.try_read())
self.queue.wait_until(|| self.try_read(), None).unwrap()
}
/// Acquire a write mutex, and if there is another writer or other readers, this thread will sleep in the wait queue.
pub fn write(&self) -> RwMutexWriteGuard<T> {
self.queue.wait_until(|| self.try_write())
self.queue.wait_until(|| self.try_write(), None).unwrap()
}
/// Try acquire a read mutex and return immediately if it fails.

View File

@ -1,8 +1,12 @@
use core::sync::atomic::{AtomicBool, Ordering};
use super::SpinLock;
use crate::arch::timer::add_timeout_list;
use crate::config::TIMER_FREQ;
use crate::error::Error;
use crate::prelude::*;
use alloc::{collections::VecDeque, sync::Arc};
use bitflags::bitflags;
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;
use crate::task::{add_task, current_task, schedule, Task, TaskStatus};
@ -33,21 +37,57 @@ impl WaitQueue {
///
/// By taking a condition closure, his wait-wakeup mechanism becomes
/// more efficient and robust.
pub fn wait_until<F, R>(&self, mut cond: F) -> R
///
/// The only error result is Error::TIMEOUT, which means the timeout is readched
/// and the condition returns false.
pub fn wait_until<F, R>(&self, mut cond: F, timeout: Option<&Duration>) -> Result<R>
where
F: FnMut() -> Option<R>,
{
if let Some(res) = cond() {
return res;
return Ok(res);
}
let waiter = Arc::new(Waiter::new());
self.enqueue(&waiter);
let timer_callback = timeout.map(|timeout| {
let remaining_ticks = {
let ms_per_tick = 1000 / TIMER_FREQ;
// FIXME: We currently require 1000 to be a multiple of TIMER_FREQ, but
// this may not hold true in the future, because TIMER_FREQ can be greater
// than 1000. Then, the code need to be refactored.
debug_assert!(ms_per_tick * TIMER_FREQ == 1000);
// The ticks should be equal to or greater than timeout
(timeout.as_millis() as u64 + ms_per_tick - 1) / ms_per_tick
};
add_timeout_list(remaining_ticks, waiter.clone(), |timer_call_back| {
let waiter = timer_call_back
.data()
.downcast_ref::<Arc<Waiter>>()
.unwrap();
waiter.wake_up();
})
});
loop {
if let Some(res) = cond() {
self.dequeue(&waiter);
return res;
if let Some(timer_callback) = timer_callback {
timer_callback.cancel();
}
return Ok(res);
};
if let Some(ref timer_callback) = timer_callback && timer_callback.is_expired() {
return Err(Error::TimeOut);
}
waiter.wait();
}
}
@ -91,7 +131,7 @@ impl WaitQueue {
}
struct Waiter {
/// Whether the
/// Whether the waiter is woken_up
is_woken_up: AtomicBool,
/// To respect different wait condition
flag: WaiterFlag,

View File

@ -2,7 +2,7 @@
#[cfg(target_arch = "x86_64")]
use crate::arch::x86::timer::{add_timeout_list, TimerCallback, TICK};
use crate::sync::Mutex;
use crate::sync::SpinLock;
use crate::{config::TIMER_FREQ, prelude::*};
use core::{sync::atomic::Ordering, time::Duration};
@ -19,7 +19,7 @@ pub use crate::arch::x86::timer::read_monotonic_milli_seconds;
/// in order to trigger the callback again.
pub struct Timer {
function: Arc<dyn Fn(Arc<Self>) + Send + Sync>,
inner: Mutex<TimerInner>,
inner: SpinLock<TimerInner>,
}
#[derive(Default)]
struct TimerInner {
@ -48,7 +48,7 @@ impl Timer {
{
Ok(Arc::new(Self {
function: Arc::new(f),
inner: Mutex::new(TimerInner::default()),
inner: SpinLock::new(TimerInner::default()),
}))
}
@ -57,7 +57,7 @@ impl Timer {
/// If a timeout value is already set, the timeout value will be refreshed.
///
pub fn set(self: Arc<Self>, timeout: Duration) {
let mut lock = self.inner.lock();
let mut lock = self.inner.lock_irq_disabled();
match &lock.timer_callback {
Some(callback) => {
callback.disable();
@ -76,7 +76,7 @@ impl Timer {
///
/// If the timer is not set, then the remaining timeout value is zero.
pub fn remain(&self) -> Duration {
let lock = self.inner.lock();
let lock = self.inner.lock_irq_disabled();
let tick_remain = {
let tick = TICK.load(Ordering::SeqCst) as i64;
lock.timeout_tick as i64 - tick
@ -92,7 +92,7 @@ impl Timer {
/// Clear the timeout value.
pub fn clear(&self) {
let mut lock = self.inner.lock();
let mut lock = self.inner.lock_irq_disabled();
if let Some(callback) = &lock.timer_callback {
callback.disable();
}

View File

@ -243,7 +243,7 @@ impl TryFrom<u64> for VmPerm {
type Error = Error;
fn try_from(value: u64) -> Result<Self> {
VmPerm::from_bits(value as u8).ok_or(Error::InvalidVmpermBits)
VmPerm::from_bits(value as u8).ok_or(Error::InvalidArgs)
}
}

View File

@ -187,8 +187,7 @@ impl From<jinux_frame::Error> for Error {
jinux_frame::Error::IoError => Error::new(Errno::EIO),
jinux_frame::Error::NotEnoughResources => Error::new(Errno::EBUSY),
jinux_frame::Error::PageFault => Error::new(Errno::EFAULT),
jinux_frame::Error::InvalidVmpermBits => Error::new(Errno::EINVAL),
jinux_frame::Error::NoChild => Error::new(Errno::ECHILD),
jinux_frame::Error::TimeOut => Error::new(Errno::ETIME),
}
}
}

View File

@ -203,14 +203,19 @@ impl EventCounter {
}
pub fn read(&self) -> usize {
self.wait_queue.wait_until(|| {
let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 {
Some(val)
} else {
None
}
})
self.wait_queue
.wait_until(
|| {
let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 {
Some(val)
} else {
None
}
},
None,
)
.unwrap()
}
pub fn write(&self) {

View File

@ -26,46 +26,51 @@ pub fn wait_child_exit(
wait_options: WaitOptions,
) -> Result<(Pid, ExitCode)> {
let current = current!();
let (pid, exit_code) = current.waiting_children().wait_until(|| {
let children_lock = current.children().lock();
let unwaited_children = children_lock
.iter()
.filter(|(pid, child)| match child_filter {
ProcessFilter::Any => true,
ProcessFilter::WithPid(pid) => child.pid() == pid,
ProcessFilter::WithPgid(pgid) => child.pgid() == pgid,
})
.map(|(_, child)| child.clone())
.collect::<Vec<_>>();
// we need to drop the lock here, since reap child process need to acquire this lock again
drop(children_lock);
let (pid, exit_code) = current.waiting_children().wait_until(
|| {
let unwaited_children = current
.children()
.lock()
.values()
.filter(|child| match child_filter {
ProcessFilter::Any => true,
ProcessFilter::WithPid(pid) => child.pid() == pid,
ProcessFilter::WithPgid(pgid) => child.pgid() == pgid,
})
.cloned()
.collect::<Vec<_>>();
if unwaited_children.is_empty() {
return Some(Err(jinux_frame::Error::NoChild));
}
// return immediately if we find a zombie child
let zombie_child = unwaited_children.iter().find(|child| child.is_zombie());
if let Some(zombie_child) = zombie_child {
let zombie_pid = zombie_child.pid();
let exit_code = zombie_child.exit_code().unwrap();
if wait_options.contains(WaitOptions::WNOWAIT) {
// does not reap child, directly return
return Some(Ok((zombie_pid, exit_code)));
} else {
let exit_code = reap_zombie_child(&current, zombie_pid);
return Some(Ok((zombie_pid, exit_code)));
if unwaited_children.is_empty() {
return Some(Err(Error::with_message(
Errno::ECHILD,
"the process has no child to wait",
)));
}
}
if wait_options.contains(WaitOptions::WNOHANG) {
return Some(Ok((0, 0)));
}
// return immediately if we find a zombie child
let zombie_child = unwaited_children.iter().find(|child| child.is_zombie());
// wait
None
})?;
if let Some(zombie_child) = zombie_child {
let zombie_pid = zombie_child.pid();
let exit_code = zombie_child.exit_code().unwrap();
if wait_options.contains(WaitOptions::WNOWAIT) {
// does not reap child, directly return
return Some(Ok((zombie_pid, exit_code)));
} else {
let exit_code = reap_zombie_child(&current, zombie_pid);
return Some(Ok((zombie_pid, exit_code)));
}
}
if wait_options.contains(WaitOptions::WNOHANG) {
return Some(Ok((0, 0)));
}
// wait
None
},
None,
)??;
Ok((pid, exit_code as _))
}