Notify virtqueue in a batch manner

This commit is contained in:
jiangjianfeng
2024-12-17 02:24:07 +00:00
committed by Tate, Hongliang Tian
parent acc10376e6
commit 9a540d0fb6
5 changed files with 109 additions and 17 deletions

View File

@ -2,7 +2,10 @@
use alloc::vec;
use aster_bigtcp::{device, time::Instant};
use aster_bigtcp::{
device::{self, NotifyDevice},
time::Instant,
};
use ostd::mm::VmWriter;
use crate::{buffer::RxBuffer, AnyNetworkDevice};
@ -32,6 +35,13 @@ impl device::Device for dyn AnyNetworkDevice {
self.capabilities()
}
}
impl NotifyDevice for dyn AnyNetworkDevice {
fn notify_poll_end(&mut self) {
self.notify_poll_end();
}
}
pub struct RxToken(RxBuffer);
impl device::RxToken for RxToken {

View File

@ -47,12 +47,23 @@ pub trait AnyNetworkDevice: Send + Sync + Any + Debug {
fn can_receive(&self) -> bool;
fn can_send(&self) -> bool;
/// Receive a packet from network. If packet is ready, returns a RxBuffer containing the packet.
/// Otherwise, return NotReady error.
/// Receives a packet from network. If packet is ready, returns a `RxBuffer` containing the packet.
/// Otherwise, return [`VirtioNetError::NotReady`].
fn receive(&mut self) -> Result<RxBuffer, VirtioNetError>;
/// Send a packet to network. Return until the request completes.
/// Sends a packet to network.
fn send(&mut self, packet: &[u8]) -> Result<(), VirtioNetError>;
/// Frees processes tx buffers.
fn free_processed_tx_buffers(&mut self);
/// Notifies the device driver that a polling operation has ended.
///
/// The driver can assume that the device remains protected by acquiring a poll lock
/// for the entire duration of the polling process.
/// Thus two polling process cannot happen simultaneously.
fn notify_poll_end(&mut self);
}
pub trait NetDeviceIrqHandler = Fn() + Send + Sync + 'static;

View File

@ -37,6 +37,22 @@ pub struct NetworkDevice {
tx_buffers: Vec<Option<TxBuffer>>,
rx_buffers: SlotVec<RxBuffer>,
transport: Box<dyn VirtioTransport>,
poll_stat: PollStatistics,
}
/// Structure to track the number of packets sent and received during a single polling process.
struct PollStatistics {
sent_packet: usize,
received_packet: usize,
}
impl PollStatistics {
const fn new() -> Self {
Self {
sent_packet: 0,
received_packet: 0,
}
}
}
impl NetworkDevice {
@ -81,7 +97,6 @@ impl NetworkDevice {
for i in 0..QUEUE_SIZE {
let rx_pool = RX_BUFFER_POOL.get().unwrap();
let rx_buffer = RxBuffer::new(size_of::<VirtioNetHdr>(), rx_pool);
// FIEME: Replace rx_buffer with VM segment-based data structure to use dma mapping.
let token = recv_queue.add_dma_buf(&[], &[&rx_buffer])?;
assert_eq!(i, token);
assert_eq!(rx_buffers.put(rx_buffer) as u16, i);
@ -102,6 +117,7 @@ impl NetworkDevice {
tx_buffers,
rx_buffers,
transport,
poll_stat: PollStatistics::new(),
};
/// Interrupt handler if network device config space changes
@ -139,22 +155,26 @@ impl NetworkDevice {
Ok(())
}
/// Add a rx buffer to recv queue
/// FIEME: Replace rx_buffer with VM segment-based data structure to use dma mapping.
/// Adds a `RxBuffer` to the receive queue.
fn add_rx_buffer(&mut self, rx_buffer: RxBuffer) -> Result<(), VirtioNetError> {
let token = self
.recv_queue
.add_dma_buf(&[], &[&rx_buffer])
.map_err(queue_to_network_error)?;
assert!(self.rx_buffers.put_at(token as usize, rx_buffer).is_none());
if self.recv_queue.should_notify() {
self.recv_queue.notify();
self.poll_stat.received_packet += 1;
if self.poll_stat.received_packet == QUEUE_SIZE as _ {
// If we know there are no free buffers for receiving,
// we will notify the receive queue as soon as possible.
self.notify_receive_queue();
}
Ok(())
}
/// Receive a packet from network. If packet is ready, returns a RxBuffer containing the packet.
/// Otherwise, return NotReady error.
/// Receives a packet from network.
fn receive(&mut self) -> Result<RxBuffer, VirtioNetError> {
let (token, len) = self.recv_queue.pop_used().map_err(queue_to_network_error)?;
debug!("receive packet: token = {}, len = {}", token, len);
@ -171,8 +191,7 @@ impl NetworkDevice {
Ok(rx_buffer)
}
/// Send a packet to network. Return until the request completes.
/// FIEME: Replace tx_buffer with VM segment-based data structure to use dma mapping.
/// Sends a packet to network.
fn send(&mut self, packet: &[u8]) -> Result<(), VirtioNetError> {
if !self.can_send() {
return Err(VirtioNetError::Busy);
@ -184,8 +203,13 @@ impl NetworkDevice {
.send_queue
.add_dma_buf(&[&tx_buffer], &[])
.map_err(queue_to_network_error)?;
if self.send_queue.should_notify() {
self.send_queue.notify();
self.poll_stat.sent_packet += 1;
if self.send_queue.available_desc() == 0 {
// If the send queue is full,
// we will notify the send queue as soon as possible.
self.notify_send_queue();
}
debug!("send packet, token = {}, len = {}", token, packet.len());
@ -208,6 +232,38 @@ impl NetworkDevice {
Ok(())
}
fn notify_send_queue(&mut self) {
if self.poll_stat.sent_packet == 0 {
return;
}
debug!(
"notify send queue: sent {} packets",
self.poll_stat.sent_packet
);
if self.send_queue.should_notify() {
self.send_queue.notify();
}
self.poll_stat.sent_packet = 0;
}
fn notify_receive_queue(&mut self) {
if self.poll_stat.received_packet == 0 {
return;
}
debug!(
"notify receive queue: received {} packets",
self.poll_stat.received_packet
);
if self.recv_queue.should_notify() {
self.recv_queue.notify();
}
self.poll_stat.received_packet = 0;
}
}
fn queue_to_network_error(err: QueueError) -> VirtioNetError {
@ -288,6 +344,11 @@ impl AnyNetworkDevice for NetworkDevice {
self.tx_buffers[token as usize] = None;
}
}
fn notify_poll_end(&mut self) {
self.notify_send_queue();
self.notify_receive_queue();
}
}
impl Debug for NetworkDevice {

View File

@ -18,3 +18,9 @@ pub trait WithDevice: Send + Sync {
where
F: FnOnce(&mut Self::Device) -> R;
}
/// A trait for notifying device drivers about the polling process.
pub trait NotifyDevice {
/// Notifies the device driver that polling has ended.
fn notify_poll_end(&mut self);
}

View File

@ -13,7 +13,7 @@ use smoltcp::{
};
use crate::{
device::WithDevice,
device::{NotifyDevice, WithDevice},
ext::Ext,
iface::{
common::IfaceCommon, iface::internal::IfaceInternal, time::get_network_timestamp, Iface,
@ -70,7 +70,10 @@ impl<D, E: Ext> IfaceInternal<E> for EtherIface<D, E> {
}
}
impl<D: WithDevice + 'static, E: Ext> Iface<E> for EtherIface<D, E> {
impl<D: WithDevice + 'static, E: Ext> Iface<E> for EtherIface<D, E>
where
D::Device: NotifyDevice,
{
fn poll(&self) {
self.driver.with(|device| {
let next_poll = self.common.poll(
@ -78,6 +81,7 @@ impl<D: WithDevice + 'static, E: Ext> Iface<E> for EtherIface<D, E> {
|data, iface_cx, tx_token| self.process(data, iface_cx, tx_token),
|pkt, iface_cx, tx_token| self.dispatch(pkt, iface_cx, tx_token),
);
device.notify_poll_end();
self.common.sched_poll().schedule_next_poll(next_poll);
});
}