Add condition variable implementation

This commit is contained in:
Fabing Li
2024-05-31 15:48:12 +08:00
committed by Tate, Hongliang Tian
parent c04fd0c9a0
commit 34e9d71fe4
6 changed files with 386 additions and 3 deletions

View File

@ -124,3 +124,9 @@ impl<T: ?Sized + fmt::Debug, R: Deref<Target = Mutex<T>>> fmt::Debug for MutexGu
impl<T: ?Sized, R: Deref<Target = Mutex<T>>> !Send for MutexGuard_<T, R> {} impl<T: ?Sized, R: Deref<Target = Mutex<T>>> !Send for MutexGuard_<T, R> {}
unsafe impl<T: ?Sized + Sync, R: Deref<Target = Mutex<T>> + Sync> Sync for MutexGuard_<T, R> {} unsafe impl<T: ?Sized + Sync, R: Deref<Target = Mutex<T>> + Sync> Sync for MutexGuard_<T, R> {}
impl<'a, T: ?Sized> MutexGuard<'a, T> {
pub fn get_lock(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
guard.mutex
}
}

View File

@ -14,6 +14,7 @@ mod program_loader;
mod rlimit; mod rlimit;
pub mod signal; pub mod signal;
mod status; mod status;
pub mod sync;
mod term_status; mod term_status;
mod wait; mod wait;

View File

