Replace all the ring buffers with the new one

This commit is contained in:
Shaowei Song 2024-08-28 06:23:24 +00:00 committed by Tate, Hongliang Tian
parent 923b3704d7
commit e6f465b553
9 changed files with 95 additions and 99 deletions

20
Cargo.lock generated
View File

@ -134,7 +134,6 @@ dependencies = [
"int-to-c-enum",
"log",
"ostd",
"ringbuf",
"smoltcp",
"spin 0.9.8",
]
@ -180,7 +179,6 @@ dependencies = [
"ostd",
"paste",
"rand",
"ringbuf",
"smoltcp",
"spin 0.9.8",
"static_assertions",
@ -414,15 +412,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
dependencies = [
"cfg-if",
]
[[package]]
name = "ctor"
version = "0.1.25"
@ -1257,15 +1246,6 @@ dependencies = [
"bitflags 2.4.1",
]
[[package]]
name = "ringbuf"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79abed428d1fd2a128201cec72c5f6938e2da607c6f3745f769fabea399d950a"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "rle-decode-fast"
version = "1.0.3"

View File

@ -49,7 +49,6 @@ tdx-guest = { version = "0.1.7", optional = true }
xmas-elf = "0.8.0"
# data-structures
bitflags = "1.3"
ringbuf = { version = "0.3.2", default-features = false, features = ["alloc"] }
keyable-arc = { path = "libs/keyable-arc" }
# unzip initramfs
libflate = { version = "2", default-features = false }

View File

@ -15,7 +15,6 @@ component = { path = "../../libs/comp-sys/component" }
int-to-c-enum = { path = "../../libs/int-to-c-enum" }
log = "0.4"
ostd = { path = "../../../ostd" }
ringbuf = { version = "0.3.2", default-features = false, features = ["alloc"] }
smoltcp = { git = "https://github.com/smoltcp-rs/smoltcp", rev = "dc08e0b", default-features = false, features = [
"alloc",
"log",

View File

@ -2,8 +2,6 @@
use alloc::format;
use ringbuf::{ring_buffer::RbBase, HeapRb, Rb};
use crate::{
device::tty::{line_discipline::LineDiscipline, new_job_control_and_ldisc},
events::IoEvents,
@ -20,6 +18,7 @@ use crate::{
signal::{Pollee, Poller},
JobControl, Terminal,
},
util::ring_buffer::RingBuffer,
};
const BUFFER_CAPACITY: usize = 4096;
@ -32,7 +31,7 @@ pub struct PtyMaster {
ptmx: Arc<dyn Inode>,
index: u32,
output: Arc<LineDiscipline>,
input: SpinLock<HeapRb<u8>>,
input: SpinLock<RingBuffer<u8>>,
job_control: Arc<JobControl>,
/// The state of input buffer
pollee: Pollee,
@ -46,7 +45,7 @@ impl PtyMaster {
ptmx,
index,
output: ldisc,
input: SpinLock::new(HeapRb::new(BUFFER_CAPACITY)),
input: SpinLock::new(RingBuffer::new(BUFFER_CAPACITY)),
job_control,
pollee: Pollee::new(IoEvents::OUT),
weak_self: weak_ref.clone(),
@ -89,7 +88,7 @@ impl PtyMaster {
self.output.buffer_len()
}
fn update_state(&self, buf: &HeapRb<u8>) {
fn update_state(&self, buf: &RingBuffer<u8>) {
if buf.is_empty() {
self.pollee.del_events(IoEvents::IN)
} else {
@ -125,10 +124,10 @@ impl FileIo for PtyMaster {
continue;
}
let read_len = input.len().min(read_len);
let mut buf = vec![0u8; read_len];
input.pop_slice(&mut buf);
writer.write_fallible(&mut buf.as_slice().into())?;
let read_len = match input.read_fallible(writer) {
Ok(len) => len,
Err((_, len)) => len,
};
self.update_state(&input);
return Ok(read_len);
}

View File

@ -5,7 +5,6 @@
use alloc::format;
use ostd::trap::{disable_local, in_interrupt_context};
use ringbuf::{ring_buffer::RbBase, Rb, StaticRb};
use super::termio::{KernelTermios, WinSize, CC_C_CHAR};
use crate::{
@ -17,6 +16,7 @@ use crate::{
Pollee, Poller,
},
thread::work_queue::{submit_work_item, work_item::WorkItem, WorkPriority},
util::ring_buffer::RingBuffer,
};
// This implementation refers the implementation of linux
@ -30,7 +30,7 @@ pub struct LineDiscipline {
/// current line
current_line: SpinLock<CurrentLine>,
/// The read buffer
read_buffer: SpinLock<StaticRb<u8, BUFFER_CAPACITY>>,
read_buffer: SpinLock<RingBuffer<u8>>,
/// termios
termios: SpinLock<KernelTermios>,
/// Windows size,
@ -45,21 +45,24 @@ pub struct LineDiscipline {
work_item_para: Arc<SpinLock<LineDisciplineWorkPara>>,
}
#[derive(Default)]
pub struct CurrentLine {
buffer: StaticRb<u8, BUFFER_CAPACITY>,
buffer: RingBuffer<u8>,
}
impl Default for CurrentLine {
fn default() -> Self {
Self {
buffer: RingBuffer::new(BUFFER_CAPACITY),
}
}
}
impl CurrentLine {
pub fn new() -> Self {
Self {
buffer: StaticRb::default(),
}
}
/// read all bytes inside current line and clear current line
pub fn drain(&mut self) -> Vec<u8> {
self.buffer.pop_iter().collect()
let mut ret = vec![0u8; self.buffer.len()];
self.buffer.pop_slice(ret.as_mut_slice()).unwrap();
ret
}
pub fn push_char(&mut self, char: u8) {
@ -69,7 +72,7 @@ impl CurrentLine {
}
pub fn backspace(&mut self) {
self.buffer.pop();
let _ = self.buffer.pop();
}
pub fn is_full(&self) -> bool {
@ -92,8 +95,8 @@ impl LineDiscipline {
}
})));
Self {
current_line: SpinLock::new(CurrentLine::new()),
read_buffer: SpinLock::new(StaticRb::default()),
current_line: SpinLock::new(CurrentLine::default()),
read_buffer: SpinLock::new(RingBuffer::new(BUFFER_CAPACITY)),
termios: SpinLock::new(KernelTermios::default()),
winsize: SpinLock::new(WinSize::default()),
pollee: Pollee::new(IoEvents::empty()),
@ -372,7 +375,7 @@ impl LineDiscipline {
pub fn drain_input(&self) {
self.current_line.lock().drain();
let _: Vec<_> = self.read_buffer.lock().pop_iter().collect();
self.read_buffer.lock().clear();
}
pub fn buffer_len(&self) -> usize {

View File

@ -62,13 +62,11 @@ impl Pollable for PipeReader {
impl FileLike for PipeReader {
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
let mut buf = vec![0u8; writer.avail()];
let read_len = if self.status_flags().contains(StatusFlags::O_NONBLOCK) {
self.consumer.try_read(&mut buf)?
self.consumer.try_read(writer)?
} else {
self.wait_events(IoEvents::IN, || self.consumer.try_read(&mut buf))?
self.wait_events(IoEvents::IN, || self.consumer.try_read(writer))?
};
writer.write_fallible(&mut buf.as_slice().into())?;
Ok(read_len)
}
@ -147,11 +145,10 @@ impl Pollable for PipeWriter {
impl FileLike for PipeWriter {
fn write(&self, reader: &mut VmReader) -> Result<usize> {
let buf = reader.collect()?;
if self.status_flags().contains(StatusFlags::O_NONBLOCK) {
self.producer.try_write(&buf)
self.producer.try_write(reader)
} else {
self.wait_events(IoEvents::OUT, || self.producer.try_write(&buf))
self.wait_events(IoEvents::OUT, || self.producer.try_write(reader))
}
}

View File

@ -4,12 +4,12 @@ use core::sync::atomic::{AtomicBool, Ordering};
use aster_rights::{Read, ReadOp, TRights, Write, WriteOp};
use aster_rights_proc::require;
use ringbuf::{HeapConsumer as HeapRbConsumer, HeapProducer as HeapRbProducer, HeapRb};
use crate::{
events::{IoEvents, Observer},
prelude::*,
process::signal::{Pollee, Poller},
util::ring_buffer::{RbConsumer, RbProducer, RingBuffer},
};
/// A unidirectional communication channel, intended to implement IPC, e.g., pipe,
@ -93,11 +93,11 @@ macro_rules! impl_common_methods_for_channel {
}
impl<T> Producer<T> {
fn this_end(&self) -> &FifoInner<HeapRbProducer<T>> {
fn this_end(&self) -> &FifoInner<RbProducer<T>> {
&self.0.common.producer
}
fn peer_end(&self) -> &FifoInner<HeapRbConsumer<T>> {
fn peer_end(&self) -> &FifoInner<RbConsumer<T>> {
&self.0.common.consumer
}
@ -127,14 +127,14 @@ impl<T> Producer<T> {
impl_common_methods_for_channel!();
}
impl<T: Copy> Producer<T> {
impl Producer<u8> {
/// Tries to write `buf` to the channel.
///
/// - Returns `Ok(_)` with the number of bytes written if successful.
/// - Returns `Err(EPIPE)` if the channel is shut down.
/// - Returns `Err(EAGAIN)` if the channel is full.
pub fn try_write(&self, buf: &[T]) -> Result<usize> {
if buf.is_empty() {
pub fn try_write(&self, reader: &mut VmReader) -> Result<usize> {
if reader.remain() == 0 {
// Even after shutdown, writing an empty buffer is still fine.
return Ok(0);
}
@ -143,7 +143,7 @@ impl<T: Copy> Producer<T> {
return_errno_with_message!(Errno::EPIPE, "the channel is shut down");
}
let written_len = self.0.write(buf);
let written_len = self.0.write(reader);
self.update_pollee();
if written_len > 0 {
@ -154,7 +154,7 @@ impl<T: Copy> Producer<T> {
}
}
impl<T> Producer<T> {
impl<T: Pod> Producer<T> {
/// Tries to push `item` to the channel.
///
/// - Returns `Ok(())` if successful.
@ -188,11 +188,11 @@ impl<T> Drop for Producer<T> {
}
impl<T> Consumer<T> {
fn this_end(&self) -> &FifoInner<HeapRbConsumer<T>> {
fn this_end(&self) -> &FifoInner<RbConsumer<T>> {
&self.0.common.consumer
}
fn peer_end(&self) -> &FifoInner<HeapRbProducer<T>> {
fn peer_end(&self) -> &FifoInner<RbProducer<T>> {
&self.0.common.producer
}
@ -220,21 +220,21 @@ impl<T> Consumer<T> {
impl_common_methods_for_channel!();
}
impl<T: Copy> Consumer<T> {
impl Consumer<u8> {
/// Tries to read `buf` from the channel.
///
/// - Returns `Ok(_)` with the number of bytes read if successful.
/// - Returns `Ok(0)` if the channel is shut down and there is no data left.
/// - Returns `Err(EAGAIN)` if the channel is empty.
pub fn try_read(&self, buf: &mut [T]) -> Result<usize> {
if buf.is_empty() {
pub fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
if writer.avail() == 0 {
return Ok(0);
}
// This must be recorded before the actual operation to avoid race conditions.
let is_shutdown = self.is_shutdown() || self.is_peer_shutdown();
let read_len = self.0.read(buf);
let read_len = self.0.read(writer);
self.update_pollee();
if read_len > 0 {
@ -247,7 +247,7 @@ impl<T: Copy> Consumer<T> {
}
}
impl<T> Consumer<T> {
impl<T: Pod> Consumer<T> {
/// Tries to read an item from the channel.
///
/// - Returns `Ok(Some(_))` with the popped item if successful.
@ -299,28 +299,40 @@ impl<T, R: TRights> Fifo<T, R> {
}
}
impl<T: Copy, R: TRights> Fifo<T, R> {
impl<R: TRights> Fifo<u8, R> {
#[require(R > Read)]
pub fn read(&self, buf: &mut [T]) -> usize {
pub fn read(&self, writer: &mut VmWriter) -> usize {
let mut rb = self.common.consumer.rb();
rb.pop_slice(buf)
match rb.read_fallible(writer) {
Ok(len) => len,
Err((e, len)) => {
error!("memory read failed on the ring buffer, error: {e:?}");
len
}
}
}
#[require(R > Write)]
pub fn write(&self, buf: &[T]) -> usize {
pub fn write(&self, reader: &mut VmReader) -> usize {
let mut rb = self.common.producer.rb();
rb.push_slice(buf)
match rb.write_fallible(reader) {
Ok(len) => len,
Err((e, len)) => {
error!("memory write failed on the ring buffer, error: {e:?}");
len
}
}
}
}
impl<T, R: TRights> Fifo<T, R> {
impl<T: Pod, R: TRights> Fifo<T, R> {
/// Pushes an item into the endpoint.
/// If the `push` method fails, this method will return
/// `Err` containing the item that hasn't been pushed
#[require(R > Write)]
pub fn push(&self, item: T) -> core::result::Result<(), T> {
let mut rb = self.common.producer.rb();
rb.push(item)
rb.push(item).ok_or(item)
}
/// Pops an item from the endpoint.
@ -332,13 +344,13 @@ impl<T, R: TRights> Fifo<T, R> {
}
struct Common<T> {
producer: FifoInner<HeapRbProducer<T>>,
consumer: FifoInner<HeapRbConsumer<T>>,
producer: FifoInner<RbProducer<T>>,
consumer: FifoInner<RbConsumer<T>>,
}
impl<T> Common<T> {
fn new(capacity: usize) -> Self {
let rb: HeapRb<T> = HeapRb::new(capacity);
let rb: RingBuffer<T> = RingBuffer::new(capacity);
let (rb_producer, rb_consumer) = rb.split();
let producer = FifoInner::new(rb_producer, IoEvents::OUT);
@ -387,23 +399,28 @@ mod test {
use super::*;
#[ktest]
fn test_non_copy() {
#[derive(Clone, Debug, PartialEq, Eq)]
struct NonCopy(Arc<usize>);
fn test_channel_basics() {
let channel = Channel::with_capacity(16);
let (producer, consumer) = channel.split();
let data = NonCopy(Arc::new(99));
let expected_data = data.clone();
let data = [1u8, 3, 7];
for _ in 0..3 {
producer.try_push(data.clone()).unwrap();
for d in &data {
producer.try_push(*d).unwrap();
}
for d in &data {
let popped = consumer.try_pop().unwrap().unwrap();
assert_eq!(*d, popped);
}
for _ in 0..3 {
let data = consumer.try_pop().unwrap().unwrap();
assert_eq!(data, expected_data);
}
let mut expected_data = [0u8; 3];
let write_len = producer
.try_write(&mut VmReader::from(data.as_slice()).to_fallible())
.unwrap();
assert_eq!(write_len, 3);
consumer
.try_read(&mut VmWriter::from(expected_data.as_mut_slice()).to_fallible())
.unwrap();
assert_eq!(data, expected_data);
}
}

View File

@ -48,11 +48,13 @@ impl Connected {
}
pub(super) fn try_read(&self, buf: &mut [u8]) -> Result<usize> {
self.reader.try_read(buf)
let mut writer = VmWriter::from(buf).to_fallible();
self.reader.try_read(&mut writer)
}
pub(super) fn try_write(&self, buf: &[u8]) -> Result<usize> {
self.writer.try_write(buf)
let mut reader = VmReader::from(buf).to_fallible();
self.writer.try_write(&mut reader)
}
pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {

View File

@ -1,7 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
use aster_virtio::device::socket::connect::{ConnectionInfo, VsockEvent};
use ringbuf::{ring_buffer::RbBase, HeapRb, Rb};
use super::connecting::Connecting;
use crate::{
@ -12,6 +11,7 @@ use crate::{
},
prelude::*,
process::signal::{Pollee, Poller},
util::ring_buffer::RingBuffer,
};
const PER_CONNECTION_BUFFER_CAPACITY: usize = 4096;
@ -53,7 +53,7 @@ impl Connected {
pub fn try_recv(&self, buf: &mut [u8]) -> Result<usize> {
let mut connection = self.connection.disable_irq().lock();
let bytes_read = connection.buffer.len().min(buf.len());
connection.buffer.pop_slice(&mut buf[..bytes_read]);
connection.buffer.pop_slice(&mut buf[..bytes_read]).unwrap();
connection.info.done_forwarding(bytes_read);
match bytes_read {
@ -144,7 +144,7 @@ impl Connected {
struct Connection {
info: ConnectionInfo,
buffer: HeapRb<u8>,
buffer: RingBuffer<u8>,
/// The peer sent a SHUTDOWN request, but we haven't yet responded with a RST because there is
/// still data in the buffer.
peer_requested_shutdown: bool,
@ -157,7 +157,7 @@ impl Connection {
info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap();
Self {
info,
buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY),
buffer: RingBuffer::new(PER_CONNECTION_BUFFER_CAPACITY),
peer_requested_shutdown: false,
local_shutdown: false,
}
@ -183,7 +183,7 @@ impl Connection {
info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap();
Self {
info,
buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY),
buffer: RingBuffer::new(PER_CONNECTION_BUFFER_CAPACITY),
peer_requested_shutdown: false,
local_shutdown: false,
}
@ -194,10 +194,10 @@ impl Connection {
}
fn add(&mut self, bytes: &[u8]) -> bool {
if bytes.len() > self.buffer.free_len() {
if bytes.len() > self.buffer.capacity() - self.buffer.len() {
return false;
}
self.buffer.push_slice(bytes);
self.buffer.push_slice(bytes).unwrap();
true
}
}