Reorganize the codebase

This commit is contained in:
Jianfeng Jiang
2023-04-09 23:12:42 -04:00
committed by Tate, Hongliang Tian
parent 888853a6de
commit 271a16d492
416 changed files with 67 additions and 53 deletions

View File

@ -0,0 +1,391 @@
use core::fmt;
use core::sync::atomic::{AtomicU64, Ordering::Relaxed};
use align_ext::AlignExt;
use crate::prelude::*;
/// A fixed number of bits that can be safely shared between threads.
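///
/// A minimal usage sketch of the methods defined below:
///
/// ```ignore
/// let bits = AtomicBits::new_zeroes(128);
/// bits.set(3, true);
/// assert!(bits.get(3));
/// assert_eq!(bits.iter_ones().count(), 1);
/// ```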
pub struct AtomicBits {
num_bits: usize,
u64s: Box<[AtomicU64]>,
}
impl AtomicBits {
/// Creates a new instance with the given number of bits, all initialized to 0.
pub fn new_zeroes(num_bits: usize) -> Self {
Self::new(0, num_bits)
}
/// Creates a new instance with the given number of bits, all initialized to 1.
pub fn new_ones(num_bits: usize) -> Self {
Self::new(!0, num_bits)
}
fn new(u64_val: u64, num_bits: usize) -> Self {
let num_u64s = num_bits.align_up(64) / 64;
let u64s = {
let mut u64s = Vec::with_capacity(num_u64s);
for _ in 0..num_u64s {
u64s.push(AtomicU64::new(u64_val));
}
u64s.into_boxed_slice()
};
Self { num_bits, u64s }
}
/// Returns the length in bits.
pub fn len(&self) -> usize {
self.num_bits
}
/// Get the bit at a given position.
pub fn get(&self, index: usize) -> bool {
assert!(index < self.num_bits);
let i = index / 64;
let j = index % 64;
// SAFETY: `i` is within bounds because `index < self.num_bits` is asserted above.
let u64_atomic = unsafe { self.u64s.get_unchecked(i) };
(u64_atomic.load(Relaxed) & 1 << j) != 0
}
/// Set the bit at a given position.
pub fn set(&self, index: usize, new_bit: bool) {
assert!(index < self.num_bits);
let i = index / 64;
let j = index % 64;
// SAFETY: `i` is within bounds because `index < self.num_bits` is asserted above.
let u64_atomic = unsafe { self.u64s.get_unchecked(i) };
if new_bit {
u64_atomic.fetch_or(1 << j, Relaxed);
} else {
u64_atomic.fetch_and(!(1 << j), Relaxed);
}
}
/// Clear all the bits.
pub fn clear(&self) {
// Reset every underlying u64 word to zero.
for u64_atomic in self.u64s.iter() {
u64_atomic.store(0, Relaxed);
}
}
/// Returns whether all the bits are 1s.
pub fn is_full(&self) -> bool {
self.match_pattern(!0)
}
/// Returns whether all the bits are 0s.
pub fn is_empty(&self) -> bool {
self.match_pattern(0)
}
fn match_pattern(&self, pattern: u64) -> bool {
// Compare each u64 word against the pattern, ignoring the unused
// high bits in the last word, if any.
let num_rem_bits = self.num_bits % 64;
self.u64s.iter().enumerate().all(|(i, u64_atomic)| {
let mask = if i + 1 == self.u64s.len() && num_rem_bits != 0 {
(1u64 << num_rem_bits) - 1
} else {
!0
};
(u64_atomic.load(Relaxed) & mask) == (pattern & mask)
})
}
/// Get an iterator for the bits.
pub fn iter<'a>(&'a self) -> Iter<'a> {
Iter::new(self)
}
/// Get an iterator that gives the positions of all 1s in the bits.
pub fn iter_ones<'a>(&'a self) -> OnesIter<'a> {
OnesIter::new(self)
}
/// Get an iterator that gives the positions of all 0s in the bits.
pub fn iter_zeroes<'a>(&'a self) -> ZeroesIter<'a> {
ZeroesIter::new(self)
}
}
/// An iterator that accesses the bits of an `AtomicBits`.
pub struct Iter<'a> {
bits: &'a AtomicBits,
bit_i: usize,
}
impl<'a> Iter<'a> {
fn new(bits: &'a AtomicBits) -> Self {
Self { bits, bit_i: 0 }
}
}
impl<'a> Iterator for Iter<'a> {
type Item = bool;
fn next(&mut self) -> Option<bool> {
if self.bit_i < self.bits.len() {
let bit = self.bits.get(self.bit_i);
self.bit_i += 1;
Some(bit)
} else {
None
}
}
}
/// An iterator that returns the positions of 1s in an `AtomicBits`.
pub struct OnesIter<'a> {
bits: &'a AtomicBits,
u64_idx: usize,
u64_val: u64,
num_garbage_bits_in_last_u64: u8,
}
impl<'a> OnesIter<'a> {
fn new(bits: &'a AtomicBits) -> Self {
let num_garbage_bits_in_last_u64 = {
if bits.len() % 64 != 0 {
64 - ((bits.len() % 64) as u8)
} else {
0
}
};
let mut new_self = Self {
bits,
u64_idx: 0,
u64_val: 0, // NOT initialized yet!
num_garbage_bits_in_last_u64,
};
new_self.u64_val = new_self.get_u64_val(0);
new_self
}
/// Get the u64 value at the given position, removing the garbage bits if any.
fn get_u64_val(&self, idx: usize) -> u64 {
let mut u64_val = self.bits.u64s[idx].load(Relaxed);
// Clear the garbage bits, if any, in the last u64 so that they
// won't affect the result of the iterator.
if idx == self.bits.u64s.len() - 1 && self.num_garbage_bits_in_last_u64 > 0 {
let num_valid_bits_in_last_u64 = 64 - self.num_garbage_bits_in_last_u64;
let valid_bits_mask = (1 << num_valid_bits_in_last_u64) - 1;
u64_val &= valid_bits_mask;
}
u64_val
}
}
impl<'a> Iterator for OnesIter<'a> {
type Item = usize;
fn next(&mut self) -> Option<usize> {
loop {
if self.u64_idx >= self.bits.u64s.len() {
return None;
}
let first_one_in_u64 = self.u64_val.trailing_zeros() as usize;
if first_one_in_u64 < 64 {
self.u64_val &= !(1 << first_one_in_u64);
let one_pos = self.u64_idx * 64 + first_one_in_u64;
return Some(one_pos);
}
self.u64_idx += 1;
if self.u64_idx < self.bits.u64s.len() {
self.u64_val = self.get_u64_val(self.u64_idx);
}
}
}
}
/// An iterator that returns the positions of 0s in an `AtomicBits`.
pub struct ZeroesIter<'a> {
bits: &'a AtomicBits,
u64_idx: usize,
u64_val: u64,
num_garbage_bits_in_last_u64: u8,
}
impl<'a> ZeroesIter<'a> {
fn new(bits: &'a AtomicBits) -> Self {
let num_garbage_bits_in_last_u64 = {
if bits.len() % 64 != 0 {
64 - ((bits.len() % 64) as u8)
} else {
0
}
};
let mut new_self = Self {
bits,
u64_idx: 0,
u64_val: 0, // NOT initialized yet!
num_garbage_bits_in_last_u64,
};
new_self.u64_val = new_self.get_u64_val(0);
new_self
}
/// Get the u64 value at the given position, removing the garbage bits if any.
fn get_u64_val(&self, idx: usize) -> u64 {
let mut u64_val = self.bits.u64s[idx].load(Relaxed);
// Set all garbage bits, if any, in the last u64 so that they
// won't affect the result of the iterator.
if idx == self.bits.u64s.len() - 1 && self.num_garbage_bits_in_last_u64 > 0 {
let num_valid_bits_in_last_u64 = 64 - self.num_garbage_bits_in_last_u64;
let garbage_bits_mask = !((1 << num_valid_bits_in_last_u64) - 1);
u64_val |= garbage_bits_mask;
}
u64_val
}
}
impl<'a> Iterator for ZeroesIter<'a> {
type Item = usize;
fn next(&mut self) -> Option<usize> {
loop {
if self.u64_idx >= self.bits.u64s.len() {
return None;
}
let first_zero_in_u64 = self.u64_val.trailing_ones() as usize;
if first_zero_in_u64 < 64 {
self.u64_val |= 1 << first_zero_in_u64;
let zero_pos = self.u64_idx * 64 + first_zero_in_u64;
return Some(zero_pos);
}
self.u64_idx += 1;
if self.u64_idx < self.bits.u64s.len() {
self.u64_val = self.get_u64_val(self.u64_idx);
}
}
}
}
impl Clone for AtomicBits {
fn clone(&self) -> Self {
let num_bits = self.num_bits;
let num_u64s = self.u64s.len();
let u64s = {
let mut u64s = Vec::with_capacity(num_u64s);
for u64_i in 0..num_u64s {
let u64_val = self.u64s[u64_i].load(Relaxed);
u64s.push(AtomicU64::new(u64_val));
}
u64s.into_boxed_slice()
};
Self { num_bits, u64s }
}
}
impl fmt::Debug for AtomicBits {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "AtomicBits(")?;
for bit in self.iter() {
if bit {
write!(f, "1")?;
} else {
write!(f, "0")?;
}
}
write!(f, ")")
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn new() {
let bits = AtomicBits::new_zeroes(1);
assert!(bits.len() == 1);
let bits = AtomicBits::new_zeroes(128);
assert!(bits.len() == 128);
let bits = AtomicBits::new_ones(7);
assert!(bits.len() == 7);
let bits = AtomicBits::new_zeroes(65);
assert!(bits.len() == 65);
}
#[test]
fn set_get() {
let bits = AtomicBits::new_zeroes(128);
for i in 0..bits.len() {
assert!(bits.get(i) == false);
bits.set(i, true);
assert!(bits.get(i) == true);
bits.set(i, false);
assert!(bits.get(i) == false);
}
let bits = AtomicBits::new_ones(128);
for i in 0..bits.len() {
assert!(bits.get(i) == true);
bits.set(i, false);
assert!(bits.get(i) == false);
bits.set(i, true);
assert!(bits.get(i) == true);
}
}
#[test]
fn iter_ones() {
let bits = AtomicBits::new_zeroes(1);
assert!(bits.iter_ones().count() == 0);
let bits = AtomicBits::new_zeroes(400);
assert!(bits.iter_ones().count() == 0);
let bits = AtomicBits::new_ones(1);
assert!(bits.iter_ones().count() == 1);
let bits = AtomicBits::new_ones(24);
assert!(bits.iter_ones().count() == 24);
let bits = AtomicBits::new_ones(64);
assert!(bits.iter_ones().count() == 64);
let bits = AtomicBits::new_ones(77);
assert!(bits.iter_ones().count() == 77);
let bits = AtomicBits::new_ones(128);
assert!(bits.iter_ones().count() == 128);
let bits = AtomicBits::new_zeroes(8);
bits.set(1, true);
bits.set(3, true);
bits.set(5, true);
assert!(bits.iter_ones().count() == 3);
}
#[test]
fn iter_zeroes() {
let bits = AtomicBits::new_ones(1);
assert!(bits.iter_zeroes().count() == 0);
let bits = AtomicBits::new_ones(130);
assert!(bits.iter_zeroes().count() == 0);
let bits = AtomicBits::new_zeroes(1);
assert!(bits.iter_zeroes().count() == 1);
let bits = AtomicBits::new_zeroes(24);
assert!(bits.iter_zeroes().count() == 24);
let bits = AtomicBits::new_zeroes(64);
assert!(bits.iter_zeroes().count() == 64);
let bits = AtomicBits::new_zeroes(77);
assert!(bits.iter_zeroes().count() == 77);
let bits = AtomicBits::new_zeroes(128);
assert!(bits.iter_zeroes().count() == 128);
let bits = AtomicBits::new_ones(96);
bits.set(1, false);
bits.set(3, false);
bits.set(5, false);
bits.set(64, false);
bits.set(76, false);
assert!(bits.iter_zeroes().count() == 5);
}
#[test]
fn iter() {
let bits = AtomicBits::new_zeroes(7);
assert!(bits.iter().all(|bit| bit == false));
let bits = AtomicBits::new_ones(128);
assert!(bits.iter().all(|bit| bit == true));
}
}

View File

@ -0,0 +1,11 @@
mod atomic_bits;
mod mutex;
mod rcu;
mod spin;
mod wait;
pub use self::atomic_bits::AtomicBits;
pub use self::mutex::{Mutex, MutexGuard};
pub use self::rcu::{pass_quiescent_state, OwnerPtr, Rcu, RcuReadGuard, RcuReclaimer};
pub use self::spin::{SpinLock, SpinLockGuard};
pub use self::wait::WaitQueue;

View File

@ -0,0 +1,60 @@
use super::spin::{SpinLock, SpinLockGuard};
use core::ops::{Deref, DerefMut};
use core::fmt;
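/// A mutex, currently implemented as a thin wrapper around `SpinLock`; local
/// IRQs therefore stay disabled for as long as the guard is held.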
pub struct Mutex<T> {
inner: SpinLock<T>,
}
impl<T> Mutex<T> {
#[inline(always)]
pub const fn new(val: T) -> Self {
Self {
inner: SpinLock::new(val),
}
}
pub fn lock(&self) -> MutexGuard<T> {
MutexGuard {
lock: self.inner.lock(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.inner, f)
}
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
pub struct MutexGuard<'a, T> {
lock: SpinLockGuard<'a, T>,
}
impl<'a, T> Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.lock.deref()
}
}
impl<'a, T> DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.lock.deref_mut()
}
}
impl<'a, T: fmt::Debug> fmt::Debug for MutexGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<'a, T> !Send for MutexGuard<'a, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}

View File

@ -0,0 +1,102 @@
//! Read-copy update (RCU).
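//!
//! An illustrative sketch of the API defined in this module:
//!
//! ```ignore
//! let rcu = Rcu::new(Box::new(123));
//! // Readers obtain a guard that dereferences to the current version.
//! assert_eq!(*rcu.get(), 123);
//! // Writers publish a new version; the old one is reclaimed after a grace period.
//! rcu.replace(Box::new(456)).delay();
//! ```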
use core::marker::PhantomData;
use core::ops::Deref;
use core::sync::atomic::{
AtomicPtr,
Ordering::{AcqRel, Acquire},
};
use self::monitor::RcuMonitor;
use crate::prelude::*;
use crate::sync::WaitQueue;
mod monitor;
mod owner_ptr;
pub use owner_ptr::OwnerPtr;
pub struct Rcu<P: OwnerPtr> {
ptr: AtomicPtr<<P as OwnerPtr>::Target>,
marker: PhantomData<P::Target>,
}
impl<P: OwnerPtr> Rcu<P> {
pub fn new(ptr: P) -> Self {
let ptr = AtomicPtr::new(OwnerPtr::into_raw(ptr) as *mut _);
Self {
ptr,
marker: PhantomData,
}
}
pub fn get(&self) -> RcuReadGuard<'_, P> {
let obj = unsafe { &*self.ptr.load(Acquire) };
RcuReadGuard { obj, rcu: self }
}
}
impl<P: OwnerPtr + Send> Rcu<P> {
pub fn replace(&self, new_ptr: P) -> RcuReclaimer<P> {
let new_ptr = <P as OwnerPtr>::into_raw(new_ptr) as *mut _;
let old_ptr = {
let old_raw_ptr = self.ptr.swap(new_ptr, AcqRel);
unsafe { <P as OwnerPtr>::from_raw(old_raw_ptr) }
};
RcuReclaimer { ptr: old_ptr }
}
}
pub struct RcuReadGuard<'a, P: OwnerPtr> {
obj: &'a <P as OwnerPtr>::Target,
rcu: &'a Rcu<P>,
}
impl<'a, P: OwnerPtr> Deref for RcuReadGuard<'a, P> {
type Target = <P as OwnerPtr>::Target;
fn deref(&self) -> &Self::Target {
self.obj
}
}
#[repr(transparent)]
pub struct RcuReclaimer<P> {
ptr: P,
}
impl<P: Send + 'static> RcuReclaimer<P> {
pub fn delay(mut self) {
let ptr: P = unsafe {
// SAFETY: `self` is forgotten immediately after the read, so `self.ptr`
// is neither dropped nor accessed again.
let ptr = core::ptr::read(&self.ptr);
core::mem::forget(self);
ptr
};
get_singleton().after_grace_period(move || {
drop(ptr);
});
}
}
impl<P> Drop for RcuReclaimer<P> {
fn drop(&mut self) {
let wq = Arc::new(WaitQueue::new());
get_singleton().after_grace_period({
let wq = wq.clone();
move || {
wq.wake_one();
}
});
wq.wait_until(|| Some(0u8));
}
}
pub unsafe fn pass_quiescent_state() {
get_singleton().pass_quiescent_state()
}
fn get_singleton() -> &'static RcuMonitor {
todo!()
}

View File

@ -0,0 +1,139 @@
use alloc::collections::VecDeque;
use core::sync::atomic::{
AtomicBool,
Ordering::{Acquire, Relaxed, Release},
};
#[cfg(target_arch = "x86_64")]
use crate::arch::x86::cpu;
use crate::prelude::*;
use crate::sync::AtomicBits;
use crate::sync::SpinLock;
/// An RCU monitor ensures the completion of _grace periods_ by keeping track
/// of each CPU's passing _quiescent states_.
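///
/// An illustrative sketch, assuming a system with two CPUs:
///
/// ```ignore
/// let monitor = RcuMonitor::new(2);
/// // Register a callback that runs once the current grace period completes.
/// monitor.after_grace_period(|| { /* reclaim retired objects here */ });
/// // Each CPU reports its quiescent states, e.g., from its timer interrupt path.
/// unsafe { monitor.pass_quiescent_state() };
/// ```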
pub struct RcuMonitor {
is_monitoring: AtomicBool,
state: SpinLock<State>,
}
impl RcuMonitor {
pub fn new(num_cpus: u32) -> Self {
Self {
is_monitoring: AtomicBool::new(false),
state: SpinLock::new(State::new(num_cpus)),
}
}
pub unsafe fn pass_quiescent_state(&self) {
// Fast path
if !self.is_monitoring.load(Relaxed) {
return;
}
// Check if the current GP is complete after passing the quiescent state
// on the current CPU. If GP is complete, take the callbacks of the current
// GP.
let callbacks = {
let mut state = self.state.lock();
if state.current_gp.is_complete() {
return;
}
state.current_gp.pass_quiescent_state();
if !state.current_gp.is_complete() {
return;
}
// Now that the current GP is complete, take its callbacks
let current_callbacks = state.current_gp.take_callbacks();
// Check if we need to watch for a next GP
if !state.next_callbacks.is_empty() {
let callbacks = core::mem::take(&mut state.next_callbacks);
state.current_gp.restart(callbacks);
} else {
self.is_monitoring.store(false, Relaxed);
}
current_callbacks
};
// Invoke the callbacks to notify the completion of GP
for f in callbacks {
(f)();
}
}
pub fn after_grace_period<F>(&self, f: F)
where
F: FnOnce() -> () + Send + 'static,
{
let mut state = self.state.lock();
state.next_callbacks.push_back(Box::new(f));
if !state.current_gp.is_complete() {
return;
}
let callbacks = core::mem::take(&mut state.next_callbacks);
state.current_gp.restart(callbacks);
self.is_monitoring.store(true, Relaxed);
}
}
struct State {
current_gp: GracePeriod,
next_callbacks: Callbacks,
}
impl State {
pub fn new(num_cpus: u32) -> Self {
Self {
current_gp: GracePeriod::new(num_cpus),
next_callbacks: VecDeque::new(),
}
}
}
type Callbacks = VecDeque<Box<dyn FnOnce() -> () + Send + 'static>>;
struct GracePeriod {
callbacks: Callbacks,
cpu_mask: AtomicBits,
is_complete: bool,
}
impl GracePeriod {
pub fn new(num_cpus: u32) -> Self {
Self {
callbacks: Default::default(),
cpu_mask: AtomicBits::new_zeroes(num_cpus as usize),
is_complete: false,
}
}
pub fn is_complete(&self) -> bool {
self.is_complete
}
pub unsafe fn pass_quiescent_state(&mut self) {
let this_cpu = cpu::this_cpu();
self.cpu_mask.set(this_cpu as usize, true);
if self.cpu_mask.is_full() {
self.is_complete = true;
}
}
pub fn take_callbacks(&mut self) -> Callbacks {
core::mem::take(&mut self.callbacks)
}
pub fn restart(&mut self, callbacks: Callbacks) {
self.is_complete = false;
self.cpu_mask.clear();
self.callbacks = callbacks;
}
}

View File

@ -0,0 +1,78 @@
use core::ptr::NonNull;
use crate::prelude::*;
/// A trait that abstracts pointers that have the ownership of the objects they
/// refer to.
///
/// The most typical examples are smart pointer types like `Box<T>` and `Arc<T>`,
/// which can be converted to and from the raw pointer type `*const T`.
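///
/// An illustrative round trip through `into_raw` and `from_raw` with `Box`:
///
/// ```ignore
/// let raw = <Box<u32> as OwnerPtr>::into_raw(Box::new(7));
/// // SAFETY: `raw` was just produced by `into_raw` and is not used afterwards.
/// let boxed: Box<u32> = unsafe { OwnerPtr::from_raw(raw) };
/// assert_eq!(*boxed, 7);
/// ```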
pub trait OwnerPtr {
/// The target type that this pointer refers to.
// TODO: allow ?Sized
type Target;
/// Converts to a raw pointer.
///
/// If `Self` owns the object that it refers to (e.g., `Box<_>`), then
/// each call to `into_raw` must be paired with a call to `from_raw`
/// in order to avoid memory leakage.
fn into_raw(self) -> *const Self::Target;
/// Converts back from a raw pointer.
///
/// # Safety
///
/// The raw pointer must have been previously returned by a call to `into_raw`.
unsafe fn from_raw(ptr: *const Self::Target) -> Self;
}
impl<T> OwnerPtr for Box<T> {
type Target = T;
fn into_raw(self) -> *const Self::Target {
Box::into_raw(self) as *const _
}
unsafe fn from_raw(ptr: *const Self::Target) -> Self {
Box::from_raw(ptr as *mut _)
}
}
impl<T> OwnerPtr for Arc<T> {
type Target = T;
fn into_raw(self) -> *const Self::Target {
Arc::into_raw(self)
}
unsafe fn from_raw(ptr: *const Self::Target) -> Self {
Arc::from_raw(ptr)
}
}
impl<P> OwnerPtr for Option<P>
where
P: OwnerPtr,
// We cannot support fat pointers, e.g., when `Target: dyn Trait`.
// This is because Rust does not allow fat null pointers. Yet,
// we need the null pointer to represent `None`.
// See https://github.com/rust-lang/rust/issues/66316.
<P as OwnerPtr>::Target: Sized,
{
type Target = P::Target;
fn into_raw(self) -> *const Self::Target {
self.map(|p| <P as OwnerPtr>::into_raw(p))
.unwrap_or(core::ptr::null())
}
unsafe fn from_raw(ptr: *const Self::Target) -> Self {
// A null pointer represents `None`; otherwise, reconstruct the inner pointer.
if ptr.is_null() {
None
} else {
Some(<P as OwnerPtr>::from_raw(ptr))
}
}
}

View File

@ -0,0 +1,116 @@
use core::cell::UnsafeCell;
use core::fmt;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::{AtomicBool, Ordering};
use crate::trap::disable_local;
use crate::trap::DisabledLocalIrqGuard;
/// A spin lock.
pub struct SpinLock<T> {
val: UnsafeCell<T>,
lock: AtomicBool,
}
impl<T> SpinLock<T> {
/// Creates a new spin lock.
pub const fn new(val: T) -> Self {
Self {
val: UnsafeCell::new(val),
lock: AtomicBool::new(false),
}
}
/// Acquire the spin lock.
///
/// This method runs in a busy loop until the lock can be acquired.
/// After acquiring the spin lock, all interrupts are disabled.
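///
/// A minimal usage sketch:
///
/// ```ignore
/// let lock = SpinLock::new(0u32);
/// // The guard dereferences to the protected value; IRQs stay disabled until it is dropped.
/// *lock.lock() += 1;
/// assert_eq!(*lock.lock(), 1);
/// ```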
pub fn lock(&self) -> SpinLockGuard<T> {
// FIXME: add disable_preemption
let guard = disable_local();
self.acquire_lock();
SpinLockGuard {
lock: self,
irq_guard: guard,
}
}
/// Try acquiring the spin lock immediately.
pub fn try_lock(&self) -> Option<SpinLockGuard<T>> {
// FIXME: add disable_preemption
let irq_guard = disable_local();
if self.try_acquire_lock() {
let lock_guard = SpinLockGuard {
lock: self,
irq_guard,
};
return Some(lock_guard);
}
None
}
/// Acquire the spin lock, busy-waiting until it becomes available.
fn acquire_lock(&self) {
while !self.try_acquire_lock() {
core::hint::spin_loop();
}
}
fn try_acquire_lock(&self) -> bool {
self.lock
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
}
fn release_lock(&self) {
self.lock.store(false, Ordering::SeqCst);
}
}
impl<T: fmt::Debug> fmt::Debug for SpinLock<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.val, f)
}
}
// SAFETY: only the single lock holder is permitted to access the inner data of a `SpinLock`.
unsafe impl<T: Send> Send for SpinLock<T> {}
unsafe impl<T: Send> Sync for SpinLock<T> {}
/// The guard of a spin lock.
pub struct SpinLockGuard<'a, T> {
lock: &'a SpinLock<T>,
irq_guard: DisabledLocalIrqGuard,
}
impl<'a, T> Deref for SpinLockGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.val.get() }
}
}
impl<'a, T> DerefMut for SpinLockGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.lock.val.get() }
}
}
impl<'a, T> Drop for SpinLockGuard<'a, T> {
fn drop(&mut self) {
self.lock.release_lock();
}
}
impl<'a, T: fmt::Debug> fmt::Debug for SpinLockGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<'a, T> !Send for SpinLockGuard<'a, T> {}
// SAFETY: `SpinLockGuard` can be shared between tasks/threads on the same CPU,
// because `SpinLock` disables interrupts to prevent race conditions caused by interrupts.
unsafe impl<T: Sync> Sync for SpinLockGuard<'_, T> {}

View File

@ -0,0 +1,140 @@
use core::sync::atomic::{AtomicBool, Ordering};
use alloc::{collections::VecDeque, sync::Arc};
use bitflags::bitflags;
use spin::mutex::Mutex;
use crate::task::schedule;
/// A wait queue.
///
/// One may wait on a wait queue to put its executing thread to sleep.
/// Multiple threads may be the waiters of a wait queue.
/// Other threads may invoke the `wake`-family methods of a wait queue to
/// wake up one or many waiter threads.
pub struct WaitQueue {
waiters: Mutex<VecDeque<Arc<Waiter>>>,
}
impl WaitQueue {
/// Creates a new instance.
pub fn new() -> Self {
WaitQueue {
waiters: Mutex::new(VecDeque::new()),
}
}
/// Wait until some condition becomes true.
///
/// This method takes a closure that tests a user-given condition.
/// The method only returns if the condition returns Some(_).
/// A waker thread should first make the condition Some(_), then invoke the
/// `wake`-family method. This ordering is important to ensure that waiter
/// threads do not lose any wakeup notifications.
///
/// By taking a condition closure, this wait-wakeup mechanism becomes
/// more efficient and robust.
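///
/// An illustrative sketch; `READY` stands for a hypothetical `AtomicBool`
/// shared with a waker thread:
///
/// ```ignore
/// // Waiter side: sleeps until the condition closure returns `Some(_)`.
/// let value = wait_queue.wait_until(|| READY.load(Ordering::Acquire).then(|| 42));
///
/// // Waker side: make the condition true first, then wake up a waiter.
/// READY.store(true, Ordering::Release);
/// wait_queue.wake_one();
/// ```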
pub fn wait_until<F, R>(&self, mut cond: F) -> R
where
F: FnMut() -> Option<R>,
{
let waiter = Arc::new(Waiter::new());
self.enqueue(&waiter);
loop {
if let Some(res) = cond() {
waiter.set_finished();
self.finish_wait();
return res;
};
waiter.wait();
}
}
/// Wake one waiter thread, if there is one.
pub fn wake_one(&self) {
if let Some(waiter) = self.waiters.lock().front() {
waiter.wake_up();
}
}
/// Wake all non-exclusive waiter threads and at most one exclusive waiter.
pub fn wake_all(&self) {
for waiter in self.waiters.lock().iter() {
waiter.wake_up();
if waiter.is_exclusive() {
break;
}
}
}
// Enqueue a waiter into the wait queue. An exclusive waiter is added to the
// back of the queue; otherwise, it is added to the front.
fn enqueue(&self, waiter: &Arc<Waiter>) {
if waiter.is_exclusive() {
self.waiters.lock().push_back(waiter.clone())
} else {
self.waiters.lock().push_front(waiter.clone());
}
}
/// Removes all waiters that have finished waiting.
fn finish_wait(&self) {
self.waiters.lock().retain(|waiter| !waiter.is_finished())
}
}
#[derive(Debug)]
struct Waiter {
/// Whether the waiter has been woken up.
is_woken_up: AtomicBool,
/// Flags that describe the waiter, e.g., whether it waits exclusively.
flag: WaiterFlag,
/// If the wait condition is true, the waiter is finished and can be removed from the wait queue.
wait_finished: AtomicBool,
}
impl Waiter {
pub fn new() -> Self {
Waiter {
is_woken_up: AtomicBool::new(false),
flag: WaiterFlag::empty(),
wait_finished: AtomicBool::new(false),
}
}
/// Puts the waiter into a waiting state until `wake_up` is called.
pub fn wait(&self) {
self.is_woken_up.store(false, Ordering::SeqCst);
while !self.is_woken_up.load(Ordering::SeqCst) {
// Yield execution to allow other tasks to run.
schedule();
}
}
pub fn is_woken_up(&self) -> bool {
self.is_woken_up.load(Ordering::SeqCst)
}
pub fn wake_up(&self) {
self.is_woken_up.store(true, Ordering::SeqCst);
}
pub fn set_finished(&self) {
self.wait_finished.store(true, Ordering::SeqCst);
}
pub fn is_finished(&self) -> bool {
self.wait_finished.load(Ordering::SeqCst)
}
pub fn is_exclusive(&self) -> bool {
self.flag.contains(WaiterFlag::EXCLUSIVE)
}
}
bitflags! {
pub struct WaiterFlag: u32 {
const EXCLUSIVE = 0x1;
const INTERRUPTIABLE = 0x10;
}
}