From bcf1e69878540e81ab15d79eee2f81f1f0787bba Mon Sep 17 00:00:00 2001 From: Ruihan Li Date: Mon, 16 Jun 2025 01:00:24 +0800 Subject: [PATCH] Expose `head`/`tail` of `RingBuffer` --- kernel/src/util/ring_buffer.rs | 181 ++++++++++++++++++--------------- 1 file changed, 98 insertions(+), 83 deletions(-) diff --git a/kernel/src/util/ring_buffer.rs b/kernel/src/util/ring_buffer.rs index e2fa75192..92c38b13a 100644 --- a/kernel/src/util/ring_buffer.rs +++ b/kernel/src/util/ring_buffer.rs @@ -2,11 +2,11 @@ use core::{ marker::PhantomData, + num::Wrapping, ops::Deref, sync::atomic::{AtomicUsize, Ordering}, }; -use align_ext::AlignExt; use inherit_methods_macro::inherit_methods; use ostd::mm::{FrameAllocOptions, Segment, UntypedMem, VmIo}; @@ -50,7 +50,6 @@ pub struct RingBuffer { capacity: usize, tail: AtomicUsize, head: AtomicUsize, - len: AtomicUsize, phantom: PhantomData, } @@ -77,17 +76,21 @@ impl RingBuffer { capacity.is_power_of_two(), "capacity must be a power of two" ); - let nframes = capacity.saturating_mul(Self::T_SIZE).align_up(PAGE_SIZE) / PAGE_SIZE; + + let nframes = capacity + .checked_mul(Self::T_SIZE) + .unwrap() + .div_ceil(PAGE_SIZE); let segment = FrameAllocOptions::new() .zeroed(false) .alloc_segment(nframes) .unwrap(); + Self { segment, capacity, tail: AtomicUsize::new(0), head: AtomicUsize::new(0), - len: AtomicUsize::new(0), phantom: PhantomData, } } @@ -110,11 +113,6 @@ impl RingBuffer { self.capacity } - /// Gets the number of items in the `RingBuffer`. - pub fn len(&self) -> usize { - self.len.load(Ordering::Acquire) - } - /// Checks if the `RingBuffer` is empty. pub fn is_empty(&self) -> bool { self.len() == 0 @@ -122,13 +120,44 @@ impl RingBuffer { /// Checks if the `RingBuffer` is full. pub fn is_full(&self) -> bool { - self.len() == self.capacity + self.free_len() == 0 + } + + /// Gets the number of items in the `RingBuffer`. + pub fn len(&self) -> usize { + // Implementation notes: This subtraction only makes sense if either the head or the tail + // is considered frozen; if both are volatile, the number of the items may become negative + // due to race conditions. This is always true with a `RingBuffer` or a pair of + // `RbProducer` and `RbConsumer`. + (self.tail() - self.head()).0 } /// Gets the number of free items in the `RingBuffer`. - fn free_len(&self) -> usize { + pub fn free_len(&self) -> usize { self.capacity - self.len() } + + /// Gets the head number of the `RingBuffer`. + /// + /// This is the number of items read from the ring buffer. The number wraps when crossing + /// [`usize`] boundaries. + pub fn head(&self) -> Wrapping { + Wrapping(self.head.load(Ordering::Acquire)) + } + + /// Gets the tail number of the `RingBuffer`. + /// + /// This is the number of items written into the ring buffer. The number wraps when crossing + /// [`usize`] boundaries. + pub fn tail(&self) -> Wrapping { + Wrapping(self.tail.load(Ordering::Acquire)) + } + + /// Clears the `RingBuffer`. + pub fn clear(&mut self) { + self.tail.store(0, Ordering::Release); + self.head.store(0, Ordering::Release); + } } impl RingBuffer { @@ -180,33 +209,14 @@ impl RingBuffer { consumer.pop_slice(items) } - /// Clears the `RingBuffer`. - pub fn clear(&mut self) { - self.tail.store(0, Ordering::Relaxed); - self.head.store(0, Ordering::Relaxed); - self.len.store(0, Ordering::Release); + pub(self) fn advance_tail(&self, mut tail: Wrapping, len: usize) { + tail += len; + self.tail.store(tail.0, Ordering::Release); } - fn tail(&self) -> usize { - self.tail.load(Ordering::Acquire) - } - - fn head(&self) -> usize { - self.head.load(Ordering::Acquire) - } - - fn advance_tail(&self, curr_pos: usize, len: usize) { - let next_pos = (curr_pos + len) & (self.capacity - 1); - self.tail.store(next_pos, Ordering::Release); - - self.len.fetch_add(len, Ordering::Release); - } - - fn advance_head(&self, curr_pos: usize, len: usize) { - let next_pos = (curr_pos + len) & (self.capacity - 1); - self.head.store(next_pos, Ordering::Release); - - self.len.fetch_sub(len, Ordering::Release); + pub(self) fn advance_head(&self, mut head: Wrapping, len: usize) { + head += len; + self.head.store(head.0, Ordering::Release); } } @@ -249,11 +259,11 @@ impl>> Producer { } let tail = rb.tail(); - debug_assert!(tail < rb.capacity); + let offset = tail.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; - let segment_offset = tail * Self::T_SIZE; let mut writer = rb.segment.writer(); - writer.skip(segment_offset); + writer.skip(byte_offset); writer.write_val(&item).unwrap(); rb.advance_tail(tail, 1); @@ -265,27 +275,26 @@ impl>> Producer { /// Returns `Some` on success, all items are pushed to the ring buffer. /// Returns `None` if the ring buffer is full or cannot fit all items. pub fn push_slice(&mut self, items: &[T]) -> Option<()> { - let nitems = items.len(); let rb = &self.rb; - let free_len = rb.free_len(); - if free_len < nitems { + let nitems = items.len(); + if rb.free_len() < nitems { return None; } let tail = rb.tail(); - debug_assert!(tail < rb.capacity); - let segment_offset = tail * Self::T_SIZE; + let offset = tail.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; - if tail + nitems > rb.capacity { + if offset + nitems > rb.capacity { // Write into two separate parts rb.segment - .write_slice(segment_offset, &items[..rb.capacity - tail]) + .write_slice(byte_offset, &items[..rb.capacity - offset]) .unwrap(); rb.segment - .write_slice(0, &items[rb.capacity - tail..]) + .write_slice(0, &items[rb.capacity - offset..]) .unwrap(); } else { - rb.segment.write_slice(segment_offset, items).unwrap(); + rb.segment.write_slice(byte_offset, items).unwrap(); } rb.advance_tail(tail, nitems); @@ -300,25 +309,26 @@ impl>> Producer { pub fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result { let rb = &self.rb; let free_len = rb.free_len(); - if free_len == 0 { - return Ok(0); - } - let write_len = reader.sum_lens().min(free_len); let tail = rb.tail(); - let write_len = if tail + write_len > rb.capacity { + let offset = tail.0 & (rb.capacity - 1); + + let write_len = if offset + free_len > rb.capacity { // Write into two separate parts - let mut writer = rb.segment.writer(); - writer.skip(tail).limit(rb.capacity - tail); - let mut len = reader.read(&mut writer)?; + let mut write_len = 0; let mut writer = rb.segment.writer(); - writer.limit(write_len - (rb.capacity - tail)); - len += reader.read(&mut writer)?; - len + writer.skip(offset).limit(rb.capacity - offset); + write_len += reader.read(&mut writer)?; + + let mut writer = rb.segment.writer(); + writer.limit(free_len - (rb.capacity - offset)); + write_len += reader.read(&mut writer)?; + + write_len } else { let mut writer = rb.segment.writer(); - writer.skip(tail).limit(write_len); + writer.skip(offset).limit(free_len); reader.read(&mut writer)? }; @@ -334,6 +344,8 @@ impl>> Producer { pub fn is_full(&self) -> bool; pub fn len(&self) -> usize; pub fn free_len(&self) -> usize; + pub fn head(&self) -> Wrapping; + pub fn tail(&self) -> Wrapping; } impl>> Consumer { @@ -350,11 +362,11 @@ impl>> Consumer { } let head = rb.head(); - debug_assert!(head < rb.capacity); + let offset = head.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; - let segment_offset = head * Self::T_SIZE; let mut reader = rb.segment.reader(); - reader.skip(segment_offset); + reader.skip(byte_offset); let item = reader.read_val::().unwrap(); rb.advance_head(head, 1); @@ -366,26 +378,26 @@ impl>> Consumer { /// Returns `Some` on success, all items are popped from the ring buffer. /// Returns `None` if the ring buffer is empty or cannot fill all items. pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> { - let nitems = items.len(); let rb = &self.rb; - if nitems > rb.len() { + let nitems = items.len(); + if rb.len() < nitems { return None; } let head = rb.head(); - debug_assert!(head < rb.capacity); + let offset = head.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; - let segment_offset = head * Self::T_SIZE; - if head + nitems > rb.capacity { + if offset + nitems > rb.capacity { // Read from two separate parts rb.segment - .read_slice(segment_offset, &mut items[..rb.capacity - head]) + .read_slice(byte_offset, &mut items[..rb.capacity - offset]) .unwrap(); rb.segment - .read_slice(0, &mut items[rb.capacity - head..]) + .read_slice(0, &mut items[rb.capacity - offset..]) .unwrap(); } else { - rb.segment.read_slice(segment_offset, items).unwrap(); + rb.segment.read_slice(byte_offset, items).unwrap(); } rb.advance_head(head, nitems); @@ -400,25 +412,26 @@ impl>> Consumer { pub fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result { let rb = &self.rb; let len = rb.len(); - if len == 0 { - return Ok(0); - } - let read_len = writer.sum_lens().min(len); let head = rb.head(); - let read_len = if head + read_len > rb.capacity { + let offset = head.0 & (rb.capacity - 1); + + let read_len = if offset + len > rb.capacity { // Read from two separate parts - let mut reader = rb.segment.reader(); - reader.skip(head).limit(rb.capacity - head); - let mut len = writer.write(&mut reader)?; + let mut read_len = 0; let mut reader = rb.segment.reader(); - reader.limit(read_len - (rb.capacity - head)); - len += writer.write(&mut reader)?; - len + reader.skip(offset).limit(rb.capacity - offset); + read_len += writer.write(&mut reader)?; + + let mut reader = rb.segment.reader(); + reader.limit(len - (rb.capacity - offset)); + read_len += writer.write(&mut reader)?; + + read_len } else { let mut reader = rb.segment.reader(); - reader.skip(head).limit(read_len); + reader.skip(offset).limit(len); writer.write(&mut reader)? }; @@ -434,6 +447,8 @@ impl>> Consumer { pub fn is_full(&self) -> bool; pub fn len(&self) -> usize; pub fn free_len(&self) -> usize; + pub fn head(&self) -> Wrapping; + pub fn tail(&self) -> Wrapping; } #[cfg(ktest)]