diff --git a/services/comps/block/src/bio.rs b/services/comps/block/src/bio.rs index f3ee10acd..abf57572b 100644 --- a/services/comps/block/src/bio.rs +++ b/services/comps/block/src/bio.rs @@ -17,6 +17,7 @@ use int_to_c_enum::TryFromInt; /// (2) The target sectors on the device for doing I/O, /// (3) The memory locations (`BioSegment`) from/to which data are read/written, /// (4) The optional callback function that will be invoked when the I/O is completed. +#[derive(Debug)] pub struct Bio(Arc); impl Bio { @@ -85,10 +86,7 @@ impl Bio { ); assert!(result.is_ok()); - if let Err(e) = block_device - .request_queue() - .enqueue(SubmittedBio(self.0.clone())) - { + if let Err(e) = block_device.enqueue(SubmittedBio(self.0.clone())) { // Fail to submit, revert the status. let result = self.0.status.compare_exchange( BioStatus::Submit as u32, @@ -151,6 +149,7 @@ impl From for aster_frame::Error { /// This structure holds a list of `Bio` requests and provides functionality to /// wait for their completion and retrieve their statuses. #[must_use] +#[derive(Debug)] pub struct BioWaiter { bios: Vec>, } @@ -230,6 +229,7 @@ impl Default for BioWaiter { /// A submitted `Bio` object. /// /// The request queue of block device only accepts a `SubmittedBio` into the queue. +#[derive(Debug)] pub struct SubmittedBio(Arc); impl SubmittedBio { @@ -309,6 +309,18 @@ impl BioInner { } } +impl Debug for BioInner { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("BioInner") + .field("type", &self.type_()) + .field("sid_range", &self.sid_range()) + .field("status", &self.status()) + .field("segments", &self.segments()) + .field("complete_fn", &self.complete_fn) + .finish() + } +} + /// The type of `Bio`. #[derive(Clone, Copy, Debug, PartialEq, TryFromInt)] #[repr(u8)] diff --git a/services/comps/block/src/lib.rs b/services/comps/block/src/lib.rs index 723f9c390..7d40e8ca1 100644 --- a/services/comps/block/src/lib.rs +++ b/services/comps/block/src/lib.rs @@ -40,7 +40,8 @@ mod impl_block_device; mod prelude; pub mod request_queue; -use self::{prelude::*, request_queue::BioRequestQueue}; +use self::bio::{BioEnqueueError, SubmittedBio}; +use self::prelude::*; use aster_frame::sync::SpinLock; use component::init_component; @@ -52,8 +53,8 @@ pub const BLOCK_SIZE: usize = aster_frame::config::PAGE_SIZE; pub const SECTOR_SIZE: usize = 512; pub trait BlockDevice: Send + Sync + Any + Debug { - /// Returns this block device's request queue, to which block I/O requests may be submitted. - fn request_queue(&self) -> &dyn BioRequestQueue; + /// Enqueues a new `SubmittedBio` to the block device. + fn enqueue(&self, bio: SubmittedBio) -> Result<(), BioEnqueueError>; fn handle_irq(&self); } diff --git a/services/comps/block/src/prelude.rs b/services/comps/block/src/prelude.rs index 77dc3c68a..21c28f66c 100644 --- a/services/comps/block/src/prelude.rs +++ b/services/comps/block/src/prelude.rs @@ -8,4 +8,4 @@ pub(crate) use alloc::vec::Vec; pub(crate) use core::any::Any; pub(crate) use core::fmt::Debug; pub(crate) use core::ops::Range; -pub(crate) use core::sync::atomic::{AtomicU32, Ordering}; +pub(crate) use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; diff --git a/services/comps/block/src/request_queue.rs b/services/comps/block/src/request_queue.rs index 749a8b15d..21e284b40 100644 --- a/services/comps/block/src/request_queue.rs +++ b/services/comps/block/src/request_queue.rs @@ -7,24 +7,118 @@ use super::{ id::Sid, }; -/// Represents the software staging queue for the `BioRequest` objects. -pub trait BioRequestQueue { +use aster_frame::sync::{Mutex, WaitQueue}; + +/// A simple block I/O request queue backed by one internal FIFO queue. +/// +/// It is a FIFO producer-consumer queue, where the producer (e.g., filesystem) +/// submits requests to the queue, and the consumer (e.g., block device driver) +/// continuously consumes and processes these requests from the queue. +/// +/// It supports merging the new request with the front request if if the type +/// is same and the sector range is contiguous. +pub struct BioRequestSingleQueue { + queue: Mutex>, + num_requests: AtomicUsize, + wait_queue: WaitQueue, +} + +impl BioRequestSingleQueue { + /// Creates an empty queue. + pub fn new() -> Self { + Self { + queue: Mutex::new(VecDeque::new()), + num_requests: AtomicUsize::new(0), + wait_queue: WaitQueue::new(), + } + } + + /// Returns the number of requests currently in this queue. + pub fn num_requests(&self) -> usize { + self.num_requests.load(Ordering::Relaxed) + } + /// Enqueues a `SubmittedBio` to this queue. /// - /// This `SubmittedBio` will be merged into an existing `BioRequest`, or a new - /// `BioRequest` will be created from the `SubmittedBio` before being placed - /// into the queue. + /// When enqueueing the `SubmittedBio`, try to insert it into the last request if the + /// type is same and the sector range is contiguous. + /// Otherwise, creates and inserts a new request for the `SubmittedBio`. /// /// This method will wake up the waiter if a new `BioRequest` is enqueued. - fn enqueue(&self, bio: SubmittedBio) -> Result<(), BioEnqueueError>; + pub fn enqueue(&self, bio: SubmittedBio) -> Result<(), BioEnqueueError> { + let mut queue = self.queue.lock(); + if let Some(request) = queue.front_mut() { + if request.can_merge(&bio) { + request.merge_bio(bio); + return Ok(()); + } + } + + let new_request = BioRequest::from(bio); + queue.push_front(new_request); + self.inc_num_requests(); + drop(queue); + + self.wait_queue.wake_all(); + Ok(()) + } /// Dequeues a `BioRequest` from this queue. /// /// This method will wait until one request can be retrieved. - fn dequeue(&self) -> BioRequest; + pub fn dequeue(&self) -> BioRequest { + let mut num_requests = self.num_requests(); + + loop { + if num_requests > 0 { + let mut queue = self.queue.lock(); + if let Some(request) = queue.pop_back() { + self.dec_num_requests(); + return request; + } + } + + num_requests = self.wait_queue.wait_until(|| { + let num_requests = self.num_requests(); + if num_requests > 0 { + Some(num_requests) + } else { + None + } + }); + } + } + + fn dec_num_requests(&self) { + self.num_requests.fetch_sub(1, Ordering::Relaxed); + } + + fn inc_num_requests(&self) { + self.num_requests.fetch_add(1, Ordering::Relaxed); + } +} + +impl Default for BioRequestSingleQueue { + fn default() -> Self { + Self::new() + } +} + +impl Debug for BioRequestSingleQueue { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("BioRequestSingleQueue") + .field("num_requests", &self.num_requests()) + .field("queue", &self.queue.lock()) + .finish() + } } /// The block I/O request. +/// +/// The advantage of this data structure is to merge several `SubmittedBio`s that are +/// contiguous on the target device's sector address, allowing them to be collectively +/// processed in a queue. +#[derive(Debug)] pub struct BioRequest { /// The type of the I/O type_: BioType, diff --git a/services/comps/virtio/src/device/block/device.rs b/services/comps/virtio/src/device/block/device.rs index 5e0613087..a83b82514 100644 --- a/services/comps/virtio/src/device/block/device.rs +++ b/services/comps/virtio/src/device/block/device.rs @@ -1,22 +1,16 @@ // SPDX-License-Identifier: MPL-2.0 -use core::{ - fmt::Debug, - hint::spin_loop, - mem::size_of, - sync::atomic::{AtomicUsize, Ordering}, -}; +use core::{fmt::Debug, hint::spin_loop, mem::size_of}; -use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc, vec::Vec}; +use alloc::{boxed::Box, string::ToString, sync::Arc, vec::Vec}; use aster_block::{ bio::{BioEnqueueError, BioStatus, BioType, SubmittedBio}, id::Sid, - request_queue::{BioRequest, BioRequestQueue}, + request_queue::{BioRequest, BioRequestSingleQueue}, }; use aster_frame::{ io_mem::IoMem, sync::SpinLock, - sync::{Mutex, WaitQueue}, trap::TrapFrame, vm::{VmAllocOptions, VmFrame, VmIo, VmReader, VmWriter}, }; @@ -37,7 +31,7 @@ use super::{BlockFeatures, VirtioBlockConfig}; pub struct BlockDevice { device: DeviceInner, /// The software staging queue. - queue: BioSingleQueue, + queue: BioRequestSingleQueue, } impl BlockDevice { @@ -47,7 +41,7 @@ impl BlockDevice { let device = DeviceInner::init(transport)?; Self { device, - queue: BioSingleQueue::new(), + queue: BioRequestSingleQueue::new(), } }; aster_block::register_device(super::DEVICE_NAME.to_string(), Arc::new(block_device)); @@ -117,8 +111,8 @@ impl BlockDevice { } impl aster_block::BlockDevice for BlockDevice { - fn request_queue(&self) -> &dyn BioRequestQueue { - &self.queue + fn enqueue(&self, bio: SubmittedBio) -> Result<(), BioEnqueueError> { + self.queue.enqueue(bio) } fn handle_irq(&self) { @@ -303,99 +297,3 @@ impl Default for BlockResp { } } } - -/// A simple block I/O request queue with a single queue. -/// -/// It is a FIFO producer-consumer queue, where the producer (e.g., filesystem) -/// submits requests to the queue, and the consumer (e.g., block device driver) -/// continuously consumes and processes these requests from the queue. -pub struct BioSingleQueue { - queue: Mutex>, - num_requests: AtomicUsize, - wait_queue: WaitQueue, -} - -impl BioSingleQueue { - /// Creates an empty queue. - pub fn new() -> Self { - Self { - queue: Mutex::new(VecDeque::new()), - num_requests: AtomicUsize::new(0), - wait_queue: WaitQueue::new(), - } - } - - /// Returns the number of requests currently in this queue. - pub fn num_requests(&self) -> usize { - self.num_requests.load(Ordering::Relaxed) - } - - fn dec_num_requests(&self) { - self.num_requests.fetch_sub(1, Ordering::Relaxed); - } - - fn inc_num_requests(&self) { - self.num_requests.fetch_add(1, Ordering::Relaxed); - } -} - -impl Default for BioSingleQueue { - fn default() -> Self { - Self::new() - } -} - -impl Debug for BioSingleQueue { - fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { - f.debug_struct("BioSingleQueue") - .field("num_requests", &self.num_requests()) - .finish() - } -} - -impl BioRequestQueue for BioSingleQueue { - /// Enqueues a `SubmittedBio` to this queue. - /// - /// When enqueueing the `SubmittedBio`, try to insert it into the last request if the - /// type is same and the sector range is contiguous. - /// Otherwise, creates and inserts a new request for the `SubmittedBio`. - fn enqueue(&self, bio: SubmittedBio) -> Result<(), BioEnqueueError> { - let mut queue = self.queue.lock(); - if let Some(request) = queue.front_mut() { - if request.can_merge(&bio) { - request.merge_bio(bio); - return Ok(()); - } - } - - let new_request = BioRequest::from(bio); - queue.push_front(new_request); - drop(queue); - self.inc_num_requests(); - self.wait_queue.wake_all(); - Ok(()) - } - - /// Dequeues a `BioRequest` from this queue. - fn dequeue(&self) -> BioRequest { - let mut num_requests = self.num_requests(); - - loop { - if num_requests > 0 { - if let Some(request) = self.queue.lock().pop_back() { - self.dec_num_requests(); - return request; - } - } - - num_requests = self.wait_queue.wait_until(|| { - let num_requests = self.num_requests(); - if num_requests > 0 { - Some(num_requests) - } else { - None - } - }); - } - } -}