Optimize vsock code structure

This commit is contained in:
Anmin Liu
2024-05-14 10:36:14 +00:00
committed by Tate, Hongliang Tian
parent 60dd17fdd3
commit 646406115e
16 changed files with 189 additions and 94 deletions

View File

@ -19,10 +19,15 @@ jobs:
sudo modprobe vhost_vsock sudo modprobe vhost_vsock
sudo apt-get install socat sudo apt-get install socat
echo "Run vsock server on host...." echo "Run vsock server on host...."
socat -ddd VSOCK-LISTEN:1234,fork SYSTEM:'read cmd; result=\$(eval \"\$cmd\" 2>&1); echo \"\$result\"' & socat -ddd VSOCK-LISTEN:1234,fork SYSTEM:'while read cmd; do result=$(eval "$cmd" 2>&1); echo "$result"; done' &
- name: Run Vsock Client on Guest - name: Run Vsock Client and Server on Guest
id: guest_vsock_client_server id: guest_vsock_client_server
run: | run: |
docker run --privileged --network=host --device=/dev/kvm -v ./:/root/asterinas asterinas/asterinas:0.4.2 \ docker run --privileged --network=host --device=/dev/kvm -v ./:/root/asterinas asterinas/asterinas:0.4.2 \
make run AUTO_TEST=vsock ENABLE_KVM=0 SCHEME=microvm RELEASE_MODE=1 make run AUTO_TEST=vsock ENABLE_KVM=0 SCHEME=microvm RELEASE_MODE=1 &
- name: Run Vsock Client on Host
id: host_vsock_client
run: |
sleep 5m
echo "Run vsock client on host...."
echo "Hello from host" | socat -dd - vsock-connect:3:4321

View File