@ -0,0 +1,363 @@
// SPDX-License-Identifier: MPL-2.0
use alloc::sync::Arc;
use core::time::Duration;
use aster_frame::sync::{MutexGuard, SpinLock, WaitQueue};
use crate::time::wait::WaitTimeout;
/// Represents potential errors during lock operations on synchronization primitives,
/// specifically for operations associated with a `Condvar` (Condition Variable).
pub enum LockErr<Guard> {
Timeout(Guard),
Unknown(Guard),
}
/// LockResult, different from Rust std.
/// The result of a lock operation.
pub type LockResult<Guard> = Result<Guard, LockErr<Guard>>;
impl<Guard> LockErr<Guard> {
pub fn into_guard(self) -> Guard {
match self {
LockErr::Timeout(guard) => guard,
LockErr::Unknown(guard) => guard,
}
}
}
/// A `Condvar` (Condition Variable) is a synchronization primitive that can block threads
/// until a certain condition becomes true.
///
/// Although a `Condvar` can block threads, it is primarily used to achieve thread synchronization.
/// Threads waiting on a `Condvar` must acquire a mutex before proceeding. This setup is commonly
/// used with a shared mutable state to ensure safe concurrent access. A typical use involves one
/// or more threads waiting for a condition to become true to proceed with their operations.
///
/// # Usage
///
/// Pair a `Condvar` with a `Mutex` to allow threads to wait for certain conditions safely.
/// A waiting thread will sleep and atomically release the associated mutex.
/// Another thread can then update the shared state and notify the `Condvar`, allowing the
/// waiting thread to reacquire the mutex and proceed.
///
/// ## Example
///
/// This example demonstrates how a `Condvar` can synchronize threads:
///
/// ```rust
/// use alloc::sync::Arc;
/// use aster_frame::sync::Mutex;
/// use crate::{process::sync::Condvar, thread::{kernel_thread::KernelThreadExt, Thread}};
///
/// // Initializing a shared condition between threads
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// // Spawning a new kernel thread to change a shared state and notify the Condvar
/// Thread::spawn_kernel_thread(ThreadOptions::new(move || {
/// let (lock, cvar) = &*pair2;
/// Thread::yield_now();
/// let mut started = lock.lock();
/// *started = true; // Modifying the shared state
/// cvar.notify_one(); // Notifying one waiting thread
/// }));
///
/// // Main thread waiting for the shared state to be set to true
/// {
/// let (lock, cvar) = &*pair;
/// let mut started = lock.lock();
/// while !*started {
/// started = cvar.wait(started).unwrap_or_else(|err| err.into_guard());
/// }
/// }
/// ```
///
/// In this example, the main thread and a child thread synchronize access to a boolean flag
/// using a `Mutex` and a `Condvar`.
/// The main thread waits for the flag to be set to `true`,
/// utilizing the `Condvar` to sleep efficiently until the condition is met.
pub struct Condvar {
waitqueue: Arc<WaitQueue>,
counter: SpinLock<Inner>,
}
struct Inner {
waiter_count: u64,
notify_count: u64,
}
impl Condvar {
/// Creates a new condition variable.
pub fn new() -> Self {
Condvar {
waitqueue: Arc::new(WaitQueue::new()),
counter: SpinLock::new(Inner {
waiter_count: 0,
notify_count: 0,
}),
}
}
/// Atomically releases the given `MutexGuard`,
/// blocking the current thread until the condition variable
/// is notified, after which the mutex will be reacquired.
///
/// Returns a new `MutexGuard` if the operation is successful,
/// or returns the provided guard
/// within a `LockErr` if the waiting operation fails.
pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> LockResult<MutexGuard<'a, T>> {
let cond = || {
// Check if the notify counter is greater than 0.
let mut counter = self.counter.lock();
if counter.notify_count > 0 {
// Decrement the notify counter.
counter.notify_count -= 1;
Some(())
} else {
None
}
};
{
let mut counter = self.counter.lock();
counter.waiter_count += 1;
}
let lock = MutexGuard::get_lock(&guard);
drop(guard);
self.waitqueue.wait_until(cond);
Ok(lock.lock())
}
/// Waits for the condition variable to be signaled or broadcasted,
/// or a timeout to elapse.
/// bool is true if the timeout is reached.
///
/// The function returns a tuple containing a `MutexGuard`
/// and a boolean that is true if the timeout elapsed
/// before the condition variable was notified.
pub fn wait_timeout<'a, T>(
&self,
guard: MutexGuard<'a, T>,
timeout: Duration,
) -> LockResult<(MutexGuard<'a, T>, bool)> {
let cond = || {
// Check if the notify counter is greater than 0.
let mut counter = self.counter.lock();
if counter.notify_count > 0 {
// Decrement the notify counter.
counter.notify_count -= 1;
Some(())
} else {
None
}
};
{
let mut counter = self.counter.lock();
counter.waiter_count += 1;
}
let lock = MutexGuard::get_lock(&guard);
drop(guard);
// Wait until the condition becomes true, we're explicitly woken up, or the timeout elapses.
let res = self.waitqueue.wait_until_or_timeout(cond, &timeout);
match res {
Some(_) => Ok((lock.lock(), false)),
None => {
let mut counter = self.counter.lock();
counter.waiter_count -= 1;
Err(LockErr::Timeout((lock.lock(), true)))
}
}
}
/// Wait for the condition to become true,
/// or until the timeout elapses,
/// or until the condition is explicitly woken up.
/// bool is true if the timeout is reached.
///
/// Similar to `wait_timeout`,
/// it returns a tuple containing the `MutexGuard`
/// and a boolean value indicating
/// whether the wait operation terminated due to a timeout.
pub fn wait_timeout_while<'a, T, F>(
&self,
mut guard: MutexGuard<'a, T>,
timeout: Duration,
mut condition: F,
) -> LockResult<(MutexGuard<'a, T>, bool)>
where
F: FnMut(&mut T) -> bool,
{
loop {
if !condition(&mut *guard) {
return Ok((guard, false));
}
guard = match self.wait_timeout(guard, timeout) {
Ok((guard, timeout_flag)) => guard,
Err(LockErr::Timeout((guard, timeout_flag))) => {
return Err(LockErr::Timeout((guard, timeout_flag)))
}
Err(LockErr::Unknown(guard)) => return Err(LockErr::Unknown(guard)),
}
}
}
/// Wait for the condition to become true,
/// and until the condition is explicitly woken up or interupted.
///
/// This function blocks until either the condition becomes false
/// or the condition variable is explicitly notified.
/// Returns the `MutexGuard` if the operation completes successfully.
pub fn wait_while<'a, T, F>(
&self,
mut guard: MutexGuard<'a, T>,
mut condition: F,
) -> LockResult<MutexGuard<'a, T>>
where
F: FnMut(&mut T) -> bool,
{
loop {
if !condition(&mut *guard) {
return Ok(guard);
}
guard = match self.wait(guard) {
Ok(guard) => guard,
Err(LockErr::Unknown(guard)) => return Err(LockErr::Unknown(guard)),
_ => unreachable!(),
}
}
}
/// Wakes up one blocked thread waiting on this condition variable.
///
/// If there is a waiting thread, it will be unblocked
/// and allowed to reacquire the associated mutex.
/// If no threads are waiting, this function is a no-op.
pub fn notify_one(&self) {
let mut counter = self.counter.lock();
if counter.waiter_count == 0 {
return;
}
counter.notify_count += 1;
self.waitqueue.wake_one();
counter.waiter_count -= 1;
}
/// Wakes up all blocked threads waiting on this condition variable.
///
/// This method will unblock all waiting threads
/// and they will be allowed to reacquire the associated mutex.
/// If no threads are waiting, this function is a no-op.
pub fn notify_all(&self) {
let mut counter = self.counter.lock();
if counter.waiter_count == 0 {
return;
}
counter.notify_count = counter.waiter_count;
self.waitqueue.wake_all();
counter.waiter_count = 0;
}
}
#[cfg(ktest)]
mod test {
use aster_frame::sync::Mutex;
use super::*;
use crate::thread::{
kernel_thread::{KernelThreadExt, ThreadOptions},
Thread,
};
#[ktest]
fn test_condvar_wait() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
Thread::spawn_kernel_thread(ThreadOptions::new(move || {
Thread::yield_now();
let (lock, cvar) = &*pair2;
let mut started = lock.lock();
*started = true;
cvar.notify_one();
}));
{
let (lock, cvar) = &*pair;
let mut started = lock.lock();
while !*started {
started = cvar.wait(started).unwrap_or_else(|err| err.into_guard());
}
assert_eq!(*started, true);
}
}
#[ktest]
fn test_condvar_wait_timeout() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
Thread::spawn_kernel_thread(ThreadOptions::new(move || {
Thread::yield_now();
let (lock, cvar) = &*pair2;
let mut started = lock.lock();
*started = true;
cvar.notify_one();
}));
{
let (lock, cvar) = &*pair;
let mut started = lock.lock();
while !*started {
(started, _) = cvar
.wait_timeout(started, Duration::from_secs(1))
.unwrap_or_else(|err| err.into_guard());
}
assert_eq!(*started, true);
}
}
#[ktest]
fn test_condvar_wait_while() {
let pair = Arc::new((Mutex::new(true), Condvar::new()));
let pair2 = Arc::clone(&pair);
Thread::spawn_kernel_thread(ThreadOptions::new(move || {
Thread::yield_now();
let (lock, cvar) = &*pair2;
let mut started = lock.lock();
*started = false;
cvar.notify_one();
}));
{
let (lock, cvar) = &*pair;
let started = cvar
.wait_while(lock.lock(), |started| *started)
.unwrap_or_else(|err| err.into_guard());
assert_eq!(*started, false);
}
}
#[ktest]
fn test_condvar_wait_timeout_while() {
let pair = Arc::new((Mutex::new(true), Condvar::new()));
let pair2 = Arc::clone(&pair);
Thread::spawn_kernel_thread(ThreadOptions::new(move || {
Thread::yield_now();
let (lock, cvar) = &*pair2;
let mut started = lock.lock();
*started = false;
cvar.notify_one();
}));
{
let (lock, cvar) = &*pair;
let (started, _) = cvar
.wait_timeout_while(lock.lock(), Duration::from_secs(1), |started| *started)
.unwrap_or_else(|err| err.into_guard());
assert_eq!(*started, false);
}
}
}

