Make the upgrade method of read-write locks atomic

This commit is contained in:
Chuandong Li 2023-11-30 23:47:56 +08:00 committed by Tate, Hongliang Tian
parent 07dd0fbd38
commit 5aa3124e66
4 changed files with 527 additions and 115 deletions

View File

@ -11,6 +11,7 @@ mod wait;
pub use self::atomic_bits::AtomicBits;
pub use self::mutex::{Mutex, MutexGuard};
// pub use self::rcu::{pass_quiescent_state, OwnerPtr, Rcu, RcuReadGuard, RcuReclaimer};
pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockUpgradeableGuard, RwLockWriteGuard};
pub use self::rwmutex::{RwMutex, RwMutexReadGuard, RwMutexUpgradeableGuard, RwMutexWriteGuard};
pub use self::spin::{SpinLock, SpinLockGuard};
pub use self::wait::WaitQueue;

View File

@ -2,29 +2,111 @@ use core::cell::UnsafeCell;
use core::fmt;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use crate::task::{disable_preempt, DisablePreemptGuard};
use crate::trap::disable_local;
use crate::trap::DisabledLocalIrqGuard;
/// A read write lock, waiting by spinning.
/// Now, the lock's layout is simply like:
/// Spin-based Read-write Lock
///
/// # Overview
///
/// This lock allows for multiple readers, or at most one writer to access
/// at any point in time. The writer of this lock has exclusive access to
/// modify the underlying data, while the readers are allowed shared and
/// read-only access.
///
/// The writing and reading portions cannot be active simultaneously, when
/// one portion is in progress, the other portion will spin-wait. This is
/// suitable for scenarios where the lock is expected to be held for short
/// periods of time, and the overhead of context switching is higher than
/// the cost of spinning.
///
/// The lock provides methods to safely acquire locks with interrupts
/// disabled, preventing deadlocks in scenarios where locks are used within
/// interrupt handlers.
///
/// In addition to traditional read and write locks, this implementation
/// provides the upgradeable read lock (`upread lock`). The `upread lock`
/// can be upgraded to write locks atomically, useful in scenarios
/// where a decision to write is made after reading.
///
/// The type parameter `T` represents the data that this lock is protecting.
/// It is necessary for `T` to satisfy `Send` to be shared across tasks and
/// `Sync` to permit concurrent access via readers. The `Deref` method (and
/// `DerefMut` for the writer) is implemented for the RAII guards returned
/// by the locking methods, which allows for the access to the protected data
/// while the lock is held.
///
/// # Usage
/// The lock can be used in scenarios where data needs to be read frequently
/// but written to occasionally.
///
/// Use `upread lock` in scenarios where related checking is performed before
/// modification to effectively avoid deadlocks and improve efficiency.
///
/// This lock should not be used in scenarios where lock-holding times are
/// long as it can lead to CPU resource wastage due to spinning.
///
/// # Safety
///
/// Use interrupt-disabled version methods when dealing with interrupt-related read-write locks,
/// as nested interrupts may lead to a deadlock if not properly handled.
///
/// # Examples
///
/// ```
/// bit: 63 | 62 ~ 0
/// use: writer lock | reader lock & numbers
/// use jinux_frame::sync::RwLock;
///
/// let lock = RwLock::new(5)
///
/// // many read locks can be held at once
/// {
/// let r1 = lock.read();
/// let r2 = lock.read();
/// assert_eq!(*r1, 5);
/// assert_eq!(*r2, 5);
///
/// // Upgradeable read lock can share access to data with read locks
/// let r3 = lock.upread();
/// assert_eq!(*r3, 5);
/// drop(r1);
/// drop(r2);
/// // read locks are dropped at this point
///
/// // An upread lock can only be upgraded successfully after all the
/// // read locks are released, otherwise it will spin-wait.
/// let mut w1 = r3.upgrade();
/// *w1 += 1;
/// assert_eq!(*w1, 6);
/// } // upread lock are dropped at this point
///
/// {
/// // Only one write lock can be held at a time
/// let mut w2 = lock.write();
/// *w2 += 1;
/// assert_eq!(*w2, 7);
/// } // write lock is dropped at this point
/// ```
pub struct RwLock<T> {
val: UnsafeCell<T>,
/// The internal representation of the lock state is as follows:
/// - **Bit 63:** Writer lock.
/// - **Bit 62:** Upgradeable reader lock.
/// - **Bit 61:** Indicates if an upgradeable reader is being upgraded.
/// - **Bits 60-0:** Reader lock count.
lock: AtomicUsize,
}
const READER: usize = 1;
const WRITER: usize = 1 << (usize::BITS - 1);
const MAX_READER: usize = WRITER >> 1;
const UPGRADEABLE_READER: usize = 1 << (usize::BITS - 2);
const BEING_UPGRADED: usize = 1 << (usize::BITS - 3);
const MAX_READER: usize = 1 << (usize::BITS - 4);
impl<T> RwLock<T> {
/// Creates a new read/write lock.
/// Creates a new spin-based read-write lock with an initial value.
pub const fn new(val: T) -> Self {
Self {
val: UnsafeCell::new(val),
@ -32,12 +114,14 @@ impl<T> RwLock<T> {
}
}
/// Acquire a read lock with disabling the local IRQs. This is the most secure
/// locking method.
/// Acquire a read lock while disabling the local IRQs and spin-wait
/// until it can be acquired.
///
/// This method runs in a busy loop until the lock can be acquired (when there are
/// no writers).
/// After acquiring the spin lock, all interrupts are disabled.
/// The calling thread will spin-wait until there are no writers or
/// upgrading upreaders present. There is no guarantee for the order
/// in which other readers or writers waiting simultaneously will
/// obtain the lock. Once this lock is acquired, the calling thread
/// will not be interrupted.
pub fn read_irq_disabled(&self) -> RwLockReadGuard<T> {
loop {
if let Some(readguard) = self.try_read_irq_disabled() {
@ -48,12 +132,14 @@ impl<T> RwLock<T> {
}
}
/// Acquire a write lock with disabling local IRQs. This is the most secure
/// locking method.
/// Acquire a write lock while disabling the local IRQs and spin-wait
/// until it can be acquired.
///
/// This method runs in a busy loop until the lock can be acquired (when there are
/// no writers and readers).
/// After acquiring the spin lock, all interrupts are disabled.
/// The calling thread will spin-wait until there are no other writers,
/// , upreaders or readers present. There is no guarantee for the order
/// in which other readers or writers waiting simultaneously will
/// obtain the lock. Once this lock is acquired, the calling thread
/// will not be interrupted.
pub fn write_irq_disabled(&self) -> RwLockWriteGuard<T> {
loop {
if let Some(writeguard) = self.try_write_irq_disabled() {
@ -64,11 +150,38 @@ impl<T> RwLock<T> {
}
}
/// Try acquire a read lock with disabling local IRQs.
/// Acquire an upgradeable reader (upreader) while disabling local IRQs
/// and spin-wait until it can be acquired.
///
/// The calling thread will spin-wait until there are no other writers,
/// or upreaders. There is no guarantee for the order in which other
/// readers or writers waiting simultaneously will obtain the lock. Once
/// this lock is acquired, the calling thread will not be interrupted.
///
/// Upreader will not block new readers until it tries to upgrade. Upreader
/// and reader do not differ before invoking the upgread method. However,
/// only one upreader can exist at any time to avoid deadlock in the
/// upgread method.
pub fn upread_irq_disabled(&self) -> RwLockUpgradeableGuard<T> {
loop {
if let Some(guard) = self.try_upread_irq_disabled() {
return guard;
} else {
core::hint::spin_loop();
}
}
}
/// Attempt to acquire a read lock while disabling local IRQs.
///
/// This function will never spin-wait and will return immediately. When
/// multiple readers or writers attempt to acquire the lock, this method
/// does not guarantee any order. Interrupts will automatically be restored
/// when acquiring fails.
pub fn try_read_irq_disabled(&self) -> Option<RwLockReadGuard<T>> {
let irq_guard = disable_local();
let lock = self.lock.fetch_add(READER, Acquire);
if lock & (WRITER | MAX_READER) == 0 {
if lock & (WRITER | MAX_READER | BEING_UPGRADED) == 0 {
Some(RwLockReadGuard {
inner: self,
inner_guard: InnerGuard::IrqGuard(irq_guard),
@ -79,7 +192,12 @@ impl<T> RwLock<T> {
}
}
/// Try acquire a write lock with disabling local IRQs.
/// Attempt to acquire a write lock while disabling local IRQs.
///
/// This function will never spin-wait and will return immediately. When
/// multiple readers or writers attempt to acquire the lock, this method
/// does not guarantee any order. Interrupts will automatically be restored
/// when acquiring fails.
pub fn try_write_irq_disabled(&self) -> Option<RwLockWriteGuard<T>> {
let irq_guard = disable_local();
if self
@ -96,13 +214,38 @@ impl<T> RwLock<T> {
}
}
/// Acquire a read lock without disabling local IRQs.
/// Attempt to acquire a upread lock while disabling local IRQs.
///
/// Prefer using this method over the `read_irq_disabled` method
/// when IRQ handlers are allowed to get executed while
/// holding this lock. For example, if a lock is never used
/// in the interrupt context, then it is ok to use this method
/// in the process context.
/// This function will never spin-wait and will return immediately. When
/// multiple readers or writers attempt to acquire the lock, this method
/// does not guarantee any order. Interrupts will automatically be restored
/// when acquiring fails.
pub fn try_upread_irq_disabled(&self) -> Option<RwLockUpgradeableGuard<T>> {
let irq_guard = disable_local();
let lock = self.lock.fetch_or(UPGRADEABLE_READER, Acquire) & (WRITER | UPGRADEABLE_READER);
if lock == 0 {
return Some(RwLockUpgradeableGuard {
inner: self,
inner_guard: InnerGuard::IrqGuard(irq_guard),
});
} else if lock == WRITER {
self.lock.fetch_sub(UPGRADEABLE_READER, Release);
}
None
}
/// Acquire a read lock and spin-wait until it can be acquired.
///
/// The calling thread will spin-wait until there are no writers or
/// upgrading upreaders present. There is no guarantee for the order
/// in which other readers or writers waiting simultaneously will
/// obtain the lock.
///
/// This method does not disable interrupts, so any locks related to
/// interrupt context should avoid using this method, and use `read_irq_disabled`
/// instead. When IRQ handlers are allowed to be executed while holding
/// this lock, it is preferable to use this method over the `read_irq_disabled`
/// method as it has a higher efficiency.
pub fn read(&self) -> RwLockReadGuard<T> {
loop {
if let Some(readguard) = self.try_read() {
@ -113,7 +256,18 @@ impl<T> RwLock<T> {
}
}
/// Acquire a write lock without disabling local IRQs.
/// Acquire a write lock and spin-wait until it can be acquired.
///
/// The calling thread will spin-wait until there are no other writers,
/// , upreaders or readers present. There is no guarantee for the order
/// in which other readers or writers waiting simultaneously will
/// obtain the lock.
///
/// This method does not disable interrupts, so any locks related to
/// interrupt context should avoid using this method, and use `write_irq_disabled`
/// instead. When IRQ handlers are allowed to be executed while holding
/// this lock, it is preferable to use this method over the `write_irq_disabled`
/// method as it has a higher efficiency.
pub fn write(&self) -> RwLockWriteGuard<T> {
loop {
if let Some(writeguard) = self.try_write() {
@ -124,11 +278,46 @@ impl<T> RwLock<T> {
}
}
/// Try acquire a read lock without disabling the local IRQs.
/// Acquire an upreader and spin-wait until it can be acquired.
///
/// The calling thread will spin-wait until there are no other writers,
/// or upreaders. There is no guarantee for the order in which other
/// readers or writers waiting simultaneously will obtain the lock.
///
/// Upreader will not block new readers until it tries to upgrade. Upreader
/// and reader do not differ before invoking the upgread method. However,
/// only one upreader can exist at any time to avoid deadlock in the
/// upgread method.
///
/// This method does not disable interrupts, so any locks related to
/// interrupt context should avoid using this method, and use `upread_irq_disabled`
/// instead. When IRQ handlers are allowed to be executed while holding
/// this lock, it is preferable to use this method over the `upread_irq_disabled`
/// method as it has a higher efficiency.
pub fn upread(&self) -> RwLockUpgradeableGuard<T> {
loop {
if let Some(guard) = self.try_upread() {
return guard;
} else {
core::hint::spin_loop();
}
}
}
/// Attempt to acquire a read lock.
///
/// This function will never spin-wait and will return immediately.
///
/// This method does not disable interrupts, so any locks related to
/// interrupt context should avoid using this method, and use
/// `try_read_irq_disabled` instead. When IRQ handlers are allowed to
/// be executed while holding this lock, it is preferable to use this
/// method over the `try_read_irq_disabled` method as it has a higher
/// efficiency.
pub fn try_read(&self) -> Option<RwLockReadGuard<T>> {
let guard = disable_preempt();
let lock = self.lock.fetch_add(READER, Acquire);
if lock & (WRITER | MAX_READER) == 0 {
if lock & (WRITER | MAX_READER | BEING_UPGRADED) == 0 {
Some(RwLockReadGuard {
inner: self,
inner_guard: InnerGuard::PreemptGuard(guard),
@ -139,7 +328,16 @@ impl<T> RwLock<T> {
}
}
/// Try acquire a write lock without disabling the local IRQs.
/// Attempt to acquire a write lock.
///
/// This function will never spin-wait and will return immediately.
///
/// This method does not disable interrupts, so any locks related to
/// interrupt context should avoid using this method, and use
/// `try_write_irq_disabled` instead. When IRQ handlers are allowed to
/// be executed while holding this lock, it is preferable to use this
/// method over the `try_write_irq_disabled` method as it has a higher
/// efficiency.
pub fn try_write(&self) -> Option<RwLockWriteGuard<T>> {
let guard = disable_preempt();
if self
@ -155,6 +353,30 @@ impl<T> RwLock<T> {
None
}
}
/// Attempt to acquire an upread lock.
///
/// This function will never spin-wait and will return immediately.
///
/// This method does not disable interrupts, so any locks related to
/// interrupt context should avoid using this method, and use
/// `try_upread_irq_disabled` instead. When IRQ handlers are allowed to
/// be executed while holding this lock, it is preferable to use this
/// method over the `try_upread_irq_disabled` method as it has a higher
/// efficiency.
pub fn try_upread(&self) -> Option<RwLockUpgradeableGuard<T>> {
let guard = disable_preempt();
let lock = self.lock.fetch_or(UPGRADEABLE_READER, Acquire) & (WRITER | UPGRADEABLE_READER);
if lock == 0 {
return Some(RwLockUpgradeableGuard {
inner: self,
inner_guard: InnerGuard::PreemptGuard(guard),
});
} else if lock == WRITER {
self.lock.fetch_sub(UPGRADEABLE_READER, Release);
}
None
}
}
impl<T: fmt::Debug> fmt::Debug for RwLock<T> {
@ -174,43 +396,20 @@ unsafe impl<T: Sync> Sync for RwLockWriteGuard<'_, T> {}
impl<'a, T> !Send for RwLockReadGuard<'a, T> {}
unsafe impl<T: Sync> Sync for RwLockReadGuard<'_, T> {}
impl<'a, T> !Send for RwLockUpgradeableGuard<'a, T> {}
unsafe impl<T: Sync> Sync for RwLockUpgradeableGuard<'_, T> {}
enum InnerGuard {
IrqGuard(DisabledLocalIrqGuard),
PreemptGuard(DisablePreemptGuard),
}
/// The guard of the read lock.
/// A guard that provides immutable data access.
pub struct RwLockReadGuard<'a, T> {
inner: &'a RwLock<T>,
inner_guard: InnerGuard,
}
/// Upgrade a read lock to a write lock.
///
/// This method first release the old read lock and then aquire a new write lock.
/// So it may not return the write guard immidiately
/// due to other readers or another writer.
impl<'a, T> RwLockReadGuard<'a, T> {
pub fn upgrade(mut self) -> RwLockWriteGuard<'a, T> {
let inner = self.inner;
let inner_guard = match &mut self.inner_guard {
InnerGuard::IrqGuard(irq_guard) => InnerGuard::IrqGuard(irq_guard.transfer_to()),
InnerGuard::PreemptGuard(preempt_guard) => {
InnerGuard::PreemptGuard(preempt_guard.transfer_to())
}
};
drop(self);
while inner
.lock
.compare_exchange(0, WRITER, Acquire, Relaxed)
.is_err()
{
core::hint::spin_loop();
}
RwLockWriteGuard { inner, inner_guard }
}
}
impl<'a, T> Deref for RwLockReadGuard<'a, T> {
type Target = T;
@ -231,28 +430,12 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockReadGuard<'a, T> {
}
}
/// A guard that provides mutable data access.
pub struct RwLockWriteGuard<'a, T> {
inner: &'a RwLock<T>,
inner_guard: InnerGuard,
}
/// Downgrade a write lock to a read lock.
///
/// This method can return the read guard immidiately
/// due to there are no other users.
impl<'a, T> RwLockWriteGuard<'a, T> {
pub fn downgrade(mut self) -> RwLockReadGuard<'a, T> {
self.inner.lock.fetch_add(READER, Acquire);
let inner = self.inner;
let inner_guard = match &mut self.inner_guard {
InnerGuard::IrqGuard(irq_guard) => InnerGuard::IrqGuard(irq_guard.transfer_to()),
InnerGuard::PreemptGuard(preempt_guard) => InnerGuard::PreemptGuard(disable_preempt()),
};
drop(self);
RwLockReadGuard { inner, inner_guard }
}
}
impl<'a, T> Deref for RwLockWriteGuard<'a, T> {
type Target = T;
@ -269,7 +452,7 @@ impl<'a, T> DerefMut for RwLockWriteGuard<'a, T> {
impl<'a, T> Drop for RwLockWriteGuard<'a, T> {
fn drop(&mut self) {
self.inner.lock.fetch_and(!(WRITER), Release);
self.inner.lock.fetch_and(!WRITER, Release);
}
}
@ -278,3 +461,71 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockWriteGuard<'a, T> {
fmt::Debug::fmt(&**self, f)
}
}
/// A guard that provides immutable data access but can be atomically
/// upgraded to `RwLockWriteGuard`.
pub struct RwLockUpgradeableGuard<'a, T> {
inner: &'a RwLock<T>,
inner_guard: InnerGuard,
}
impl<'a, T> RwLockUpgradeableGuard<'a, T> {
/// Upgrade this upread guard to a write guard atomically.
///
/// After calling this method, subsequent readers will be blocked
/// while previous readers remain unaffected. The calling thread
/// will spin-wait until previous readers finish.
pub fn upgrade(mut self) -> RwLockWriteGuard<'a, T> {
self.inner.lock.fetch_or(BEING_UPGRADED, Acquire);
loop {
self = match self.try_upgrade() {
Ok(guard) => return guard,
Err(e) => e,
};
}
}
/// Attempts to upgrade this upread guard to a write guard atomically.
///
/// This function will never spin-wait and will return immediately.
pub fn try_upgrade(mut self) -> Result<RwLockWriteGuard<'a, T>, Self> {
let inner = self.inner;
let res = self.inner.lock.compare_exchange(
UPGRADEABLE_READER | BEING_UPGRADED,
WRITER | UPGRADEABLE_READER,
AcqRel,
Relaxed,
);
if res.is_ok() {
let inner_guard = match &mut self.inner_guard {
InnerGuard::IrqGuard(irq_guard) => InnerGuard::IrqGuard(irq_guard.transfer_to()),
InnerGuard::PreemptGuard(preempt_guard) => {
InnerGuard::PreemptGuard(preempt_guard.transfer_to())
}
};
drop(self);
Ok(RwLockWriteGuard { inner, inner_guard })
} else {
Err(self)
}
}
}
impl<'a, T> Deref for RwLockUpgradeableGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.inner.val.get() }
}
}
impl<'a, T> Drop for RwLockUpgradeableGuard<'a, T> {
fn drop(&mut self) {
self.inner.lock.fetch_sub(UPGRADEABLE_READER, Release);
}
}
impl<'a, T: fmt::Debug> fmt::Debug for RwLockUpgradeableGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}

View File

@ -2,29 +2,102 @@ use core::cell::UnsafeCell;
use core::fmt;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use super::WaitQueue;
/// A read/write lock based on blocking, which is named `RwMutex`.
/// A mutex that provides data access to either one writer or many readers.
///
/// # Overview
///
/// This mutex allows for multiple readers, or at most one writer to access
/// at any point in time. The writer of this mutex has exclusive access to
/// modify the underlying data, while the readers are allowed shared and
/// read-only access.
///
/// The writing and reading portions cannot be active simultaneously, when
/// one portion is in progress, the other portion will sleep. This is
/// suitable for scenarios where the mutex is expected to be held for a
/// period of time, which can avoid wasting CPU resources.
///
/// This implementation provides the upgradeable read mutex (`upread mutex`).
/// The `upread mutex` can be upgraded to write mutex atomically, useful in
/// scenarios where a decision to write is made after reading.
///
/// The type parameter `T` represents the data that this mutex is protecting.
/// It is necessary for `T` to satisfy `Send` to be shared across tasks and
/// `Sync` to permit concurrent access via readers. The `Deref` method (and
/// `DerefMut` for the writer) is implemented for the RAII guards returned
/// by the locking methods, which allows for the access to the protected data
/// while the mutex is held.
///
/// # Usage
/// The mutex can be used in scenarios where data needs to be read frequently
/// but written to occasionally.
///
/// Use `upread mutex` in scenarios where related checking is performed before
/// modification to effectively avoid deadlocks and improve efficiency.
///
/// # Safety
///
/// Avoid using `RwMutex` in an interrupt context, as it may result in sleeping
/// and never being awakened.
///
/// # Examples
///
/// ```
/// Now, the mutex's layout is simply like:
/// bit: 63 | 62 ~ 0
/// use: writer mutex | reader mutex & numbers
/// use jinux_frame::sync::RwMutex;
///
/// let mutex = RwMutex::new(5)
///
/// // many read mutexs can be held at once
/// {
/// let r1 = mutex.read();
/// let r2 = mutex.read();
/// assert_eq!(*r1, 5);
/// assert_eq!(*r2, 5);
///
/// // Upgradeable read mutex can share access to data with read mutexs
/// let r3 = mutex.upread();
/// assert_eq!(*r3, 5);
/// drop(r1);
/// drop(r2);
/// // read mutexs are dropped at this point
///
/// // An upread mutex can only be upgraded successfully after all the
/// // read mutexs are released, otherwise it will spin-wait.
/// let mut w1 = r3.upgrade();
/// *w1 += 1;
/// assert_eq!(*w1, 6);
/// } // upread mutex are dropped at this point
///
/// {
/// // Only one write mutex can be held at a time
/// let mut w2 = mutex.write();
/// *w2 += 1;
/// assert_eq!(*w2, 7);
/// } // write mutex is dropped at this point
/// ```
pub struct RwMutex<T> {
val: UnsafeCell<T>,
/// The internal representation of the mutex state is as follows:
/// - **Bit 63:** Writer mutex.
/// - **Bit 62:** Upgradeable reader mutex.
/// - **Bit 61:** Indicates if an upgradeable reader is being upgraded.
/// - **Bits 60-0:** Reader mutex count.
lock: AtomicUsize,
/// Threads that fail to acquire the mutex will sleep on this waitqueue.
queue: WaitQueue,
}
const READER: usize = 1;
const WRITER: usize = 1 << (usize::BITS - 1);
const MAX_READER: usize = WRITER >> 1;
const UPGRADEABLE_READER: usize = 1 << (usize::BITS - 2);
const BEING_UPGRADED: usize = 1 << (usize::BITS - 3);
const MAX_READER: usize = 1 << (usize::BITS - 4);
impl<T> RwMutex<T> {
/// Creates a new `RwMutex`.
/// Creates a new read-write mutex with an initial value.
pub const fn new(val: T) -> Self {
Self {
val: UnsafeCell::new(val),
@ -33,20 +106,46 @@ impl<T> RwMutex<T> {
}
}
/// Acquire a read mutex, and if there is a writer, this thread will sleep in the wait queue.
/// Acquire a read mutex and sleep until it can be acquired.
///
/// The calling thread will sleep until there are no writers or upgrading
/// upreaders present. The implementation of `WaitQueue` guarantees the
/// order in which other concurrent readers or writers waiting simultaneously
/// will acquire the mutex.
pub fn read(&self) -> RwMutexReadGuard<T> {
self.queue.wait_until(|| self.try_read())
}
/// Acquire a write mutex, and if there is another writer or other readers, this thread will sleep in the wait queue.
/// Acquire a write mutex and sleep until it can be acquired.
///
/// The calling thread will sleep until there are no writers, upreaders,
/// or readers present. The implementation of `WaitQueue` guarantees the
/// order in which other concurrent readers or writers waiting simultaneously
/// will acquire the mutex.
pub fn write(&self) -> RwMutexWriteGuard<T> {
self.queue.wait_until(|| self.try_write())
}
/// Try acquire a read mutex and return immediately if it fails.
/// Acquire a upread mutex and sleep until it can be acquired.
///
/// The calling thread will sleep until there are no writers or upreaders present.
/// The implementation of `WaitQueue` guarantees the order in which other concurrent
/// readers or writers waiting simultaneously will acquire the mutex.
///
/// Upreader will not block new readers until it tries to upgrade. Upreader
/// and reader do not differ before invoking the upgread method. However,
/// only one upreader can exist at any time to avoid deadlock in the
/// upgread method.
pub fn upread(&self) -> RwMutexUpgradeableGuard<T> {
self.queue.wait_until(|| self.try_upread())
}
/// Attempt to acquire a read mutex.
///
/// This function will never sleep and will return immediately.
pub fn try_read(&self) -> Option<RwMutexReadGuard<T>> {
let lock = self.lock.fetch_add(READER, Acquire);
if lock & (WRITER | MAX_READER) == 0 {
if lock & (WRITER | BEING_UPGRADED | MAX_READER) == 0 {
Some(RwMutexReadGuard { inner: self })
} else {
self.lock.fetch_sub(READER, Release);
@ -54,7 +153,9 @@ impl<T> RwMutex<T> {
}
}
/// Try acquire a write mutex and return immediately if it fails.
/// Attempt to acquire a write mutex.
///
/// This function will never sleep and will return immediately.
pub fn try_write(&self) -> Option<RwMutexWriteGuard<T>> {
if self
.lock
@ -66,6 +167,19 @@ impl<T> RwMutex<T> {
None
}
}
/// Attempt to acquire a upread mutex.
///
/// This function will never sleep and will return immediately.
pub fn try_upread(&self) -> Option<RwMutexUpgradeableGuard<T>> {
let lock = self.lock.fetch_or(UPGRADEABLE_READER, Acquire) & (WRITER | UPGRADEABLE_READER);
if lock == 0 {
return Some(RwMutexUpgradeableGuard { inner: self });
} else if lock == WRITER {
self.lock.fetch_sub(UPGRADEABLE_READER, Release);
}
None
}
}
impl<T: fmt::Debug> fmt::Debug for RwMutex<T> {
@ -85,23 +199,14 @@ unsafe impl<T: Sync> Sync for RwMutexWriteGuard<'_, T> {}
impl<'a, T> !Send for RwMutexReadGuard<'a, T> {}
unsafe impl<T: Sync> Sync for RwMutexReadGuard<'_, T> {}
/// The guards of `RwMutex`.
impl<'a, T> !Send for RwMutexUpgradeableGuard<'a, T> {}
unsafe impl<T: Sync> Sync for RwMutexUpgradeableGuard<'_, T> {}
/// A guard that provides immutable data access.
pub struct RwMutexReadGuard<'a, T> {
inner: &'a RwMutex<T>,
}
/// Upgrade a read mutex to a write mutex.
///
/// This method first release the old read mutex and then aquire a new write mutex.
/// So it may sleep while acquireing the write mutex.
impl<'a, T> RwMutexReadGuard<'a, T> {
pub fn upgrade(self) -> RwMutexWriteGuard<'a, T> {
let inner = self.inner;
drop(self);
inner.write()
}
}
impl<'a, T> Deref for RwMutexReadGuard<'a, T> {
type Target = T;
@ -110,28 +215,20 @@ impl<'a, T> Deref for RwMutexReadGuard<'a, T> {
}
}
/// When there are no readers, wake up a waiting writer.
impl<'a, T> Drop for RwMutexReadGuard<'a, T> {
fn drop(&mut self) {
if self.inner.lock.fetch_sub(READER, Release) == 1 {
// When there are no readers, wake up a waiting writer.
if self.inner.lock.fetch_sub(READER, Release) == READER {
self.inner.queue.wake_one();
}
}
}
/// A guard that provides mutable data access.
pub struct RwMutexWriteGuard<'a, T> {
inner: &'a RwMutex<T>,
}
impl<'a, T> RwMutexWriteGuard<'a, T> {
pub fn downgrade(self) -> RwMutexReadGuard<'a, T> {
self.inner.lock.fetch_add(READER, Acquire);
let inner = self.inner;
drop(self);
RwMutexReadGuard { inner }
}
}
impl<'a, T> Deref for RwMutexWriteGuard<'a, T> {
type Target = T;
@ -146,14 +243,77 @@ impl<'a, T> DerefMut for RwMutexWriteGuard<'a, T> {
}
}
/// When the current writer releases, wake up all the sleeping threads.
impl<'a, T> Drop for RwMutexWriteGuard<'a, T> {
fn drop(&mut self) {
self.inner.lock.fetch_and(!(WRITER), Release);
self.inner.lock.fetch_and(!WRITER, Release);
// When the current writer releases, wake up all the sleeping threads.
// All awakened threads may include readers and writers.
// Thanks to the `wait_until` method, either all readers
// continue to execute or one writer continues to execute.
self.inner.queue.wake_all();
}
}
/// A guard that provides immutable data access but can be atomically
/// upgraded to `RwMutexWriteGuard`.
pub struct RwMutexUpgradeableGuard<'a, T> {
inner: &'a RwMutex<T>,
}
impl<'a, T> RwMutexUpgradeableGuard<'a, T> {
/// Upgrade this upread guard to a write guard atomically.
///
/// After calling this method, subsequent readers will be blocked
/// while previous readers remain unaffected.
///
/// The calling thread will not sleep, but spin to wait for the existing
/// reader to be released. There are two main reasons.
/// - First, it needs to sleep in an extra waiting queue and needs extra wake-up logic and overhead.
/// - Second, upgrading method usually requires a high response time (because the mutex is being used now).
pub fn upgrade(mut self) -> RwMutexWriteGuard<'a, T> {
self.inner.lock.fetch_or(BEING_UPGRADED, Acquire);
loop {
self = match self.try_upgrade() {
Ok(guard) => return guard,
Err(e) => e,
};
}
}
/// Attempts to upgrade this upread guard to a write guard atomically.
///
/// This function will return immediately.
pub fn try_upgrade(self) -> Result<RwMutexWriteGuard<'a, T>, Self> {
let inner = self.inner;
let res = self.inner.lock.compare_exchange(
UPGRADEABLE_READER | BEING_UPGRADED,
WRITER | UPGRADEABLE_READER,
AcqRel,
Relaxed,
);
if res.is_ok() {
drop(self);
Ok(RwMutexWriteGuard { inner })
} else {
Err(self)
}
}
}
impl<'a, T> Deref for RwMutexUpgradeableGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.inner.val.get() }
}
}
impl<'a, T> Drop for RwMutexUpgradeableGuard<'a, T> {
fn drop(&mut self) {
let res = self.inner.lock.fetch_sub(UPGRADEABLE_READER, Release);
if res == 0 {
self.inner.queue.wake_all();
}
}
}

View File

@ -488,7 +488,7 @@ impl Inode for RamInode {
return device.write(buf);
}
let self_inode = self.0.read();
let self_inode = self.0.upread();
let Some(page_cache) = self_inode.inner.as_file() else {
return_errno_with_message!(Errno::EISDIR, "write is not supported");
};