@ -65,7 +65,6 @@ qemu.args = """\
-device virtio-net-pci,netdev=mynet0,disable-legacy=on,disable-modern=off \ -device virtio-net-pci,netdev=mynet0,disable-legacy=on,disable-modern=off \
-device virtio-keyboard-pci,disable-legacy=on,disable-modern=off \ -device virtio-keyboard-pci,disable-legacy=on,disable-modern=off \
-device virtio-blk-pci,bus=pcie.0,addr=0x6,drive=x0,disable-legacy=on,disable-modern=off \ -device virtio-blk-pci,bus=pcie.0,addr=0x6,drive=x0,disable-legacy=on,disable-modern=off \
-device vhost-vsock-pci,id=vhost-vsock-pci0,guest-cid=3,disable-legacy=on,disable-modern=off \
-drive file=fs.img,if=none,format=raw,id=x0 \ -drive file=fs.img,if=none,format=raw,id=x0 \
-netdev user,id=mynet0,hostfwd=tcp::10027-:22,hostfwd=tcp::54136-:8090 \ -netdev user,id=mynet0,hostfwd=tcp::10027-:22,hostfwd=tcp::54136-:8090 \
-chardev stdio,id=mux,mux=on,logfile=./$(date '+%Y-%m-%dT%H%M%S').log \ -chardev stdio,id=mux,mux=on,logfile=./$(date '+%Y-%m-%dT%H%M%S').log \

View File

@ -6,7 +6,6 @@ use aster_virtio::device::socket::{
connect::{ConnectionInfo, VsockEvent, VsockEventType}, connect::{ConnectionInfo, VsockEvent, VsockEventType},
device::SocketDevice, device::SocketDevice,
error::SocketError, error::SocketError,
get_device, DEVICE_NAME,
}; };
use super::{ use super::{
@ -34,8 +33,7 @@ pub struct VsockSpace {
impl VsockSpace { impl VsockSpace {
/// Create a new global VsockSpace /// Create a new global VsockSpace
pub fn new() -> Self { pub fn new(driver: Arc<SpinLock<SocketDevice>>) -> Self {
let driver = get_device(DEVICE_NAME).unwrap();
Self { Self {
driver, driver,
connecting_sockets: SpinLock::new(BTreeMap::new()), connecting_sockets: SpinLock::new(BTreeMap::new()),
@ -44,6 +42,7 @@ impl VsockSpace {
used_ports: SpinLock::new(BTreeSet::new()), used_ports: SpinLock::new(BTreeSet::new()),
} }
} }
/// Check whether the event is for this socket space /// Check whether the event is for this socket space
fn is_event_for_socket(&self, event: &VsockEvent) -> bool { fn is_event_for_socket(&self, event: &VsockEvent) -> bool {
self.connecting_sockets self.connecting_sockets
@ -58,10 +57,12 @@ impl VsockSpace {
.read_irq_disabled() .read_irq_disabled()
.contains_key(&(*event).into()) .contains_key(&(*event).into())
} }
/// Alloc an unused port range /// Alloc an unused port range
pub fn alloc_ephemeral_port(&self) -> Result<u32> { pub fn alloc_ephemeral_port(&self) -> Result<u32> {
let mut used_ports = self.used_ports.lock_irq_disabled(); let mut used_ports = self.used_ports.lock_irq_disabled();
for port in 1024..=u32::MAX { // FIXME: the maximal port number is not defined by spec
for port in 1024..u32::MAX {
if !used_ports.contains(&port) { if !used_ports.contains(&port) {
used_ports.insert(port); used_ports.insert(port);
return Ok(port); return Ok(port);
@ -69,15 +70,20 @@ impl VsockSpace {
} }
return_errno_with_message!(Errno::EAGAIN, "cannot find unused high port"); return_errno_with_message!(Errno::EAGAIN, "cannot find unused high port");
} }
pub fn insert_port(&self, port: u32) -> bool {
/// Bind a port
pub fn bind_port(&self, port: u32) -> bool {
let mut used_ports = self.used_ports.lock_irq_disabled(); let mut used_ports = self.used_ports.lock_irq_disabled();
used_ports.insert(port) used_ports.insert(port)
} }
/// Recycle a port
pub fn recycle_port(&self, port: &u32) -> bool { pub fn recycle_port(&self, port: &u32) -> bool {
let mut used_ports = self.used_ports.lock_irq_disabled(); let mut used_ports = self.used_ports.lock_irq_disabled();
used_ports.remove(port) used_ports.remove(port)
} }
/// Insert a connected socket
pub fn insert_connected_socket( pub fn insert_connected_socket(
&self, &self,
id: ConnectionID, id: ConnectionID,
@ -86,10 +92,14 @@ impl VsockSpace {
let mut connected_sockets = self.connected_sockets.write_irq_disabled(); let mut connected_sockets = self.connected_sockets.write_irq_disabled();
connected_sockets.insert(id, connected) connected_sockets.insert(id, connected)
} }
/// Remove a connected socket
pub fn remove_connected_socket(&self, id: &ConnectionID) -> Option<Arc<Connected>> { pub fn remove_connected_socket(&self, id: &ConnectionID) -> Option<Arc<Connected>> {
let mut connected_sockets = self.connected_sockets.write_irq_disabled(); let mut connected_sockets = self.connected_sockets.write_irq_disabled();
connected_sockets.remove(id) connected_sockets.remove(id)
} }
/// Insert a connecting socket
pub fn insert_connecting_socket( pub fn insert_connecting_socket(
&self, &self,
addr: VsockSocketAddr, addr: VsockSocketAddr,
@ -98,10 +108,14 @@ impl VsockSpace {
let mut connecting_sockets = self.connecting_sockets.lock_irq_disabled(); let mut connecting_sockets = self.connecting_sockets.lock_irq_disabled();
connecting_sockets.insert(addr, connecting) connecting_sockets.insert(addr, connecting)
} }
/// Remove a connecting socket
pub fn remove_connecting_socket(&self, addr: &VsockSocketAddr) -> Option<Arc<Connecting>> { pub fn remove_connecting_socket(&self, addr: &VsockSocketAddr) -> Option<Arc<Connecting>> {
let mut connecting_sockets = self.connecting_sockets.lock_irq_disabled(); let mut connecting_sockets = self.connecting_sockets.lock_irq_disabled();
connecting_sockets.remove(addr) connecting_sockets.remove(addr)
} }
/// Insert a listening socket
pub fn insert_listen_socket( pub fn insert_listen_socket(
&self, &self,
addr: VsockSocketAddr, addr: VsockSocketAddr,
@ -110,6 +124,8 @@ impl VsockSpace {
let mut listen_sockets = self.listen_sockets.lock_irq_disabled(); let mut listen_sockets = self.listen_sockets.lock_irq_disabled();
listen_sockets.insert(addr, listen) listen_sockets.insert(addr, listen)
} }
/// Remove a listening socket
pub fn remove_listen_socket(&self, addr: &VsockSocketAddr) -> Option<Arc<Listen>> { pub fn remove_listen_socket(&self, addr: &VsockSocketAddr) -> Option<Arc<Listen>> {
let mut listen_sockets = self.listen_sockets.lock_irq_disabled(); let mut listen_sockets = self.listen_sockets.lock_irq_disabled();
listen_sockets.remove(addr) listen_sockets.remove(addr)
@ -117,58 +133,66 @@ impl VsockSpace {
} }
impl VsockSpace { impl VsockSpace {
/// Get the CID of the guest
pub fn guest_cid(&self) -> u32 { pub fn guest_cid(&self) -> u32 {
let driver = self.driver.lock_irq_disabled(); let driver = self.driver.lock_irq_disabled();
driver.guest_cid() as u32 driver.guest_cid() as u32
} }
/// Send a request packet for initializing a new connection.
pub fn request(&self, info: &ConnectionInfo) -> Result<()> { pub fn request(&self, info: &ConnectionInfo) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
driver driver
.request(info) .request(info)
.map_err(|_| Error::with_message(Errno::EIO, "can not send connect packet")) .map_err(|_| Error::with_message(Errno::EIO, "cannot send connect packet"))
} }
/// Send a response packet for accepting a new connection.
pub fn response(&self, info: &ConnectionInfo) -> Result<()> { pub fn response(&self, info: &ConnectionInfo) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
driver driver
.response(info) .response(info)
.map_err(|_| Error::with_message(Errno::EIO, "can not send response packet")) .map_err(|_| Error::with_message(Errno::EIO, "cannot send response packet"))
} }
/// Send a shutdown packet to close a connection
pub fn shutdown(&self, info: &ConnectionInfo) -> Result<()> { pub fn shutdown(&self, info: &ConnectionInfo) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
driver driver
.shutdown(info) .shutdown(info)
.map_err(|_| Error::with_message(Errno::EIO, "can not send shutdown packet")) .map_err(|_| Error::with_message(Errno::EIO, "cannot send shutdown packet"))
} }
/// Send a reset packet to reset a connection
pub fn reset(&self, info: &ConnectionInfo) -> Result<()> { pub fn reset(&self, info: &ConnectionInfo) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
driver driver
.reset(info) .reset(info)
.map_err(|_| Error::with_message(Errno::EIO, "can not send reset packet")) .map_err(|_| Error::with_message(Errno::EIO, "cannot send reset packet"))
} }
/// Send a credit request packet
pub fn request_credit(&self, info: &ConnectionInfo) -> Result<()> { pub fn request_credit(&self, info: &ConnectionInfo) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
driver driver
.credit_request(info) .credit_request(info)
.map_err(|_| Error::with_message(Errno::EIO, "can not send credit request packet")) .map_err(|_| Error::with_message(Errno::EIO, "cannot send credit request packet"))
} }
/// Send a credit update packet
pub fn update_credit(&self, info: &ConnectionInfo) -> Result<()> { pub fn update_credit(&self, info: &ConnectionInfo) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
driver driver
.credit_update(info) .credit_update(info)
.map_err(|_| Error::with_message(Errno::EIO, "can not send credit update packet")) .map_err(|_| Error::with_message(Errno::EIO, "cannot send credit update packet"))
} }
/// Send a data packet
pub fn send(&self, buffer: &[u8], info: &mut ConnectionInfo) -> Result<()> { pub fn send(&self, buffer: &[u8], info: &mut ConnectionInfo) -> Result<()> {
let mut driver = self.driver.lock_irq_disabled(); let mut driver = self.driver.lock_irq_disabled();
driver driver
.send(buffer, info) .send(buffer, info)
.map_err(|_| Error::with_message(Errno::EIO, "can not send data packet")) .map_err(|_| Error::with_message(Errno::EIO, "cannot send data packet"))
} }
/// Poll for each event from the driver /// Poll for each event from the driver
@ -233,7 +257,7 @@ impl VsockSpace {
listen.push_incoming(connected).unwrap(); listen.push_incoming(connected).unwrap();
listen.update_io_events(); listen.update_io_events();
} }
VsockEventType::Connected => { VsockEventType::ConnectionResponse => {
let connecting_sockets = self.connecting_sockets.lock_irq_disabled(); let connecting_sockets = self.connecting_sockets.lock_irq_disabled();
let Some(connecting) = connecting_sockets.get(&event.destination.into()) else { let Some(connecting) = connecting_sockets.get(&event.destination.into()) else {
return_errno_with_message!( return_errno_with_message!(
@ -254,7 +278,7 @@ impl VsockSpace {
let Some(connected) = connected_sockets.get(&event.into()) else { let Some(connected) = connected_sockets.get(&event.into()) else {
return_errno_with_message!(Errno::ENOTCONN, "the socket hasn't connected"); return_errno_with_message!(Errno::ENOTCONN, "the socket hasn't connected");
}; };
connected.peer_requested_shutdown(); connected.set_peer_requested_shutdown();
} }
VsockEventType::Received { length } => {} VsockEventType::Received { length } => {}
VsockEventType::CreditRequest => { VsockEventType::CreditRequest => {
@ -277,9 +301,3 @@ impl VsockSpace {
Ok(Some(event)) Ok(Some(event))
} }
} }
impl Default for VsockSpace {
fn default() -> Self {
Self::new()
}
}

View File

@ -16,8 +16,8 @@ pub use stream::VsockStreamSocket;
pub static VSOCK_GLOBAL: Once<Arc<VsockSpace>> = Once::new(); pub static VSOCK_GLOBAL: Once<Arc<VsockSpace>> = Once::new();
pub fn init() { pub fn init() {
if get_device(DEVICE_NAME).is_some() { if let Some(driver) = get_device(DEVICE_NAME) {
VSOCK_GLOBAL.call_once(|| Arc::new(VsockSpace::new())); VSOCK_GLOBAL.call_once(|| Arc::new(VsockSpace::new(driver)));
register_recv_callback(DEVICE_NAME, || { register_recv_callback(DEVICE_NAME, || {
let vsockspace = VSOCK_GLOBAL.get().unwrap(); let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.poll().unwrap(); vsockspace.poll().unwrap();

View File

@ -33,7 +33,7 @@ impl Connected {
pub fn from_connecting(connecting: Arc<Connecting>) -> Self { pub fn from_connecting(connecting: Arc<Connecting>) -> Self {
Self { Self {
connection: SpinLock::new(Connection::from_info(connecting.info())), connection: SpinLock::new(Connection::new_from_info(connecting.info())),
id: connecting.id(), id: connecting.id(),
pollee: Pollee::new(IoEvents::empty()), pollee: Pollee::new(IoEvents::empty()),
} }
@ -58,7 +58,7 @@ impl Connected {
match bytes_read { match bytes_read {
0 => { 0 => {
if !connection.peer_requested_shutdown { if !connection.is_peer_requested_shutdown() {
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty"); return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty");
} else { } else {
return_errno_with_message!(Errno::ECONNRESET, "the connection is reset"); return_errno_with_message!(Errno::ECONNRESET, "the connection is reset");
@ -84,17 +84,26 @@ impl Connected {
let connection = self.connection.lock_irq_disabled(); let connection = self.connection.lock_irq_disabled();
// If buffer is now empty and the peer requested shutdown, finish shutting down the // If buffer is now empty and the peer requested shutdown, finish shutting down the
// connection. // connection.
connection.peer_requested_shutdown && connection.buffer.is_empty() connection.is_peer_requested_shutdown() && connection.buffer.is_empty()
}
pub fn is_closed(&self) -> bool {
let connection = self.connection.lock_irq_disabled();
connection.is_local_shutdown()
} }
pub fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { pub fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
// TODO: deal with cmd // TODO: deal with cmd
if self.should_close() { if self.should_close() {
let connection = self.connection.lock_irq_disabled(); let mut connection = self.connection.lock_irq_disabled();
if connection.is_local_shutdown() {
return Ok(());
}
let vsockspace = VSOCK_GLOBAL.get().unwrap(); let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.reset(&connection.info).unwrap(); vsockspace.reset(&connection.info).unwrap();
vsockspace.remove_connected_socket(&self.id()); vsockspace.remove_connected_socket(&self.id());
vsockspace.recycle_port(&self.local_addr().port); vsockspace.recycle_port(&self.local_addr().port);
connection.set_local_shutdown();
} }
Ok(()) Ok(())
} }
@ -113,8 +122,10 @@ impl Connected {
connection.add(bytes) connection.add(bytes)
} }
pub fn peer_requested_shutdown(&self) { pub fn set_peer_requested_shutdown(&self) {
self.connection.lock_irq_disabled().peer_requested_shutdown = true self.connection
.lock_irq_disabled()
.set_peer_requested_shutdown()
} }
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
@ -134,41 +145,69 @@ impl Connected {
impl Drop for Connected { impl Drop for Connected {
fn drop(&mut self) { fn drop(&mut self) {
let connection = self.connection.lock_irq_disabled();
if connection.is_local_shutdown() {
return;
}
let vsockspace = VSOCK_GLOBAL.get().unwrap(); let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.reset(&connection.info).unwrap();
vsockspace.remove_connected_socket(&self.id());
vsockspace.recycle_port(&self.local_addr().port); vsockspace.recycle_port(&self.local_addr().port);
} }
} }
pub struct Connection { struct Connection {
info: ConnectionInfo, info: ConnectionInfo,
buffer: HeapRb<u8>, buffer: HeapRb<u8>,
/// The peer sent a SHUTDOWN request, but we haven't yet responded with a RST because there is /// The peer sent a SHUTDOWN request, but we haven't yet responded with a RST because there is
/// still data in the buffer. /// still data in the buffer.
pub peer_requested_shutdown: bool, peer_requested_shutdown: bool,
local_shutdown: bool,
} }
impl Connection { impl Connection {
pub fn new(peer: VsockSocketAddr, local_port: u32) -> Self { fn new(peer: VsockSocketAddr, local_port: u32) -> Self {
let mut info = ConnectionInfo::new(peer.into(), local_port); let mut info = ConnectionInfo::new(peer.into(), local_port);
info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap(); info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap();
Self { Self {
info, info,
buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY), buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY),
peer_requested_shutdown: false, peer_requested_shutdown: false,
local_shutdown: false,
} }
} }
pub fn from_info(mut info: ConnectionInfo) -> Self {
fn is_peer_requested_shutdown(&self) -> bool {
self.peer_requested_shutdown
}
fn set_peer_requested_shutdown(&mut self) {
self.peer_requested_shutdown = true
}
fn is_local_shutdown(&self) -> bool {
self.local_shutdown
}
fn set_local_shutdown(&mut self) {
self.local_shutdown = true
}
fn new_from_info(mut info: ConnectionInfo) -> Self {
info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap(); info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap();
Self { Self {
info, info,
buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY), buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY),
peer_requested_shutdown: false, peer_requested_shutdown: false,
local_shutdown: false,
} }
} }
pub fn update_for_event(&mut self, event: &VsockEvent) {
fn update_for_event(&mut self, event: &VsockEvent) {
self.info.update_for_event(event) self.info.update_for_event(event)
} }
pub fn add(&mut self, bytes: &[u8]) -> bool {
fn add(&mut self, bytes: &[u8]) -> bool {
if bytes.len() > self.buffer.free_len() { if bytes.len() > self.buffer.free_len() {
return false; return false;
} }

View File

@ -24,6 +24,7 @@ impl Connecting {
pollee: Pollee::new(IoEvents::empty()), pollee: Pollee::new(IoEvents::empty()),
} }
} }
pub fn peer_addr(&self) -> VsockSocketAddr { pub fn peer_addr(&self) -> VsockSocketAddr {
self.id.peer_addr self.id.peer_addr
} }
@ -35,15 +36,19 @@ impl Connecting {
pub fn id(&self) -> ConnectionID { pub fn id(&self) -> ConnectionID {
self.id self.id
} }
pub fn info(&self) -> ConnectionInfo { pub fn info(&self) -> ConnectionInfo {
self.info.lock_irq_disabled().clone() self.info.lock_irq_disabled().clone()
} }
pub fn update_info(&self, event: &VsockEvent) { pub fn update_info(&self, event: &VsockEvent) {
self.info.lock_irq_disabled().update_for_event(event) self.info.lock_irq_disabled().update_for_event(event)
} }
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.pollee.poll(mask, poller) self.pollee.poll(mask, poller)
} }
pub fn add_events(&self, events: IoEvents) { pub fn add_events(&self, events: IoEvents) {
self.pollee.add_events(events) self.pollee.add_events(events)
} }
@ -53,5 +58,6 @@ impl Drop for Connecting {
fn drop(&mut self) { fn drop(&mut self) {
let vsockspace = VSOCK_GLOBAL.get().unwrap(); let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.recycle_port(&self.local_addr().port); vsockspace.recycle_port(&self.local_addr().port);
vsockspace.remove_connecting_socket(&self.local_addr());
} }
} }

View File

@ -44,7 +44,7 @@ impl Init {
} else { } else {
return_errno_with_message!(Errno::EAGAIN, "cannot find unused high port"); return_errno_with_message!(Errno::EAGAIN, "cannot find unused high port");
} }
} else if !vsockspace.insert_port(new_addr.port) { } else if !vsockspace.bind_port(new_addr.port) {
return_errno_with_message!(Errno::EADDRNOTAVAIL, "the port in address is occupied"); return_errno_with_message!(Errno::EADDRNOTAVAIL, "the port in address is occupied");
} }

View File

@ -27,6 +27,7 @@ impl Listen {
pub fn addr(&self) -> VsockSocketAddr { pub fn addr(&self) -> VsockSocketAddr {
self.addr self.addr
} }
pub fn push_incoming(&self, connect: Arc<Connected>) -> Result<()> { pub fn push_incoming(&self, connect: Arc<Connected>) -> Result<()> {
let mut incoming_connections = self.incoming_connection.lock_irq_disabled(); let mut incoming_connections = self.incoming_connection.lock_irq_disabled();
if incoming_connections.len() >= self.backlog { if incoming_connections.len() >= self.backlog {
@ -47,13 +48,14 @@ impl Listen {
Ok(connection) Ok(connection)
} }
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.pollee.poll(mask, poller) self.pollee.poll(mask, poller)
} }
pub fn update_io_events(&self) { pub fn update_io_events(&self) {
let can_accept = !self.incoming_connection.lock_irq_disabled().is_empty(); let incomming_connection = self.incoming_connection.lock_irq_disabled();
if can_accept { if !incomming_connection.is_empty() {
self.pollee.add_events(IoEvents::IN); self.pollee.add_events(IoEvents::IN);
} else { } else {
self.pollee.del_events(IoEvents::IN); self.pollee.del_events(IoEvents::IN);
@ -63,6 +65,8 @@ impl Listen {
impl Drop for Listen { impl Drop for Listen {
fn drop(&mut self) { fn drop(&mut self) {
VSOCK_GLOBAL.get().unwrap().recycle_port(&self.addr.port); let vsockspace = VSOCK_GLOBAL.get().unwrap();
vsockspace.recycle_port(&self.addr.port);
vsockspace.remove_listen_socket(&self.addr);
} }
} }

View File

@ -35,12 +35,14 @@ impl VsockStreamSocket {
is_nonblocking: AtomicBool::new(nonblocking), is_nonblocking: AtomicBool::new(nonblocking),
} }
} }
pub(super) fn new_from_connected(connected: Arc<Connected>) -> Self { pub(super) fn new_from_connected(connected: Arc<Connected>) -> Self {
Self { Self {
status: RwLock::new(Status::Connected(connected)), status: RwLock::new(Status::Connected(connected)),
is_nonblocking: AtomicBool::new(false), is_nonblocking: AtomicBool::new(false),
} }
} }
fn is_nonblocking(&self) -> bool { fn is_nonblocking(&self) -> bool {
self.is_nonblocking.load(Ordering::Relaxed) self.is_nonblocking.load(Ordering::Relaxed)
} }
@ -103,7 +105,8 @@ impl VsockStreamSocket {
VSOCK_GLOBAL VSOCK_GLOBAL
.get() .get()
.unwrap() .unwrap()
.response(&connected.get_info())?; .response(&connected.get_info())
.unwrap();
let socket = Arc::new(VsockStreamSocket::new_from_connected(connected)); let socket = Arc::new(VsockStreamSocket::new_from_connected(connected));
Ok((socket, peer_addr.into())) Ok((socket, peer_addr.into()))
@ -123,7 +126,6 @@ impl VsockStreamSocket {
let peer_addr = self.peer_addr()?; let peer_addr = self.peer_addr()?;
// If buffer is now empty and the peer requested shutdown, finish shutting down the // If buffer is now empty and the peer requested shutdown, finish shutting down the
// connection. // connection.
// TODO: properly place the close request
if connected.should_close() { if connected.should_close() {
if let Err(e) = self.shutdown(SockShutdownCmd::SHUT_RDWR) { if let Err(e) = self.shutdown(SockShutdownCmd::SHUT_RDWR) {
debug!("The error is {:?}", e); debug!("The error is {:?}", e);
@ -189,7 +191,7 @@ impl Socket for VsockStreamSocket {
} }
// Since blocking mode is supported, there is no need to store the connecting status. // Since blocking mode is supported, there is no need to store the connecting status.
// TODO: Refactor when blocking mode is supported. // TODO: Refactor when nonblocking mode is supported.
fn connect(&self, sockaddr: SocketAddr) -> Result<()> { fn connect(&self, sockaddr: SocketAddr) -> Result<()> {
let init = match &*self.status.read() { let init = match &*self.status.read() {
Status::Init(init) => init.clone(), Status::Init(init) => init.clone(),
@ -223,7 +225,12 @@ impl Socket for VsockStreamSocket {
.poll(IoEvents::IN, Some(&poller)) .poll(IoEvents::IN, Some(&poller))
.contains(IoEvents::IN) .contains(IoEvents::IN)
{ {
poller.wait()?; if let Err(e) = poller.wait() {
vsockspace
.remove_connecting_socket(&connecting.local_addr())
.unwrap();
return Err(e);
}
} }
vsockspace vsockspace

View File

@ -53,7 +53,7 @@ pub enum VsockEventType {
/// The peer requests to establish a connection with us. /// The peer requests to establish a connection with us.
ConnectionRequest, ConnectionRequest,
/// The connection was successfully established. /// The connection was successfully established.
Connected, ConnectionResponse,
/// The connection was closed. /// The connection was closed.
Disconnected { Disconnected {
/// The reason for the disconnection. /// The reason for the disconnection.
@ -107,7 +107,7 @@ impl VsockEvent {
} }
VirtioVsockOp::Response => { VirtioVsockOp::Response => {
header.check_data_is_empty()?; header.check_data_is_empty()?;
VsockEventType::Connected VsockEventType::ConnectionResponse
} }
VirtioVsockOp::CreditUpdate => { VirtioVsockOp::CreditUpdate => {
header.check_data_is_empty()?; header.check_data_is_empty()?;

View File

@ -9,7 +9,7 @@ use log::debug;
use pod::Pod; use pod::Pod;
use super::{ use super::{
buffer::{RxBuffer, RX_BUFFER_LEN}, buffer::RxBuffer,
config::{VirtioVsockConfig, VsockFeatures}, config::{VirtioVsockConfig, VsockFeatures},
connect::{ConnectionInfo, VsockEvent}, connect::{ConnectionInfo, VsockEvent},
error::SocketError, error::SocketError,
@ -99,6 +99,7 @@ impl SocketDevice {
fn handle_vsock_event(_: &TrapFrame) { fn handle_vsock_event(_: &TrapFrame) {
handle_recv_irq(super::DEVICE_NAME); handle_recv_irq(super::DEVICE_NAME);
} }
// FIXME: handle event virtqueue notification in live migration
device device
.transport .transport
@ -184,6 +185,7 @@ impl SocketDevice {
header: &VirtioVsockHdr, header: &VirtioVsockHdr,
buffer: &[u8], buffer: &[u8],
) -> Result<(), SocketError> { ) -> Result<(), SocketError> {
debug!("Sent packet {:?}. Op {:?}", header, header.op());
debug!("buffer in send_packet_to_tx_queue: {:?}", buffer); debug!("buffer in send_packet_to_tx_queue: {:?}", buffer);
let tx_buffer = TxBuffer::new(header, buffer); let tx_buffer = TxBuffer::new(header, buffer);
@ -254,9 +256,8 @@ impl SocketDevice {
/// Receive bytes from peer, returns the header /// Receive bytes from peer, returns the header
pub fn receive( pub fn receive(
&mut self, &mut self,
buffer: &mut [u8],
// connection_info: &mut ConnectionInfo, // connection_info: &mut ConnectionInfo,
) -> Result<VirtioVsockHdr, SocketError> { ) -> Result<RxBuffer, SocketError> {
let (token, len) = self.recv_queue.pop_used()?; let (token, len) = self.recv_queue.pop_used()?;
debug!( debug!(
"receive packet in rx_queue: token = {}, len = {}", "receive packet in rx_queue: token = {}, len = {}",
@ -268,21 +269,10 @@ impl SocketDevice {
.ok_or(QueueError::WrongToken)?; .ok_or(QueueError::WrongToken)?;
rx_buffer.set_packet_len(len as usize); rx_buffer.set_packet_len(len as usize);
let mut buf_reader = rx_buffer.buf(); let new_rx_buffer = RxBuffer::new(size_of::<VirtioVsockHdr>());
let mut temp_buffer = vec![0u8; buf_reader.remain()]; self.add_rx_buffer(new_rx_buffer, token)?;
buf_reader.read(&mut VmWriter::from(&mut temp_buffer as &mut [u8]));
let (header, payload) = read_header_and_body(&temp_buffer)?; Ok(rx_buffer)
// The length written should be equal to len(header)+len(packet)
assert_eq!(len, header.len() + VIRTIO_VSOCK_HDR_LEN as u32);
debug!("Received packet {:?}. Op {:?}", header, header.op());
debug!("body is {:?}", payload);
assert!(buffer.len() >= payload.len());
buffer[..payload.len()].copy_from_slice(payload);
self.add_rx_buffer(rx_buffer, token)?;
Ok(header)
} }
/// Polls the RX virtqueue for the next event, and calls the given handler function to handle it. /// Polls the RX virtqueue for the next event, and calls the given handler function to handle it.
@ -294,10 +284,17 @@ impl SocketDevice {
if !self.recv_queue.can_pop() { if !self.recv_queue.can_pop() {
return Ok(None); return Ok(None);
} }
let mut body = vec![0u8; RX_BUFFER_LEN]; let rx_buffer = self.receive()?;
let header = self.receive(&mut body)?;
VsockEvent::from_header(&header).and_then(|event| handler(event, &body)) let mut buf_reader = rx_buffer.buf();
let mut temp_buffer = vec![0u8; buf_reader.remain()];
buf_reader.read(&mut VmWriter::from(&mut temp_buffer as &mut [u8]));
let (header, payload) = read_header_and_body(&temp_buffer)?;
// The length written should be equal to len(header)+len(packet)
debug!("Received packet {:?}. Op {:?}", header, header.op());
debug!("body is {:?}", payload);
VsockEvent::from_header(&header).and_then(|event| handler(event, payload))
} }
/// Add a used rx buffer to recv queue,@index is only to check the correctness /// Add a used rx buffer to recv queue,@index is only to check the correctness

View File

@ -21,18 +21,18 @@ pub fn register_device(name: String, device: Arc<SpinLock<SocketDevice>>) {
VSOCK_DEVICE_TABLE VSOCK_DEVICE_TABLE
.get() .get()
.unwrap() .unwrap()
.lock() .lock_irq_disabled()
.insert(name, (Arc::new(SpinLock::new(Vec::new())), device)); .insert(name, (Arc::new(SpinLock::new(Vec::new())), device));
} }
pub fn get_device(str: &str) -> Option<Arc<SpinLock<SocketDevice>>> { pub fn get_device(str: &str) -> Option<Arc<SpinLock<SocketDevice>>> {
let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock(); let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled();
let (_, device) = lock.get(str)?; let (_, device) = lock.get(str)?;
Some(device.clone()) Some(device.clone())
} }
pub fn all_devices() -> Vec<(String, Arc<SpinLock<SocketDevice>>)> { pub fn all_devices() -> Vec<(String, Arc<SpinLock<SocketDevice>>)> {
let vsock_devs = VSOCK_DEVICE_TABLE.get().unwrap().lock(); let vsock_devs = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled();
vsock_devs vsock_devs
.iter() .iter()
.map(|(name, (_, device))| (name.clone(), device.clone())) .map(|(name, (_, device))| (name.clone(), device.clone()))
@ -40,20 +40,19 @@ pub fn all_devices() -> Vec<(String, Arc<SpinLock<SocketDevice>>)> {
} }
pub fn register_recv_callback(name: &str, callback: impl VsockDeviceIrqHandler) { pub fn register_recv_callback(name: &str, callback: impl VsockDeviceIrqHandler) {
let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock(); let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled();
let Some((callbacks, _)) = lock.get(name) else { let Some((callbacks, _)) = lock.get(name) else {
return; return;
}; };
callbacks.lock().push(Arc::new(callback)); callbacks.lock_irq_disabled().push(Arc::new(callback));
} }
pub fn handle_recv_irq(name: &str) { pub fn handle_recv_irq(name: &str) {
let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock(); let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled();
let Some((callbacks, _)) = lock.get(name) else { let Some((callbacks, _)) = lock.get(name) else {
return; return;
}; };
let callbacks = callbacks.clone(); let lock = callbacks.lock_irq_disabled();
let lock = callbacks.lock();
for callback in lock.iter() { for callback in lock.iter() {
callback.call(()) callback.call(())
} }

View File

@ -9,5 +9,5 @@ cd ${VSOCK_DIR}
echo "Start vsock test......" echo "Start vsock test......"
./vsock_client ./vsock_client
# ./vsock_server ./vsock_server
echo "Vsock test passed." echo "Vsock test passed."

View File

@ -0,0 +1,4 @@
# SPDX-License-Identifier: MPL-2.0
echo 'Line 1: Hello from host'
echo 'Line 2: Hello from host, again'

View File

@ -12,36 +12,53 @@
int main() int main()
{ {
int sock; int sock;
char *hello = "echo 'Hello from host'\n";
char buffer[1024] = { 0 }; char buffer[1024] = { 0 };
FILE *commandFile;
struct sockaddr_vm serv_addr; struct sockaddr_vm serv_addr;
commandFile = fopen("../vsock_commands.sh", "r");
if (commandFile == NULL) {
perror("Failed to open the command file");
return -1;
}
if ((sock = socket(AF_VSOCK, SOCK_STREAM, 0)) < 0) { if ((sock = socket(AF_VSOCK, SOCK_STREAM, 0)) < 0) {
printf("\n Socket creation error\n"); perror("\n Socket creation error");
fclose(commandFile);
return -1; return -1;
} }
printf("\n Create socket successfully!\n"); printf("\n Create socket successfully!\n");
serv_addr.svm_family = AF_VSOCK; serv_addr.svm_family = AF_VSOCK;
serv_addr.svm_cid = VMADDR_CID_HOST; serv_addr.svm_cid = VMADDR_CID_HOST;
serv_addr.svm_port = PORT; serv_addr.svm_port = PORT;
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) <
0) { 0) {
printf("\nConnection Failed \n"); perror("\nConnection Failed");
close(sock);
fclose(commandFile);
return -1; return -1;
} }
printf("\n Socket connect successfully!\n"); printf("\n Socket connected successfully!\n");
// Send message to the server and receive the reply char command[1024];
if (send(sock, hello, strlen(hello), 0) < 0) { while (fgets(command, sizeof(command), commandFile) != NULL) {
printf("\nSend Failed\n"); if (send(sock, command, strlen(command), 0) < 0) {
return -1; perror("\nSend Failed");
break;
}
printf("Command sent: %s", command);
memset(buffer, 0, sizeof(buffer));
if (read(sock, buffer, sizeof(buffer)) < 0) {
perror("\nRead Failed");
break;
}
printf("Server: %s\n", buffer);
} }
printf("Hello message sent\n");
if (read(sock, buffer, 1024) < 0) { close(sock);
printf("\nRead Failed\n"); fclose(commandFile);
return -1;
}
printf("Server: %s\n", buffer);
return 0; return 0;
} }

View File

@ -55,4 +55,4 @@ export HOST_PORT=8888
iperf3 -s -B $HOST_ADDR -p $HOST_PORT -D # Start the server as a daemon iperf3 -s -B $HOST_ADDR -p $HOST_PORT -D # Start the server as a daemon
iperf3 -c $HOST_ADDR -p $HOST_PORT # Start the client iperf3 -c $HOST_ADDR -p $HOST_PORT # Start the client
``` ```
Note that [a variant of iperf3](https://github.com/stefano-garzarella/iperf-vsock) can measure the performance of `vsock`. Note that [a variant of iperf3](https://github.com/stefano-garzarella/iperf-vsock) can measure the performance of `vsock`. But the implemented `vsock` has not been verified to work well in it.