Support poll multiple packets

This commit is contained in:
Anmin Liu
2024-05-16 04:22:31 +00:00
committed by Tate, Hongliang Tian
parent 646406115e
commit 878e8a88f4
6 changed files with 118 additions and 128 deletions

View File

@ -28,6 +28,6 @@ jobs:
- name: Run Vsock Client on Host - name: Run Vsock Client on Host
id: host_vsock_client id: host_vsock_client
run: | run: |
sleep 5m sleep 6m
echo "Run vsock client on host...." echo "Run vsock client on host...."
echo "Hello from host" | socat -dd - vsock-connect:3:4321 echo "Hello from host" | socat -dd - vsock-connect:3:4321

View File

@ -196,40 +196,16 @@ impl VsockSpace {
} }
/// Poll for each event from the driver /// Poll for each event from the driver
pub fn poll(&self) -> Result<Option<VsockEvent>> { pub fn poll(&self) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
let guest_cid = driver.guest_cid() as u32; let guest_cid = driver.guest_cid() as u32;
// match the socket and store the buffer body (if valid)
let result = driver while let Some(event) = self.poll_single(&mut driver)? {
.poll(|event, body| {
if !self.is_event_for_socket(&event) { if !self.is_event_for_socket(&event) {
return Ok(None); debug!("ignore event {:?}", event);
continue;
} }
// Deal with Received before the buffer are recycled.
if let VsockEventType::Received { length } = event.event_type {
// Only consider the connected socket and copy body to buffer
if let Some(connected) = self
.connected_sockets
.read_irq_disabled()
.get(&event.into())
{
debug!("Rw matches a connection with id {:?}", connected.id());
if !connected.add_connection_buffer(body) {
return Err(SocketError::BufferTooShort);
}
connected.update_io_events();
} else {
return Ok(None);
}
}
Ok(Some(event))
})
.map_err(|e| Error::with_message(Errno::EIO, "driver poll failed, please try again"))?;
let Some(event) = result else {
return Ok(None);
};
debug!("vsock receive event: {:?}", event); debug!("vsock receive event: {:?}", event);
// The socket must be stored in the VsockSpace. // The socket must be stored in the VsockSpace.
if let Some(connected) = self if let Some(connected) = self
@ -286,9 +262,9 @@ impl VsockSpace {
let Some(connected) = connected_sockets.get(&event.into()) else { let Some(connected) = connected_sockets.get(&event.into()) else {
return_errno_with_message!(Errno::ENOTCONN, "the socket hasn't connected"); return_errno_with_message!(Errno::ENOTCONN, "the socket hasn't connected");
}; };
driver driver.credit_update(&connected.get_info()).map_err(|_| {
.credit_update(&connected.get_info()) Error::with_message(Errno::EIO, "cannot send credit update")
.map_err(|_| Error::with_message(Errno::EIO, "cannot send credit update"))?; })?;
} }
VsockEventType::CreditUpdate => { VsockEventType::CreditUpdate => {
let connected_sockets = self.connected_sockets.read_irq_disabled(); let connected_sockets = self.connected_sockets.read_irq_disabled();
@ -298,6 +274,26 @@ impl VsockSpace {
connected.update_info(&event); connected.update_info(&event);
} }
} }
}
Ok(())
}
fn poll_single(&self, driver: &mut SocketDevice) -> Result<Option<VsockEvent>> {
driver
.poll(|event, body| {
// Deal with Received before the buffer are recycled.
if let VsockEventType::Received { length } = event.event_type {
// Only consider the connected socket and copy body to buffer
let connected_sockets = self.connected_sockets.read_irq_disabled();
let connected = connected_sockets.get(&event.into()).unwrap();
debug!("Rw matches a connection with id {:?}", connected.id());
if !connected.add_connection_buffer(body) {
return Err(SocketError::BufferTooShort);
}
connected.update_io_events();
}
Ok(Some(event)) Ok(Some(event))
})
.map_err(|e| Error::with_message(Errno::EIO, "driver poll failed"))
} }
} }

View File

@ -101,8 +101,6 @@ impl Connected {
} }
let vsockspace = VSOCK_GLOBAL.get().unwrap(); let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.reset(&connection.info).unwrap(); vsockspace.reset(&connection.info).unwrap();
vsockspace.remove_connected_socket(&self.id());
vsockspace.recycle_port(&self.local_addr().port);
connection.set_local_shutdown(); connection.set_local_shutdown();
} }
Ok(()) Ok(())
@ -143,19 +141,6 @@ impl Connected {
} }
} }
impl Drop for Connected {
fn drop(&mut self) {
let connection = self.connection.lock_irq_disabled();
if connection.is_local_shutdown() {
return;
}
let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.reset(&connection.info).unwrap();
vsockspace.remove_connected_socket(&self.id());
vsockspace.recycle_port(&self.local_addr().port);
}
}
struct Connection { struct Connection {
info: ConnectionInfo, info: ConnectionInfo,
buffer: HeapRb<u8>, buffer: HeapRb<u8>,

View File

@ -66,15 +66,6 @@ impl Init {
} }
} }
impl Drop for Init {
fn drop(&mut self) {
if let Some(addr) = *self.bound_addr.lock() {
let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.recycle_port(&addr.port);
}
}
}
impl Default for Init { impl Default for Init {
fn default() -> Self { fn default() -> Self {
Self::new() Self::new()

View File

@ -3,7 +3,7 @@
use super::connected::Connected; use super::connected::Connected;
use crate::{ use crate::{
events::IoEvents, events::IoEvents,
net::socket::vsock::{addr::VsockSocketAddr, VSOCK_GLOBAL}, net::socket::vsock::addr::VsockSocketAddr,
prelude::*, prelude::*,
process::signal::{Pollee, Poller}, process::signal::{Pollee, Poller},
}; };
@ -31,8 +31,9 @@ impl Listen {
pub fn push_incoming(&self, connect: Arc<Connected>) -> Result<()> { pub fn push_incoming(&self, connect: Arc<Connected>) -> Result<()> {
let mut incoming_connections = self.incoming_connection.lock_irq_disabled(); let mut incoming_connections = self.incoming_connection.lock_irq_disabled();
if incoming_connections.len() >= self.backlog { if incoming_connections.len() >= self.backlog {
return_errno_with_message!(Errno::ENOMEM, "queue in listenging socket is full") return_errno_with_message!(Errno::ECONNREFUSED, "queue in listenging socket is full")
} }
// FIXME: check if the port is already used
incoming_connections.push_back(connect); incoming_connections.push_back(connect);
Ok(()) Ok(())
} }
@ -62,11 +63,3 @@ impl Listen {
} }
} }
} }
impl Drop for Listen {
fn drop(&mut self) {
let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.recycle_port(&self.addr.port);
vsockspace.remove_listen_socket(&self.addr);
}
}

View File

@ -339,3 +339,28 @@ impl Socket for VsockStreamSocket {
} }
} }
} }
impl Drop for VsockStreamSocket {
fn drop(&mut self) {
let vsockspace = VSOCK_GLOBAL.get().unwrap();
let inner = self.status.read();
match &*inner {
Status::Init(init) => {
if let Some(addr) = init.bound_addr() {
vsockspace.recycle_port(&addr.port);
}
}
Status::Listen(listen) => {
vsockspace.recycle_port(&listen.addr().port);
vsockspace.remove_listen_socket(&listen.addr());
}
Status::Connected(connected) => {
if !connected.is_closed() {
vsockspace.reset(&connected.get_info()).unwrap();
}
vsockspace.remove_connected_socket(&connected.id());
vsockspace.recycle_port(&connected.local_addr().port);
}
}
}
}