mirror of
https://github.com/asterinas/asterinas.git
synced 2025-06-23 01:13:23 +00:00
Expose head
/tail
of RingBuffer
This commit is contained in:
committed by
Jianfeng Jiang
parent
a3c5ab8cb4
commit
bcf1e69878
@ -2,11 +2,11 @@
|
|||||||
|
|
||||||
use core::{
|
use core::{
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
|
num::Wrapping,
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
sync::atomic::{AtomicUsize, Ordering},
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
};
|
};
|
||||||
|
|
||||||
use align_ext::AlignExt;
|
|
||||||
use inherit_methods_macro::inherit_methods;
|
use inherit_methods_macro::inherit_methods;
|
||||||
use ostd::mm::{FrameAllocOptions, Segment, UntypedMem, VmIo};
|
use ostd::mm::{FrameAllocOptions, Segment, UntypedMem, VmIo};
|
||||||
|
|
||||||
@ -50,7 +50,6 @@ pub struct RingBuffer<T> {
|
|||||||
capacity: usize,
|
capacity: usize,
|
||||||
tail: AtomicUsize,
|
tail: AtomicUsize,
|
||||||
head: AtomicUsize,
|
head: AtomicUsize,
|
||||||
len: AtomicUsize,
|
|
||||||
phantom: PhantomData<T>,
|
phantom: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,17 +76,21 @@ impl<T> RingBuffer<T> {
|
|||||||
capacity.is_power_of_two(),
|
capacity.is_power_of_two(),
|
||||||
"capacity must be a power of two"
|
"capacity must be a power of two"
|
||||||
);
|
);
|
||||||
let nframes = capacity.saturating_mul(Self::T_SIZE).align_up(PAGE_SIZE) / PAGE_SIZE;
|
|
||||||
|
let nframes = capacity
|
||||||
|
.checked_mul(Self::T_SIZE)
|
||||||
|
.unwrap()
|
||||||
|
.div_ceil(PAGE_SIZE);
|
||||||
let segment = FrameAllocOptions::new()
|
let segment = FrameAllocOptions::new()
|
||||||
.zeroed(false)
|
.zeroed(false)
|
||||||
.alloc_segment(nframes)
|
.alloc_segment(nframes)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
segment,
|
segment,
|
||||||
capacity,
|
capacity,
|
||||||
tail: AtomicUsize::new(0),
|
tail: AtomicUsize::new(0),
|
||||||
head: AtomicUsize::new(0),
|
head: AtomicUsize::new(0),
|
||||||
len: AtomicUsize::new(0),
|
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -110,11 +113,6 @@ impl<T> RingBuffer<T> {
|
|||||||
self.capacity
|
self.capacity
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the number of items in the `RingBuffer`.
|
|
||||||
pub fn len(&self) -> usize {
|
|
||||||
self.len.load(Ordering::Acquire)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Checks if the `RingBuffer` is empty.
|
/// Checks if the `RingBuffer` is empty.
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
self.len() == 0
|
self.len() == 0
|
||||||
@ -122,13 +120,44 @@ impl<T> RingBuffer<T> {
|
|||||||
|
|
||||||
/// Checks if the `RingBuffer` is full.
|
/// Checks if the `RingBuffer` is full.
|
||||||
pub fn is_full(&self) -> bool {
|
pub fn is_full(&self) -> bool {
|
||||||
self.len() == self.capacity
|
self.free_len() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the number of items in the `RingBuffer`.
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
// Implementation notes: This subtraction only makes sense if either the head or the tail
|
||||||
|
// is considered frozen; if both are volatile, the number of the items may become negative
|
||||||
|
// due to race conditions. This is always true with a `RingBuffer` or a pair of
|
||||||
|
// `RbProducer` and `RbConsumer`.
|
||||||
|
(self.tail() - self.head()).0
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the number of free items in the `RingBuffer`.
|
/// Gets the number of free items in the `RingBuffer`.
|
||||||
fn free_len(&self) -> usize {
|
pub fn free_len(&self) -> usize {
|
||||||
self.capacity - self.len()
|
self.capacity - self.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gets the head number of the `RingBuffer`.
|
||||||
|
///
|
||||||
|
/// This is the number of items read from the ring buffer. The number wraps when crossing
|
||||||
|
/// [`usize`] boundaries.
|
||||||
|
pub fn head(&self) -> Wrapping<usize> {
|
||||||
|
Wrapping(self.head.load(Ordering::Acquire))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the tail number of the `RingBuffer`.
|
||||||
|
///
|
||||||
|
/// This is the number of items written into the ring buffer. The number wraps when crossing
|
||||||
|
/// [`usize`] boundaries.
|
||||||
|
pub fn tail(&self) -> Wrapping<usize> {
|
||||||
|
Wrapping(self.tail.load(Ordering::Acquire))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clears the `RingBuffer`.
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.tail.store(0, Ordering::Release);
|
||||||
|
self.head.store(0, Ordering::Release);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Pod> RingBuffer<T> {
|
impl<T: Pod> RingBuffer<T> {
|
||||||
@ -180,33 +209,14 @@ impl<T: Pod> RingBuffer<T> {
|
|||||||
consumer.pop_slice(items)
|
consumer.pop_slice(items)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clears the `RingBuffer`.
|
pub(self) fn advance_tail(&self, mut tail: Wrapping<usize>, len: usize) {
|
||||||
pub fn clear(&mut self) {
|
tail += len;
|
||||||
self.tail.store(0, Ordering::Relaxed);
|
self.tail.store(tail.0, Ordering::Release);
|
||||||
self.head.store(0, Ordering::Relaxed);
|
|
||||||
self.len.store(0, Ordering::Release);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tail(&self) -> usize {
|
pub(self) fn advance_head(&self, mut head: Wrapping<usize>, len: usize) {
|
||||||
self.tail.load(Ordering::Acquire)
|
head += len;
|
||||||
}
|
self.head.store(head.0, Ordering::Release);
|
||||||
|
|
||||||
fn head(&self) -> usize {
|
|
||||||
self.head.load(Ordering::Acquire)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn advance_tail(&self, curr_pos: usize, len: usize) {
|
|
||||||
let next_pos = (curr_pos + len) & (self.capacity - 1);
|
|
||||||
self.tail.store(next_pos, Ordering::Release);
|
|
||||||
|
|
||||||
self.len.fetch_add(len, Ordering::Release);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn advance_head(&self, curr_pos: usize, len: usize) {
|
|
||||||
let next_pos = (curr_pos + len) & (self.capacity - 1);
|
|
||||||
self.head.store(next_pos, Ordering::Release);
|
|
||||||
|
|
||||||
self.len.fetch_sub(len, Ordering::Release);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -249,11 +259,11 @@ impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Producer<T, R> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let tail = rb.tail();
|
let tail = rb.tail();
|
||||||
debug_assert!(tail < rb.capacity);
|
let offset = tail.0 & (rb.capacity - 1);
|
||||||
|
let byte_offset = offset * Self::T_SIZE;
|
||||||
|
|
||||||
let segment_offset = tail * Self::T_SIZE;
|
|
||||||
let mut writer = rb.segment.writer();
|
let mut writer = rb.segment.writer();
|
||||||
writer.skip(segment_offset);
|
writer.skip(byte_offset);
|
||||||
writer.write_val(&item).unwrap();
|
writer.write_val(&item).unwrap();
|
||||||
|
|
||||||
rb.advance_tail(tail, 1);
|
rb.advance_tail(tail, 1);
|
||||||
@ -265,27 +275,26 @@ impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Producer<T, R> {
|
|||||||
/// Returns `Some` on success, all items are pushed to the ring buffer.
|
/// Returns `Some` on success, all items are pushed to the ring buffer.
|
||||||
/// Returns `None` if the ring buffer is full or cannot fit all items.
|
/// Returns `None` if the ring buffer is full or cannot fit all items.
|
||||||
pub fn push_slice(&mut self, items: &[T]) -> Option<()> {
|
pub fn push_slice(&mut self, items: &[T]) -> Option<()> {
|
||||||
let nitems = items.len();
|
|
||||||
let rb = &self.rb;
|
let rb = &self.rb;
|
||||||
let free_len = rb.free_len();
|
let nitems = items.len();
|
||||||
if free_len < nitems {
|
if rb.free_len() < nitems {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
let tail = rb.tail();
|
let tail = rb.tail();
|
||||||
debug_assert!(tail < rb.capacity);
|
let offset = tail.0 & (rb.capacity - 1);
|
||||||
let segment_offset = tail * Self::T_SIZE;
|
let byte_offset = offset * Self::T_SIZE;
|
||||||
|
|
||||||
if tail + nitems > rb.capacity {
|
if offset + nitems > rb.capacity {
|
||||||
// Write into two separate parts
|
// Write into two separate parts
|
||||||
rb.segment
|
rb.segment
|
||||||
.write_slice(segment_offset, &items[..rb.capacity - tail])
|
.write_slice(byte_offset, &items[..rb.capacity - offset])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
rb.segment
|
rb.segment
|
||||||
.write_slice(0, &items[rb.capacity - tail..])
|
.write_slice(0, &items[rb.capacity - offset..])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
} else {
|
} else {
|
||||||
rb.segment.write_slice(segment_offset, items).unwrap();
|
rb.segment.write_slice(byte_offset, items).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
rb.advance_tail(tail, nitems);
|
rb.advance_tail(tail, nitems);
|
||||||
@ -300,25 +309,26 @@ impl<R: Deref<Target = RingBuffer<u8>>> Producer<u8, R> {
|
|||||||
pub fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result<usize> {
|
pub fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result<usize> {
|
||||||
let rb = &self.rb;
|
let rb = &self.rb;
|
||||||
let free_len = rb.free_len();
|
let free_len = rb.free_len();
|
||||||
if free_len == 0 {
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
let write_len = reader.sum_lens().min(free_len);
|
|
||||||
|
|
||||||
let tail = rb.tail();
|
let tail = rb.tail();
|
||||||
let write_len = if tail + write_len > rb.capacity {
|
let offset = tail.0 & (rb.capacity - 1);
|
||||||
|
|
||||||
|
let write_len = if offset + free_len > rb.capacity {
|
||||||
// Write into two separate parts
|
// Write into two separate parts
|
||||||
let mut writer = rb.segment.writer();
|
let mut write_len = 0;
|
||||||
writer.skip(tail).limit(rb.capacity - tail);
|
|
||||||
let mut len = reader.read(&mut writer)?;
|
|
||||||
|
|
||||||
let mut writer = rb.segment.writer();
|
let mut writer = rb.segment.writer();
|
||||||
writer.limit(write_len - (rb.capacity - tail));
|
writer.skip(offset).limit(rb.capacity - offset);
|
||||||
len += reader.read(&mut writer)?;
|
write_len += reader.read(&mut writer)?;
|
||||||
len
|
|
||||||
|
let mut writer = rb.segment.writer();
|
||||||
|
writer.limit(free_len - (rb.capacity - offset));
|
||||||
|
write_len += reader.read(&mut writer)?;
|
||||||
|
|
||||||
|
write_len
|
||||||
} else {
|
} else {
|
||||||
let mut writer = rb.segment.writer();
|
let mut writer = rb.segment.writer();
|
||||||
writer.skip(tail).limit(write_len);
|
writer.skip(offset).limit(free_len);
|
||||||
reader.read(&mut writer)?
|
reader.read(&mut writer)?
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -334,6 +344,8 @@ impl<T, R: Deref<Target = RingBuffer<T>>> Producer<T, R> {
|
|||||||
pub fn is_full(&self) -> bool;
|
pub fn is_full(&self) -> bool;
|
||||||
pub fn len(&self) -> usize;
|
pub fn len(&self) -> usize;
|
||||||
pub fn free_len(&self) -> usize;
|
pub fn free_len(&self) -> usize;
|
||||||
|
pub fn head(&self) -> Wrapping<usize>;
|
||||||
|
pub fn tail(&self) -> Wrapping<usize>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Consumer<T, R> {
|
impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Consumer<T, R> {
|
||||||
@ -350,11 +362,11 @@ impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Consumer<T, R> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let head = rb.head();
|
let head = rb.head();
|
||||||
debug_assert!(head < rb.capacity);
|
let offset = head.0 & (rb.capacity - 1);
|
||||||
|
let byte_offset = offset * Self::T_SIZE;
|
||||||
|
|
||||||
let segment_offset = head * Self::T_SIZE;
|
|
||||||
let mut reader = rb.segment.reader();
|
let mut reader = rb.segment.reader();
|
||||||
reader.skip(segment_offset);
|
reader.skip(byte_offset);
|
||||||
let item = reader.read_val::<T>().unwrap();
|
let item = reader.read_val::<T>().unwrap();
|
||||||
|
|
||||||
rb.advance_head(head, 1);
|
rb.advance_head(head, 1);
|
||||||
@ -366,26 +378,26 @@ impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Consumer<T, R> {
|
|||||||
/// Returns `Some` on success, all items are popped from the ring buffer.
|
/// Returns `Some` on success, all items are popped from the ring buffer.
|
||||||
/// Returns `None` if the ring buffer is empty or cannot fill all items.
|
/// Returns `None` if the ring buffer is empty or cannot fill all items.
|
||||||
pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> {
|
pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> {
|
||||||
let nitems = items.len();
|
|
||||||
let rb = &self.rb;
|
let rb = &self.rb;
|
||||||
if nitems > rb.len() {
|
let nitems = items.len();
|
||||||
|
if rb.len() < nitems {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
let head = rb.head();
|
let head = rb.head();
|
||||||
debug_assert!(head < rb.capacity);
|
let offset = head.0 & (rb.capacity - 1);
|
||||||
|
let byte_offset = offset * Self::T_SIZE;
|
||||||
|
|
||||||
let segment_offset = head * Self::T_SIZE;
|
if offset + nitems > rb.capacity {
|
||||||
if head + nitems > rb.capacity {
|
|
||||||
// Read from two separate parts
|
// Read from two separate parts
|
||||||
rb.segment
|
rb.segment
|
||||||
.read_slice(segment_offset, &mut items[..rb.capacity - head])
|
.read_slice(byte_offset, &mut items[..rb.capacity - offset])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
rb.segment
|
rb.segment
|
||||||
.read_slice(0, &mut items[rb.capacity - head..])
|
.read_slice(0, &mut items[rb.capacity - offset..])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
} else {
|
} else {
|
||||||
rb.segment.read_slice(segment_offset, items).unwrap();
|
rb.segment.read_slice(byte_offset, items).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
rb.advance_head(head, nitems);
|
rb.advance_head(head, nitems);
|
||||||
@ -400,25 +412,26 @@ impl<R: Deref<Target = RingBuffer<u8>>> Consumer<u8, R> {
|
|||||||
pub fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result<usize> {
|
pub fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result<usize> {
|
||||||
let rb = &self.rb;
|
let rb = &self.rb;
|
||||||
let len = rb.len();
|
let len = rb.len();
|
||||||
if len == 0 {
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
let read_len = writer.sum_lens().min(len);
|
|
||||||
|
|
||||||
let head = rb.head();
|
let head = rb.head();
|
||||||
let read_len = if head + read_len > rb.capacity {
|
let offset = head.0 & (rb.capacity - 1);
|
||||||
|
|
||||||
|
let read_len = if offset + len > rb.capacity {
|
||||||
// Read from two separate parts
|
// Read from two separate parts
|
||||||
let mut reader = rb.segment.reader();
|
let mut read_len = 0;
|
||||||
reader.skip(head).limit(rb.capacity - head);
|
|
||||||
let mut len = writer.write(&mut reader)?;
|
|
||||||
|
|
||||||
let mut reader = rb.segment.reader();
|
let mut reader = rb.segment.reader();
|
||||||
reader.limit(read_len - (rb.capacity - head));
|
reader.skip(offset).limit(rb.capacity - offset);
|
||||||
len += writer.write(&mut reader)?;
|
read_len += writer.write(&mut reader)?;
|
||||||
len
|
|
||||||
|
let mut reader = rb.segment.reader();
|
||||||
|
reader.limit(len - (rb.capacity - offset));
|
||||||
|
read_len += writer.write(&mut reader)?;
|
||||||
|
|
||||||
|
read_len
|
||||||
} else {
|
} else {
|
||||||
let mut reader = rb.segment.reader();
|
let mut reader = rb.segment.reader();
|
||||||
reader.skip(head).limit(read_len);
|
reader.skip(offset).limit(len);
|
||||||
writer.write(&mut reader)?
|
writer.write(&mut reader)?
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -434,6 +447,8 @@ impl<T, R: Deref<Target = RingBuffer<T>>> Consumer<T, R> {
|
|||||||
pub fn is_full(&self) -> bool;
|
pub fn is_full(&self) -> bool;
|
||||||
pub fn len(&self) -> usize;
|
pub fn len(&self) -> usize;
|
||||||
pub fn free_len(&self) -> usize;
|
pub fn free_len(&self) -> usize;
|
||||||
|
pub fn head(&self) -> Wrapping<usize>;
|
||||||
|
pub fn tail(&self) -> Wrapping<usize>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(ktest)]
|
#[cfg(ktest)]
|
||||||
|
Reference in New Issue
Block a user