View File

@ -0,0 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
mod condvar;
#[allow(unused_imports)]
pub use self::condvar::{Condvar, LockErr};

View File

@ -288,4 +288,10 @@ pub fn init_for_ktest() {
let clock = RealTimeClock { _private: () }; let clock = RealTimeClock { _private: () };
TimerManager::new(Arc::new(clock)) TimerManager::new(Arc::new(clock))
}); });
CLOCK_REALTIME_COARSE_INSTANCE.call_once(|| Arc::new(RealTimeCoarseClock { _private: () }));
RealTimeCoarseClock::current_ref().call_once(|| SpinLock::new(Duration::from_secs(0)));
JIFFIES_TIMER_MANAGER.call_once(|| {
let clock = JiffiesClock { _private: () };
TimerManager::new(Arc::new(clock))
});
} }

View File

@ -12,7 +12,7 @@
//! use. It also hooks up the VDSO data update routine to the time management subsystem for periodic updates. //! use. It also hooks up the VDSO data update routine to the time management subsystem for periodic updates.
use alloc::{boxed::Box, sync::Arc}; use alloc::{boxed::Box, sync::Arc};
use core::time::Duration; use core::{mem::ManuallyDrop, time::Duration};
use aster_frame::{ use aster_frame::{
sync::SpinLock, sync::SpinLock,
@ -319,8 +319,9 @@ pub(super) fn init() {
// Coarse resolution clock IDs directly read the instant stored in VDSO data without // Coarse resolution clock IDs directly read the instant stored in VDSO data without
// using coefficients for calculation, thus the related instant requires more frequent updating. // using coefficients for calculation, thus the related instant requires more frequent updating.
let coarse_instant_timer = let coarse_instant_timer = ManuallyDrop::new(
MonotonicClock::timer_manager().create_timer(update_vdso_coarse_res_instant); MonotonicClock::timer_manager().create_timer(update_vdso_coarse_res_instant),
);
coarse_instant_timer.set_interval(Duration::from_millis(100)); coarse_instant_timer.set_interval(Duration::from_millis(100));
coarse_instant_timer.set_timeout(Duration::from_millis(100)); coarse_instant_timer.set_timeout(Duration::from_millis(100));
} }