From 5aa3124e66b22006066dd68c3553cc12b67f8c26 Mon Sep 17 00:00:00 2001 From: Chuandong Li Date: Thu, 30 Nov 2023 23:47:56 +0800 Subject: [PATCH] Make the upgrade method of read-write locks atomic --- framework/jinux-frame/src/sync/mod.rs | 3 +- framework/jinux-frame/src/sync/rwlock.rs | 401 +++++++++++++++++---- framework/jinux-frame/src/sync/rwmutex.rs | 236 ++++++++++-- services/libs/jinux-std/src/fs/ramfs/fs.rs | 2 +- 4 files changed, 527 insertions(+), 115 deletions(-) diff --git a/framework/jinux-frame/src/sync/mod.rs b/framework/jinux-frame/src/sync/mod.rs index d079165a4..23b37ea10 100644 --- a/framework/jinux-frame/src/sync/mod.rs +++ b/framework/jinux-frame/src/sync/mod.rs @@ -11,6 +11,7 @@ mod wait; pub use self::atomic_bits::AtomicBits; pub use self::mutex::{Mutex, MutexGuard}; // pub use self::rcu::{pass_quiescent_state, OwnerPtr, Rcu, RcuReadGuard, RcuReclaimer}; -pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockUpgradeableGuard, RwLockWriteGuard}; +pub use self::rwmutex::{RwMutex, RwMutexReadGuard, RwMutexUpgradeableGuard, RwMutexWriteGuard}; pub use self::spin::{SpinLock, SpinLockGuard}; pub use self::wait::WaitQueue; diff --git a/framework/jinux-frame/src/sync/rwlock.rs b/framework/jinux-frame/src/sync/rwlock.rs index 1ae3f17a6..ce4ce64df 100644 --- a/framework/jinux-frame/src/sync/rwlock.rs +++ b/framework/jinux-frame/src/sync/rwlock.rs @@ -2,29 +2,111 @@ use core::cell::UnsafeCell; use core::fmt; use core::ops::{Deref, DerefMut}; use core::sync::atomic::AtomicUsize; -use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use crate::task::{disable_preempt, DisablePreemptGuard}; use crate::trap::disable_local; use crate::trap::DisabledLocalIrqGuard; -/// A read write lock, waiting by spinning. -/// Now, the lock's layout is simply like: +/// Spin-based Read-write Lock +/// +/// # Overview +/// +/// This lock allows for multiple readers, or at most one writer to access +/// at any point in time. The writer of this lock has exclusive access to +/// modify the underlying data, while the readers are allowed shared and +/// read-only access. +/// +/// The writing and reading portions cannot be active simultaneously, when +/// one portion is in progress, the other portion will spin-wait. This is +/// suitable for scenarios where the lock is expected to be held for short +/// periods of time, and the overhead of context switching is higher than +/// the cost of spinning. +/// +/// The lock provides methods to safely acquire locks with interrupts +/// disabled, preventing deadlocks in scenarios where locks are used within +/// interrupt handlers. +/// +/// In addition to traditional read and write locks, this implementation +/// provides the upgradeable read lock (`upread lock`). The `upread lock` +/// can be upgraded to write locks atomically, useful in scenarios +/// where a decision to write is made after reading. +/// +/// The type parameter `T` represents the data that this lock is protecting. +/// It is necessary for `T` to satisfy `Send` to be shared across tasks and +/// `Sync` to permit concurrent access via readers. The `Deref` method (and +/// `DerefMut` for the writer) is implemented for the RAII guards returned +/// by the locking methods, which allows for the access to the protected data +/// while the lock is held. +/// +/// # Usage +/// The lock can be used in scenarios where data needs to be read frequently +/// but written to occasionally. +/// +/// Use `upread lock` in scenarios where related checking is performed before +/// modification to effectively avoid deadlocks and improve efficiency. +/// +/// This lock should not be used in scenarios where lock-holding times are +/// long as it can lead to CPU resource wastage due to spinning. +/// +/// # Safety +/// +/// Use interrupt-disabled version methods when dealing with interrupt-related read-write locks, +/// as nested interrupts may lead to a deadlock if not properly handled. +/// +/// # Examples +/// /// ``` -/// bit: 63 | 62 ~ 0 -/// use: writer lock | reader lock & numbers +/// use jinux_frame::sync::RwLock; +/// +/// let lock = RwLock::new(5) +/// +/// // many read locks can be held at once +/// { +/// let r1 = lock.read(); +/// let r2 = lock.read(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// +/// // Upgradeable read lock can share access to data with read locks +/// let r3 = lock.upread(); +/// assert_eq!(*r3, 5); +/// drop(r1); +/// drop(r2); +/// // read locks are dropped at this point +/// +/// // An upread lock can only be upgraded successfully after all the +/// // read locks are released, otherwise it will spin-wait. +/// let mut w1 = r3.upgrade(); +/// *w1 += 1; +/// assert_eq!(*w1, 6); +/// } // upread lock are dropped at this point +/// +/// { +/// // Only one write lock can be held at a time +/// let mut w2 = lock.write(); +/// *w2 += 1; +/// assert_eq!(*w2, 7); +/// } // write lock is dropped at this point /// ``` pub struct RwLock { val: UnsafeCell, + /// The internal representation of the lock state is as follows: + /// - **Bit 63:** Writer lock. + /// - **Bit 62:** Upgradeable reader lock. + /// - **Bit 61:** Indicates if an upgradeable reader is being upgraded. + /// - **Bits 60-0:** Reader lock count. lock: AtomicUsize, } const READER: usize = 1; const WRITER: usize = 1 << (usize::BITS - 1); -const MAX_READER: usize = WRITER >> 1; +const UPGRADEABLE_READER: usize = 1 << (usize::BITS - 2); +const BEING_UPGRADED: usize = 1 << (usize::BITS - 3); +const MAX_READER: usize = 1 << (usize::BITS - 4); impl RwLock { - /// Creates a new read/write lock. + /// Creates a new spin-based read-write lock with an initial value. pub const fn new(val: T) -> Self { Self { val: UnsafeCell::new(val), @@ -32,12 +114,14 @@ impl RwLock { } } - /// Acquire a read lock with disabling the local IRQs. This is the most secure - /// locking method. + /// Acquire a read lock while disabling the local IRQs and spin-wait + /// until it can be acquired. /// - /// This method runs in a busy loop until the lock can be acquired (when there are - /// no writers). - /// After acquiring the spin lock, all interrupts are disabled. + /// The calling thread will spin-wait until there are no writers or + /// upgrading upreaders present. There is no guarantee for the order + /// in which other readers or writers waiting simultaneously will + /// obtain the lock. Once this lock is acquired, the calling thread + /// will not be interrupted. pub fn read_irq_disabled(&self) -> RwLockReadGuard { loop { if let Some(readguard) = self.try_read_irq_disabled() { @@ -48,12 +132,14 @@ impl RwLock { } } - /// Acquire a write lock with disabling local IRQs. This is the most secure - /// locking method. + /// Acquire a write lock while disabling the local IRQs and spin-wait + /// until it can be acquired. /// - /// This method runs in a busy loop until the lock can be acquired (when there are - /// no writers and readers). - /// After acquiring the spin lock, all interrupts are disabled. + /// The calling thread will spin-wait until there are no other writers, + /// , upreaders or readers present. There is no guarantee for the order + /// in which other readers or writers waiting simultaneously will + /// obtain the lock. Once this lock is acquired, the calling thread + /// will not be interrupted. pub fn write_irq_disabled(&self) -> RwLockWriteGuard { loop { if let Some(writeguard) = self.try_write_irq_disabled() { @@ -64,11 +150,38 @@ impl RwLock { } } - /// Try acquire a read lock with disabling local IRQs. + /// Acquire an upgradeable reader (upreader) while disabling local IRQs + /// and spin-wait until it can be acquired. + /// + /// The calling thread will spin-wait until there are no other writers, + /// or upreaders. There is no guarantee for the order in which other + /// readers or writers waiting simultaneously will obtain the lock. Once + /// this lock is acquired, the calling thread will not be interrupted. + /// + /// Upreader will not block new readers until it tries to upgrade. Upreader + /// and reader do not differ before invoking the upgread method. However, + /// only one upreader can exist at any time to avoid deadlock in the + /// upgread method. + pub fn upread_irq_disabled(&self) -> RwLockUpgradeableGuard { + loop { + if let Some(guard) = self.try_upread_irq_disabled() { + return guard; + } else { + core::hint::spin_loop(); + } + } + } + + /// Attempt to acquire a read lock while disabling local IRQs. + /// + /// This function will never spin-wait and will return immediately. When + /// multiple readers or writers attempt to acquire the lock, this method + /// does not guarantee any order. Interrupts will automatically be restored + /// when acquiring fails. pub fn try_read_irq_disabled(&self) -> Option> { let irq_guard = disable_local(); let lock = self.lock.fetch_add(READER, Acquire); - if lock & (WRITER | MAX_READER) == 0 { + if lock & (WRITER | MAX_READER | BEING_UPGRADED) == 0 { Some(RwLockReadGuard { inner: self, inner_guard: InnerGuard::IrqGuard(irq_guard), @@ -79,7 +192,12 @@ impl RwLock { } } - /// Try acquire a write lock with disabling local IRQs. + /// Attempt to acquire a write lock while disabling local IRQs. + /// + /// This function will never spin-wait and will return immediately. When + /// multiple readers or writers attempt to acquire the lock, this method + /// does not guarantee any order. Interrupts will automatically be restored + /// when acquiring fails. pub fn try_write_irq_disabled(&self) -> Option> { let irq_guard = disable_local(); if self @@ -96,13 +214,38 @@ impl RwLock { } } - /// Acquire a read lock without disabling local IRQs. + /// Attempt to acquire a upread lock while disabling local IRQs. /// - /// Prefer using this method over the `read_irq_disabled` method - /// when IRQ handlers are allowed to get executed while - /// holding this lock. For example, if a lock is never used - /// in the interrupt context, then it is ok to use this method - /// in the process context. + /// This function will never spin-wait and will return immediately. When + /// multiple readers or writers attempt to acquire the lock, this method + /// does not guarantee any order. Interrupts will automatically be restored + /// when acquiring fails. + pub fn try_upread_irq_disabled(&self) -> Option> { + let irq_guard = disable_local(); + let lock = self.lock.fetch_or(UPGRADEABLE_READER, Acquire) & (WRITER | UPGRADEABLE_READER); + if lock == 0 { + return Some(RwLockUpgradeableGuard { + inner: self, + inner_guard: InnerGuard::IrqGuard(irq_guard), + }); + } else if lock == WRITER { + self.lock.fetch_sub(UPGRADEABLE_READER, Release); + } + None + } + + /// Acquire a read lock and spin-wait until it can be acquired. + /// + /// The calling thread will spin-wait until there are no writers or + /// upgrading upreaders present. There is no guarantee for the order + /// in which other readers or writers waiting simultaneously will + /// obtain the lock. + /// + /// This method does not disable interrupts, so any locks related to + /// interrupt context should avoid using this method, and use `read_irq_disabled` + /// instead. When IRQ handlers are allowed to be executed while holding + /// this lock, it is preferable to use this method over the `read_irq_disabled` + /// method as it has a higher efficiency. pub fn read(&self) -> RwLockReadGuard { loop { if let Some(readguard) = self.try_read() { @@ -113,7 +256,18 @@ impl RwLock { } } - /// Acquire a write lock without disabling local IRQs. + /// Acquire a write lock and spin-wait until it can be acquired. + /// + /// The calling thread will spin-wait until there are no other writers, + /// , upreaders or readers present. There is no guarantee for the order + /// in which other readers or writers waiting simultaneously will + /// obtain the lock. + /// + /// This method does not disable interrupts, so any locks related to + /// interrupt context should avoid using this method, and use `write_irq_disabled` + /// instead. When IRQ handlers are allowed to be executed while holding + /// this lock, it is preferable to use this method over the `write_irq_disabled` + /// method as it has a higher efficiency. pub fn write(&self) -> RwLockWriteGuard { loop { if let Some(writeguard) = self.try_write() { @@ -124,11 +278,46 @@ impl RwLock { } } - /// Try acquire a read lock without disabling the local IRQs. + /// Acquire an upreader and spin-wait until it can be acquired. + /// + /// The calling thread will spin-wait until there are no other writers, + /// or upreaders. There is no guarantee for the order in which other + /// readers or writers waiting simultaneously will obtain the lock. + /// + /// Upreader will not block new readers until it tries to upgrade. Upreader + /// and reader do not differ before invoking the upgread method. However, + /// only one upreader can exist at any time to avoid deadlock in the + /// upgread method. + /// + /// This method does not disable interrupts, so any locks related to + /// interrupt context should avoid using this method, and use `upread_irq_disabled` + /// instead. When IRQ handlers are allowed to be executed while holding + /// this lock, it is preferable to use this method over the `upread_irq_disabled` + /// method as it has a higher efficiency. + pub fn upread(&self) -> RwLockUpgradeableGuard { + loop { + if let Some(guard) = self.try_upread() { + return guard; + } else { + core::hint::spin_loop(); + } + } + } + + /// Attempt to acquire a read lock. + /// + /// This function will never spin-wait and will return immediately. + /// + /// This method does not disable interrupts, so any locks related to + /// interrupt context should avoid using this method, and use + /// `try_read_irq_disabled` instead. When IRQ handlers are allowed to + /// be executed while holding this lock, it is preferable to use this + /// method over the `try_read_irq_disabled` method as it has a higher + /// efficiency. pub fn try_read(&self) -> Option> { let guard = disable_preempt(); let lock = self.lock.fetch_add(READER, Acquire); - if lock & (WRITER | MAX_READER) == 0 { + if lock & (WRITER | MAX_READER | BEING_UPGRADED) == 0 { Some(RwLockReadGuard { inner: self, inner_guard: InnerGuard::PreemptGuard(guard), @@ -139,7 +328,16 @@ impl RwLock { } } - /// Try acquire a write lock without disabling the local IRQs. + /// Attempt to acquire a write lock. + /// + /// This function will never spin-wait and will return immediately. + /// + /// This method does not disable interrupts, so any locks related to + /// interrupt context should avoid using this method, and use + /// `try_write_irq_disabled` instead. When IRQ handlers are allowed to + /// be executed while holding this lock, it is preferable to use this + /// method over the `try_write_irq_disabled` method as it has a higher + /// efficiency. pub fn try_write(&self) -> Option> { let guard = disable_preempt(); if self @@ -155,6 +353,30 @@ impl RwLock { None } } + + /// Attempt to acquire an upread lock. + /// + /// This function will never spin-wait and will return immediately. + /// + /// This method does not disable interrupts, so any locks related to + /// interrupt context should avoid using this method, and use + /// `try_upread_irq_disabled` instead. When IRQ handlers are allowed to + /// be executed while holding this lock, it is preferable to use this + /// method over the `try_upread_irq_disabled` method as it has a higher + /// efficiency. + pub fn try_upread(&self) -> Option> { + let guard = disable_preempt(); + let lock = self.lock.fetch_or(UPGRADEABLE_READER, Acquire) & (WRITER | UPGRADEABLE_READER); + if lock == 0 { + return Some(RwLockUpgradeableGuard { + inner: self, + inner_guard: InnerGuard::PreemptGuard(guard), + }); + } else if lock == WRITER { + self.lock.fetch_sub(UPGRADEABLE_READER, Release); + } + None + } } impl fmt::Debug for RwLock { @@ -174,43 +396,20 @@ unsafe impl Sync for RwLockWriteGuard<'_, T> {} impl<'a, T> !Send for RwLockReadGuard<'a, T> {} unsafe impl Sync for RwLockReadGuard<'_, T> {} +impl<'a, T> !Send for RwLockUpgradeableGuard<'a, T> {} +unsafe impl Sync for RwLockUpgradeableGuard<'_, T> {} + enum InnerGuard { IrqGuard(DisabledLocalIrqGuard), PreemptGuard(DisablePreemptGuard), } -/// The guard of the read lock. +/// A guard that provides immutable data access. pub struct RwLockReadGuard<'a, T> { inner: &'a RwLock, inner_guard: InnerGuard, } -/// Upgrade a read lock to a write lock. -/// -/// This method first release the old read lock and then aquire a new write lock. -/// So it may not return the write guard immidiately -/// due to other readers or another writer. -impl<'a, T> RwLockReadGuard<'a, T> { - pub fn upgrade(mut self) -> RwLockWriteGuard<'a, T> { - let inner = self.inner; - let inner_guard = match &mut self.inner_guard { - InnerGuard::IrqGuard(irq_guard) => InnerGuard::IrqGuard(irq_guard.transfer_to()), - InnerGuard::PreemptGuard(preempt_guard) => { - InnerGuard::PreemptGuard(preempt_guard.transfer_to()) - } - }; - drop(self); - while inner - .lock - .compare_exchange(0, WRITER, Acquire, Relaxed) - .is_err() - { - core::hint::spin_loop(); - } - RwLockWriteGuard { inner, inner_guard } - } -} - impl<'a, T> Deref for RwLockReadGuard<'a, T> { type Target = T; @@ -231,28 +430,12 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockReadGuard<'a, T> { } } +/// A guard that provides mutable data access. pub struct RwLockWriteGuard<'a, T> { inner: &'a RwLock, inner_guard: InnerGuard, } -/// Downgrade a write lock to a read lock. -/// -/// This method can return the read guard immidiately -/// due to there are no other users. -impl<'a, T> RwLockWriteGuard<'a, T> { - pub fn downgrade(mut self) -> RwLockReadGuard<'a, T> { - self.inner.lock.fetch_add(READER, Acquire); - let inner = self.inner; - let inner_guard = match &mut self.inner_guard { - InnerGuard::IrqGuard(irq_guard) => InnerGuard::IrqGuard(irq_guard.transfer_to()), - InnerGuard::PreemptGuard(preempt_guard) => InnerGuard::PreemptGuard(disable_preempt()), - }; - drop(self); - RwLockReadGuard { inner, inner_guard } - } -} - impl<'a, T> Deref for RwLockWriteGuard<'a, T> { type Target = T; @@ -269,7 +452,7 @@ impl<'a, T> DerefMut for RwLockWriteGuard<'a, T> { impl<'a, T> Drop for RwLockWriteGuard<'a, T> { fn drop(&mut self) { - self.inner.lock.fetch_and(!(WRITER), Release); + self.inner.lock.fetch_and(!WRITER, Release); } } @@ -278,3 +461,71 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockWriteGuard<'a, T> { fmt::Debug::fmt(&**self, f) } } + +/// A guard that provides immutable data access but can be atomically +/// upgraded to `RwLockWriteGuard`. +pub struct RwLockUpgradeableGuard<'a, T> { + inner: &'a RwLock, + inner_guard: InnerGuard, +} + +impl<'a, T> RwLockUpgradeableGuard<'a, T> { + /// Upgrade this upread guard to a write guard atomically. + /// + /// After calling this method, subsequent readers will be blocked + /// while previous readers remain unaffected. The calling thread + /// will spin-wait until previous readers finish. + pub fn upgrade(mut self) -> RwLockWriteGuard<'a, T> { + self.inner.lock.fetch_or(BEING_UPGRADED, Acquire); + loop { + self = match self.try_upgrade() { + Ok(guard) => return guard, + Err(e) => e, + }; + } + } + /// Attempts to upgrade this upread guard to a write guard atomically. + /// + /// This function will never spin-wait and will return immediately. + pub fn try_upgrade(mut self) -> Result, Self> { + let inner = self.inner; + let res = self.inner.lock.compare_exchange( + UPGRADEABLE_READER | BEING_UPGRADED, + WRITER | UPGRADEABLE_READER, + AcqRel, + Relaxed, + ); + if res.is_ok() { + let inner_guard = match &mut self.inner_guard { + InnerGuard::IrqGuard(irq_guard) => InnerGuard::IrqGuard(irq_guard.transfer_to()), + InnerGuard::PreemptGuard(preempt_guard) => { + InnerGuard::PreemptGuard(preempt_guard.transfer_to()) + } + }; + drop(self); + Ok(RwLockWriteGuard { inner, inner_guard }) + } else { + Err(self) + } + } +} + +impl<'a, T> Deref for RwLockUpgradeableGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.inner.val.get() } + } +} + +impl<'a, T> Drop for RwLockUpgradeableGuard<'a, T> { + fn drop(&mut self) { + self.inner.lock.fetch_sub(UPGRADEABLE_READER, Release); + } +} + +impl<'a, T: fmt::Debug> fmt::Debug for RwLockUpgradeableGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} diff --git a/framework/jinux-frame/src/sync/rwmutex.rs b/framework/jinux-frame/src/sync/rwmutex.rs index 789a5fa05..573e5e791 100644 --- a/framework/jinux-frame/src/sync/rwmutex.rs +++ b/framework/jinux-frame/src/sync/rwmutex.rs @@ -2,29 +2,102 @@ use core::cell::UnsafeCell; use core::fmt; use core::ops::{Deref, DerefMut}; use core::sync::atomic::AtomicUsize; -use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use super::WaitQueue; -/// A read/write lock based on blocking, which is named `RwMutex`. +/// A mutex that provides data access to either one writer or many readers. +/// +/// # Overview +/// +/// This mutex allows for multiple readers, or at most one writer to access +/// at any point in time. The writer of this mutex has exclusive access to +/// modify the underlying data, while the readers are allowed shared and +/// read-only access. +/// +/// The writing and reading portions cannot be active simultaneously, when +/// one portion is in progress, the other portion will sleep. This is +/// suitable for scenarios where the mutex is expected to be held for a +/// period of time, which can avoid wasting CPU resources. +/// +/// This implementation provides the upgradeable read mutex (`upread mutex`). +/// The `upread mutex` can be upgraded to write mutex atomically, useful in +/// scenarios where a decision to write is made after reading. +/// +/// The type parameter `T` represents the data that this mutex is protecting. +/// It is necessary for `T` to satisfy `Send` to be shared across tasks and +/// `Sync` to permit concurrent access via readers. The `Deref` method (and +/// `DerefMut` for the writer) is implemented for the RAII guards returned +/// by the locking methods, which allows for the access to the protected data +/// while the mutex is held. +/// +/// # Usage +/// The mutex can be used in scenarios where data needs to be read frequently +/// but written to occasionally. +/// +/// Use `upread mutex` in scenarios where related checking is performed before +/// modification to effectively avoid deadlocks and improve efficiency. +/// +/// # Safety +/// +/// Avoid using `RwMutex` in an interrupt context, as it may result in sleeping +/// and never being awakened. +/// +/// # Examples /// /// ``` -/// Now, the mutex's layout is simply like: -/// bit: 63 | 62 ~ 0 -/// use: writer mutex | reader mutex & numbers +/// use jinux_frame::sync::RwMutex; +/// +/// let mutex = RwMutex::new(5) +/// +/// // many read mutexs can be held at once +/// { +/// let r1 = mutex.read(); +/// let r2 = mutex.read(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// +/// // Upgradeable read mutex can share access to data with read mutexs +/// let r3 = mutex.upread(); +/// assert_eq!(*r3, 5); +/// drop(r1); +/// drop(r2); +/// // read mutexs are dropped at this point +/// +/// // An upread mutex can only be upgraded successfully after all the +/// // read mutexs are released, otherwise it will spin-wait. +/// let mut w1 = r3.upgrade(); +/// *w1 += 1; +/// assert_eq!(*w1, 6); +/// } // upread mutex are dropped at this point +/// +/// { +/// // Only one write mutex can be held at a time +/// let mut w2 = mutex.write(); +/// *w2 += 1; +/// assert_eq!(*w2, 7); +/// } // write mutex is dropped at this point /// ``` pub struct RwMutex { val: UnsafeCell, + /// The internal representation of the mutex state is as follows: + /// - **Bit 63:** Writer mutex. + /// - **Bit 62:** Upgradeable reader mutex. + /// - **Bit 61:** Indicates if an upgradeable reader is being upgraded. + /// - **Bits 60-0:** Reader mutex count. lock: AtomicUsize, + /// Threads that fail to acquire the mutex will sleep on this waitqueue. queue: WaitQueue, } const READER: usize = 1; const WRITER: usize = 1 << (usize::BITS - 1); -const MAX_READER: usize = WRITER >> 1; +const UPGRADEABLE_READER: usize = 1 << (usize::BITS - 2); +const BEING_UPGRADED: usize = 1 << (usize::BITS - 3); +const MAX_READER: usize = 1 << (usize::BITS - 4); impl RwMutex { - /// Creates a new `RwMutex`. + /// Creates a new read-write mutex with an initial value. pub const fn new(val: T) -> Self { Self { val: UnsafeCell::new(val), @@ -33,20 +106,46 @@ impl RwMutex { } } - /// Acquire a read mutex, and if there is a writer, this thread will sleep in the wait queue. + /// Acquire a read mutex and sleep until it can be acquired. + /// + /// The calling thread will sleep until there are no writers or upgrading + /// upreaders present. The implementation of `WaitQueue` guarantees the + /// order in which other concurrent readers or writers waiting simultaneously + /// will acquire the mutex. pub fn read(&self) -> RwMutexReadGuard { self.queue.wait_until(|| self.try_read()) } - /// Acquire a write mutex, and if there is another writer or other readers, this thread will sleep in the wait queue. + /// Acquire a write mutex and sleep until it can be acquired. + /// + /// The calling thread will sleep until there are no writers, upreaders, + /// or readers present. The implementation of `WaitQueue` guarantees the + /// order in which other concurrent readers or writers waiting simultaneously + /// will acquire the mutex. pub fn write(&self) -> RwMutexWriteGuard { self.queue.wait_until(|| self.try_write()) } - /// Try acquire a read mutex and return immediately if it fails. + /// Acquire a upread mutex and sleep until it can be acquired. + /// + /// The calling thread will sleep until there are no writers or upreaders present. + /// The implementation of `WaitQueue` guarantees the order in which other concurrent + /// readers or writers waiting simultaneously will acquire the mutex. + /// + /// Upreader will not block new readers until it tries to upgrade. Upreader + /// and reader do not differ before invoking the upgread method. However, + /// only one upreader can exist at any time to avoid deadlock in the + /// upgread method. + pub fn upread(&self) -> RwMutexUpgradeableGuard { + self.queue.wait_until(|| self.try_upread()) + } + + /// Attempt to acquire a read mutex. + /// + /// This function will never sleep and will return immediately. pub fn try_read(&self) -> Option> { let lock = self.lock.fetch_add(READER, Acquire); - if lock & (WRITER | MAX_READER) == 0 { + if lock & (WRITER | BEING_UPGRADED | MAX_READER) == 0 { Some(RwMutexReadGuard { inner: self }) } else { self.lock.fetch_sub(READER, Release); @@ -54,7 +153,9 @@ impl RwMutex { } } - /// Try acquire a write mutex and return immediately if it fails. + /// Attempt to acquire a write mutex. + /// + /// This function will never sleep and will return immediately. pub fn try_write(&self) -> Option> { if self .lock @@ -66,6 +167,19 @@ impl RwMutex { None } } + + /// Attempt to acquire a upread mutex. + /// + /// This function will never sleep and will return immediately. + pub fn try_upread(&self) -> Option> { + let lock = self.lock.fetch_or(UPGRADEABLE_READER, Acquire) & (WRITER | UPGRADEABLE_READER); + if lock == 0 { + return Some(RwMutexUpgradeableGuard { inner: self }); + } else if lock == WRITER { + self.lock.fetch_sub(UPGRADEABLE_READER, Release); + } + None + } } impl fmt::Debug for RwMutex { @@ -85,23 +199,14 @@ unsafe impl Sync for RwMutexWriteGuard<'_, T> {} impl<'a, T> !Send for RwMutexReadGuard<'a, T> {} unsafe impl Sync for RwMutexReadGuard<'_, T> {} -/// The guards of `RwMutex`. +impl<'a, T> !Send for RwMutexUpgradeableGuard<'a, T> {} +unsafe impl Sync for RwMutexUpgradeableGuard<'_, T> {} + +/// A guard that provides immutable data access. pub struct RwMutexReadGuard<'a, T> { inner: &'a RwMutex, } -/// Upgrade a read mutex to a write mutex. -/// -/// This method first release the old read mutex and then aquire a new write mutex. -/// So it may sleep while acquireing the write mutex. -impl<'a, T> RwMutexReadGuard<'a, T> { - pub fn upgrade(self) -> RwMutexWriteGuard<'a, T> { - let inner = self.inner; - drop(self); - inner.write() - } -} - impl<'a, T> Deref for RwMutexReadGuard<'a, T> { type Target = T; @@ -110,28 +215,20 @@ impl<'a, T> Deref for RwMutexReadGuard<'a, T> { } } -/// When there are no readers, wake up a waiting writer. impl<'a, T> Drop for RwMutexReadGuard<'a, T> { fn drop(&mut self) { - if self.inner.lock.fetch_sub(READER, Release) == 1 { + // When there are no readers, wake up a waiting writer. + if self.inner.lock.fetch_sub(READER, Release) == READER { self.inner.queue.wake_one(); } } } +/// A guard that provides mutable data access. pub struct RwMutexWriteGuard<'a, T> { inner: &'a RwMutex, } -impl<'a, T> RwMutexWriteGuard<'a, T> { - pub fn downgrade(self) -> RwMutexReadGuard<'a, T> { - self.inner.lock.fetch_add(READER, Acquire); - let inner = self.inner; - drop(self); - RwMutexReadGuard { inner } - } -} - impl<'a, T> Deref for RwMutexWriteGuard<'a, T> { type Target = T; @@ -146,14 +243,77 @@ impl<'a, T> DerefMut for RwMutexWriteGuard<'a, T> { } } -/// When the current writer releases, wake up all the sleeping threads. impl<'a, T> Drop for RwMutexWriteGuard<'a, T> { fn drop(&mut self) { - self.inner.lock.fetch_and(!(WRITER), Release); + self.inner.lock.fetch_and(!WRITER, Release); + // When the current writer releases, wake up all the sleeping threads. // All awakened threads may include readers and writers. // Thanks to the `wait_until` method, either all readers // continue to execute or one writer continues to execute. self.inner.queue.wake_all(); } } + +/// A guard that provides immutable data access but can be atomically +/// upgraded to `RwMutexWriteGuard`. +pub struct RwMutexUpgradeableGuard<'a, T> { + inner: &'a RwMutex, +} + +impl<'a, T> RwMutexUpgradeableGuard<'a, T> { + /// Upgrade this upread guard to a write guard atomically. + /// + /// After calling this method, subsequent readers will be blocked + /// while previous readers remain unaffected. + /// + /// The calling thread will not sleep, but spin to wait for the existing + /// reader to be released. There are two main reasons. + /// - First, it needs to sleep in an extra waiting queue and needs extra wake-up logic and overhead. + /// - Second, upgrading method usually requires a high response time (because the mutex is being used now). + pub fn upgrade(mut self) -> RwMutexWriteGuard<'a, T> { + self.inner.lock.fetch_or(BEING_UPGRADED, Acquire); + loop { + self = match self.try_upgrade() { + Ok(guard) => return guard, + Err(e) => e, + }; + } + } + + /// Attempts to upgrade this upread guard to a write guard atomically. + /// + /// This function will return immediately. + pub fn try_upgrade(self) -> Result, Self> { + let inner = self.inner; + let res = self.inner.lock.compare_exchange( + UPGRADEABLE_READER | BEING_UPGRADED, + WRITER | UPGRADEABLE_READER, + AcqRel, + Relaxed, + ); + if res.is_ok() { + drop(self); + Ok(RwMutexWriteGuard { inner }) + } else { + Err(self) + } + } +} + +impl<'a, T> Deref for RwMutexUpgradeableGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.inner.val.get() } + } +} + +impl<'a, T> Drop for RwMutexUpgradeableGuard<'a, T> { + fn drop(&mut self) { + let res = self.inner.lock.fetch_sub(UPGRADEABLE_READER, Release); + if res == 0 { + self.inner.queue.wake_all(); + } + } +} diff --git a/services/libs/jinux-std/src/fs/ramfs/fs.rs b/services/libs/jinux-std/src/fs/ramfs/fs.rs index d7bdc01e2..9d80d7d84 100644 --- a/services/libs/jinux-std/src/fs/ramfs/fs.rs +++ b/services/libs/jinux-std/src/fs/ramfs/fs.rs @@ -488,7 +488,7 @@ impl Inode for RamInode { return device.write(buf); } - let self_inode = self.0.read(); + let self_inode = self.0.upread(); let Some(page_cache) = self_inode.inner.as_file() else { return_errno_with_message!(Errno::EISDIR, "write is not supported"); };