From e6f465b553f613d60828e0e503dc94ce4109ded1 Mon Sep 17 00:00:00 2001 From: Shaowei Song Date: Wed, 28 Aug 2024 06:23:24 +0000 Subject: [PATCH] Replace all the ring buffers with the new one --- Cargo.lock | 20 ---- kernel/Cargo.toml | 1 - kernel/comps/network/Cargo.toml | 1 - kernel/src/device/pty/pty.rs | 17 ++-- kernel/src/device/tty/line_discipline.rs | 33 ++++--- kernel/src/fs/pipe.rs | 11 +-- kernel/src/fs/utils/channel.rs | 91 +++++++++++-------- .../src/net/socket/unix/stream/connected.rs | 6 +- .../src/net/socket/vsock/stream/connected.rs | 14 +-- 9 files changed, 95 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 749f31544..f5fd22574 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,7 +134,6 @@ dependencies = [ "int-to-c-enum", "log", "ostd", - "ringbuf", "smoltcp", "spin 0.9.8", ] @@ -180,7 +179,6 @@ dependencies = [ "ostd", "paste", "rand", - "ringbuf", "smoltcp", "spin 0.9.8", "static_assertions", @@ -414,15 +412,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-utils" -version = "0.8.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] - [[package]] name = "ctor" version = "0.1.25" @@ -1257,15 +1246,6 @@ dependencies = [ "bitflags 2.4.1", ] -[[package]] -name = "ringbuf" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79abed428d1fd2a128201cec72c5f6938e2da607c6f3745f769fabea399d950a" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "rle-decode-fast" version = "1.0.3" diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 321adc4ed..1fbc860c9 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -49,7 +49,6 @@ tdx-guest = { version = "0.1.7", optional = true } xmas-elf = "0.8.0" # data-structures bitflags = "1.3" -ringbuf = { version = "0.3.2", default-features = false, features = ["alloc"] } keyable-arc = { path = "libs/keyable-arc" } # unzip initramfs libflate = { version = "2", default-features = false } diff --git a/kernel/comps/network/Cargo.toml b/kernel/comps/network/Cargo.toml index d725a8c4c..0b2abb295 100644 --- a/kernel/comps/network/Cargo.toml +++ b/kernel/comps/network/Cargo.toml @@ -15,7 +15,6 @@ component = { path = "../../libs/comp-sys/component" } int-to-c-enum = { path = "../../libs/int-to-c-enum" } log = "0.4" ostd = { path = "../../../ostd" } -ringbuf = { version = "0.3.2", default-features = false, features = ["alloc"] } smoltcp = { git = "https://github.com/smoltcp-rs/smoltcp", rev = "dc08e0b", default-features = false, features = [ "alloc", "log", diff --git a/kernel/src/device/pty/pty.rs b/kernel/src/device/pty/pty.rs index 0ceb05423..674a0f683 100644 --- a/kernel/src/device/pty/pty.rs +++ b/kernel/src/device/pty/pty.rs @@ -2,8 +2,6 @@ use alloc::format; -use ringbuf::{ring_buffer::RbBase, HeapRb, Rb}; - use crate::{ device::tty::{line_discipline::LineDiscipline, new_job_control_and_ldisc}, events::IoEvents, @@ -20,6 +18,7 @@ use crate::{ signal::{Pollee, Poller}, JobControl, Terminal, }, + util::ring_buffer::RingBuffer, }; const BUFFER_CAPACITY: usize = 4096; @@ -32,7 +31,7 @@ pub struct PtyMaster { ptmx: Arc, index: u32, output: Arc, - input: SpinLock>, + input: SpinLock>, job_control: Arc, /// The state of input buffer pollee: Pollee, @@ -46,7 +45,7 @@ impl PtyMaster { ptmx, index, output: ldisc, - input: SpinLock::new(HeapRb::new(BUFFER_CAPACITY)), + input: SpinLock::new(RingBuffer::new(BUFFER_CAPACITY)), job_control, pollee: Pollee::new(IoEvents::OUT), weak_self: weak_ref.clone(), @@ -89,7 +88,7 @@ impl PtyMaster { self.output.buffer_len() } - fn update_state(&self, buf: &HeapRb) { + fn update_state(&self, buf: &RingBuffer) { if buf.is_empty() { self.pollee.del_events(IoEvents::IN) } else { @@ -125,10 +124,10 @@ impl FileIo for PtyMaster { continue; } - let read_len = input.len().min(read_len); - let mut buf = vec![0u8; read_len]; - input.pop_slice(&mut buf); - writer.write_fallible(&mut buf.as_slice().into())?; + let read_len = match input.read_fallible(writer) { + Ok(len) => len, + Err((_, len)) => len, + }; self.update_state(&input); return Ok(read_len); } diff --git a/kernel/src/device/tty/line_discipline.rs b/kernel/src/device/tty/line_discipline.rs index ad9ef7f44..482e40af5 100644 --- a/kernel/src/device/tty/line_discipline.rs +++ b/kernel/src/device/tty/line_discipline.rs @@ -5,7 +5,6 @@ use alloc::format; use ostd::trap::{disable_local, in_interrupt_context}; -use ringbuf::{ring_buffer::RbBase, Rb, StaticRb}; use super::termio::{KernelTermios, WinSize, CC_C_CHAR}; use crate::{ @@ -17,6 +16,7 @@ use crate::{ Pollee, Poller, }, thread::work_queue::{submit_work_item, work_item::WorkItem, WorkPriority}, + util::ring_buffer::RingBuffer, }; // This implementation refers the implementation of linux @@ -30,7 +30,7 @@ pub struct LineDiscipline { /// current line current_line: SpinLock, /// The read buffer - read_buffer: SpinLock>, + read_buffer: SpinLock>, /// termios termios: SpinLock, /// Windows size, @@ -45,21 +45,24 @@ pub struct LineDiscipline { work_item_para: Arc>, } -#[derive(Default)] pub struct CurrentLine { - buffer: StaticRb, + buffer: RingBuffer, +} + +impl Default for CurrentLine { + fn default() -> Self { + Self { + buffer: RingBuffer::new(BUFFER_CAPACITY), + } + } } impl CurrentLine { - pub fn new() -> Self { - Self { - buffer: StaticRb::default(), - } - } - /// read all bytes inside current line and clear current line pub fn drain(&mut self) -> Vec { - self.buffer.pop_iter().collect() + let mut ret = vec![0u8; self.buffer.len()]; + self.buffer.pop_slice(ret.as_mut_slice()).unwrap(); + ret } pub fn push_char(&mut self, char: u8) { @@ -69,7 +72,7 @@ impl CurrentLine { } pub fn backspace(&mut self) { - self.buffer.pop(); + let _ = self.buffer.pop(); } pub fn is_full(&self) -> bool { @@ -92,8 +95,8 @@ impl LineDiscipline { } }))); Self { - current_line: SpinLock::new(CurrentLine::new()), - read_buffer: SpinLock::new(StaticRb::default()), + current_line: SpinLock::new(CurrentLine::default()), + read_buffer: SpinLock::new(RingBuffer::new(BUFFER_CAPACITY)), termios: SpinLock::new(KernelTermios::default()), winsize: SpinLock::new(WinSize::default()), pollee: Pollee::new(IoEvents::empty()), @@ -372,7 +375,7 @@ impl LineDiscipline { pub fn drain_input(&self) { self.current_line.lock().drain(); - let _: Vec<_> = self.read_buffer.lock().pop_iter().collect(); + self.read_buffer.lock().clear(); } pub fn buffer_len(&self) -> usize { diff --git a/kernel/src/fs/pipe.rs b/kernel/src/fs/pipe.rs index ee676475c..0005cdebe 100644 --- a/kernel/src/fs/pipe.rs +++ b/kernel/src/fs/pipe.rs @@ -62,13 +62,11 @@ impl Pollable for PipeReader { impl FileLike for PipeReader { fn read(&self, writer: &mut VmWriter) -> Result { - let mut buf = vec![0u8; writer.avail()]; let read_len = if self.status_flags().contains(StatusFlags::O_NONBLOCK) { - self.consumer.try_read(&mut buf)? + self.consumer.try_read(writer)? } else { - self.wait_events(IoEvents::IN, || self.consumer.try_read(&mut buf))? + self.wait_events(IoEvents::IN, || self.consumer.try_read(writer))? }; - writer.write_fallible(&mut buf.as_slice().into())?; Ok(read_len) } @@ -147,11 +145,10 @@ impl Pollable for PipeWriter { impl FileLike for PipeWriter { fn write(&self, reader: &mut VmReader) -> Result { - let buf = reader.collect()?; if self.status_flags().contains(StatusFlags::O_NONBLOCK) { - self.producer.try_write(&buf) + self.producer.try_write(reader) } else { - self.wait_events(IoEvents::OUT, || self.producer.try_write(&buf)) + self.wait_events(IoEvents::OUT, || self.producer.try_write(reader)) } } diff --git a/kernel/src/fs/utils/channel.rs b/kernel/src/fs/utils/channel.rs index 915b46797..5c43e7fea 100644 --- a/kernel/src/fs/utils/channel.rs +++ b/kernel/src/fs/utils/channel.rs @@ -4,12 +4,12 @@ use core::sync::atomic::{AtomicBool, Ordering}; use aster_rights::{Read, ReadOp, TRights, Write, WriteOp}; use aster_rights_proc::require; -use ringbuf::{HeapConsumer as HeapRbConsumer, HeapProducer as HeapRbProducer, HeapRb}; use crate::{ events::{IoEvents, Observer}, prelude::*, process::signal::{Pollee, Poller}, + util::ring_buffer::{RbConsumer, RbProducer, RingBuffer}, }; /// A unidirectional communication channel, intended to implement IPC, e.g., pipe, @@ -93,11 +93,11 @@ macro_rules! impl_common_methods_for_channel { } impl Producer { - fn this_end(&self) -> &FifoInner> { + fn this_end(&self) -> &FifoInner> { &self.0.common.producer } - fn peer_end(&self) -> &FifoInner> { + fn peer_end(&self) -> &FifoInner> { &self.0.common.consumer } @@ -127,14 +127,14 @@ impl Producer { impl_common_methods_for_channel!(); } -impl Producer { +impl Producer { /// Tries to write `buf` to the channel. /// /// - Returns `Ok(_)` with the number of bytes written if successful. /// - Returns `Err(EPIPE)` if the channel is shut down. /// - Returns `Err(EAGAIN)` if the channel is full. - pub fn try_write(&self, buf: &[T]) -> Result { - if buf.is_empty() { + pub fn try_write(&self, reader: &mut VmReader) -> Result { + if reader.remain() == 0 { // Even after shutdown, writing an empty buffer is still fine. return Ok(0); } @@ -143,7 +143,7 @@ impl Producer { return_errno_with_message!(Errno::EPIPE, "the channel is shut down"); } - let written_len = self.0.write(buf); + let written_len = self.0.write(reader); self.update_pollee(); if written_len > 0 { @@ -154,7 +154,7 @@ impl Producer { } } -impl Producer { +impl Producer { /// Tries to push `item` to the channel. /// /// - Returns `Ok(())` if successful. @@ -188,11 +188,11 @@ impl Drop for Producer { } impl Consumer { - fn this_end(&self) -> &FifoInner> { + fn this_end(&self) -> &FifoInner> { &self.0.common.consumer } - fn peer_end(&self) -> &FifoInner> { + fn peer_end(&self) -> &FifoInner> { &self.0.common.producer } @@ -220,21 +220,21 @@ impl Consumer { impl_common_methods_for_channel!(); } -impl Consumer { +impl Consumer { /// Tries to read `buf` from the channel. /// /// - Returns `Ok(_)` with the number of bytes read if successful. /// - Returns `Ok(0)` if the channel is shut down and there is no data left. /// - Returns `Err(EAGAIN)` if the channel is empty. - pub fn try_read(&self, buf: &mut [T]) -> Result { - if buf.is_empty() { + pub fn try_read(&self, writer: &mut VmWriter) -> Result { + if writer.avail() == 0 { return Ok(0); } // This must be recorded before the actual operation to avoid race conditions. let is_shutdown = self.is_shutdown() || self.is_peer_shutdown(); - let read_len = self.0.read(buf); + let read_len = self.0.read(writer); self.update_pollee(); if read_len > 0 { @@ -247,7 +247,7 @@ impl Consumer { } } -impl Consumer { +impl Consumer { /// Tries to read an item from the channel. /// /// - Returns `Ok(Some(_))` with the popped item if successful. @@ -299,28 +299,40 @@ impl Fifo { } } -impl Fifo { +impl Fifo { #[require(R > Read)] - pub fn read(&self, buf: &mut [T]) -> usize { + pub fn read(&self, writer: &mut VmWriter) -> usize { let mut rb = self.common.consumer.rb(); - rb.pop_slice(buf) + match rb.read_fallible(writer) { + Ok(len) => len, + Err((e, len)) => { + error!("memory read failed on the ring buffer, error: {e:?}"); + len + } + } } #[require(R > Write)] - pub fn write(&self, buf: &[T]) -> usize { + pub fn write(&self, reader: &mut VmReader) -> usize { let mut rb = self.common.producer.rb(); - rb.push_slice(buf) + match rb.write_fallible(reader) { + Ok(len) => len, + Err((e, len)) => { + error!("memory write failed on the ring buffer, error: {e:?}"); + len + } + } } } -impl Fifo { +impl Fifo { /// Pushes an item into the endpoint. /// If the `push` method fails, this method will return /// `Err` containing the item that hasn't been pushed #[require(R > Write)] pub fn push(&self, item: T) -> core::result::Result<(), T> { let mut rb = self.common.producer.rb(); - rb.push(item) + rb.push(item).ok_or(item) } /// Pops an item from the endpoint. @@ -332,13 +344,13 @@ impl Fifo { } struct Common { - producer: FifoInner>, - consumer: FifoInner>, + producer: FifoInner>, + consumer: FifoInner>, } impl Common { fn new(capacity: usize) -> Self { - let rb: HeapRb = HeapRb::new(capacity); + let rb: RingBuffer = RingBuffer::new(capacity); let (rb_producer, rb_consumer) = rb.split(); let producer = FifoInner::new(rb_producer, IoEvents::OUT); @@ -387,23 +399,28 @@ mod test { use super::*; #[ktest] - fn test_non_copy() { - #[derive(Clone, Debug, PartialEq, Eq)] - struct NonCopy(Arc); - + fn test_channel_basics() { let channel = Channel::with_capacity(16); let (producer, consumer) = channel.split(); - let data = NonCopy(Arc::new(99)); - let expected_data = data.clone(); + let data = [1u8, 3, 7]; - for _ in 0..3 { - producer.try_push(data.clone()).unwrap(); + for d in &data { + producer.try_push(*d).unwrap(); + } + for d in &data { + let popped = consumer.try_pop().unwrap().unwrap(); + assert_eq!(*d, popped); } - for _ in 0..3 { - let data = consumer.try_pop().unwrap().unwrap(); - assert_eq!(data, expected_data); - } + let mut expected_data = [0u8; 3]; + let write_len = producer + .try_write(&mut VmReader::from(data.as_slice()).to_fallible()) + .unwrap(); + assert_eq!(write_len, 3); + consumer + .try_read(&mut VmWriter::from(expected_data.as_mut_slice()).to_fallible()) + .unwrap(); + assert_eq!(data, expected_data); } } diff --git a/kernel/src/net/socket/unix/stream/connected.rs b/kernel/src/net/socket/unix/stream/connected.rs index f3d962d1f..77f0d3705 100644 --- a/kernel/src/net/socket/unix/stream/connected.rs +++ b/kernel/src/net/socket/unix/stream/connected.rs @@ -48,11 +48,13 @@ impl Connected { } pub(super) fn try_read(&self, buf: &mut [u8]) -> Result { - self.reader.try_read(buf) + let mut writer = VmWriter::from(buf).to_fallible(); + self.reader.try_read(&mut writer) } pub(super) fn try_write(&self, buf: &[u8]) -> Result { - self.writer.try_write(buf) + let mut reader = VmReader::from(buf).to_fallible(); + self.writer.try_write(&mut reader) } pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { diff --git a/kernel/src/net/socket/vsock/stream/connected.rs b/kernel/src/net/socket/vsock/stream/connected.rs index b2c0dbccf..8dcf14810 100644 --- a/kernel/src/net/socket/vsock/stream/connected.rs +++ b/kernel/src/net/socket/vsock/stream/connected.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: MPL-2.0 use aster_virtio::device::socket::connect::{ConnectionInfo, VsockEvent}; -use ringbuf::{ring_buffer::RbBase, HeapRb, Rb}; use super::connecting::Connecting; use crate::{ @@ -12,6 +11,7 @@ use crate::{ }, prelude::*, process::signal::{Pollee, Poller}, + util::ring_buffer::RingBuffer, }; const PER_CONNECTION_BUFFER_CAPACITY: usize = 4096; @@ -53,7 +53,7 @@ impl Connected { pub fn try_recv(&self, buf: &mut [u8]) -> Result { let mut connection = self.connection.disable_irq().lock(); let bytes_read = connection.buffer.len().min(buf.len()); - connection.buffer.pop_slice(&mut buf[..bytes_read]); + connection.buffer.pop_slice(&mut buf[..bytes_read]).unwrap(); connection.info.done_forwarding(bytes_read); match bytes_read { @@ -144,7 +144,7 @@ impl Connected { struct Connection { info: ConnectionInfo, - buffer: HeapRb, + buffer: RingBuffer, /// The peer sent a SHUTDOWN request, but we haven't yet responded with a RST because there is /// still data in the buffer. peer_requested_shutdown: bool, @@ -157,7 +157,7 @@ impl Connection { info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap(); Self { info, - buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY), + buffer: RingBuffer::new(PER_CONNECTION_BUFFER_CAPACITY), peer_requested_shutdown: false, local_shutdown: false, } @@ -183,7 +183,7 @@ impl Connection { info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap(); Self { info, - buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY), + buffer: RingBuffer::new(PER_CONNECTION_BUFFER_CAPACITY), peer_requested_shutdown: false, local_shutdown: false, } @@ -194,10 +194,10 @@ impl Connection { } fn add(&mut self, bytes: &[u8]) -> bool { - if bytes.len() > self.buffer.free_len() { + if bytes.len() > self.buffer.capacity() - self.buffer.len() { return false; } - self.buffer.push_slice(bytes); + self.buffer.push_slice(bytes).unwrap(); true } }