diff --git a/.github/workflows/vsock_interaction.yml b/.github/workflows/vsock_interaction.yml index 4115d9be7..a95338406 100644 --- a/.github/workflows/vsock_interaction.yml +++ b/.github/workflows/vsock_interaction.yml @@ -19,10 +19,15 @@ jobs: sudo modprobe vhost_vsock sudo apt-get install socat echo "Run vsock server on host...." - socat -ddd VSOCK-LISTEN:1234,fork SYSTEM:'read cmd; result=\$(eval \"\$cmd\" 2>&1); echo \"\$result\"' & - - name: Run Vsock Client on Guest + socat -ddd VSOCK-LISTEN:1234,fork SYSTEM:'while read cmd; do result=$(eval "$cmd" 2>&1); echo "$result"; done' & + - name: Run Vsock Client and Server on Guest id: guest_vsock_client_server run: | docker run --privileged --network=host --device=/dev/kvm -v ./:/root/asterinas asterinas/asterinas:0.4.2 \ - make run AUTO_TEST=vsock ENABLE_KVM=0 SCHEME=microvm RELEASE_MODE=1 - \ No newline at end of file + make run AUTO_TEST=vsock ENABLE_KVM=0 SCHEME=microvm RELEASE_MODE=1 & + - name: Run Vsock Client on Host + id: host_vsock_client + run: | + sleep 5m + echo "Run vsock client on host...." + echo "Hello from host" | socat -dd - vsock-connect:3:4321 diff --git a/OSDK.toml b/OSDK.toml index 2a5439d62..fdf05dfa5 100644 --- a/OSDK.toml +++ b/OSDK.toml @@ -65,7 +65,6 @@ qemu.args = """\ -device virtio-net-pci,netdev=mynet0,disable-legacy=on,disable-modern=off \ -device virtio-keyboard-pci,disable-legacy=on,disable-modern=off \ -device virtio-blk-pci,bus=pcie.0,addr=0x6,drive=x0,disable-legacy=on,disable-modern=off \ - -device vhost-vsock-pci,id=vhost-vsock-pci0,guest-cid=3,disable-legacy=on,disable-modern=off \ -drive file=fs.img,if=none,format=raw,id=x0 \ -netdev user,id=mynet0,hostfwd=tcp::10027-:22,hostfwd=tcp::54136-:8090 \ -chardev stdio,id=mux,mux=on,logfile=./$(date '+%Y-%m-%dT%H%M%S').log \ diff --git a/kernel/aster-nix/src/net/socket/vsock/common.rs b/kernel/aster-nix/src/net/socket/vsock/common.rs index 613e7f3d4..20d9a0aec 100644 --- a/kernel/aster-nix/src/net/socket/vsock/common.rs +++ b/kernel/aster-nix/src/net/socket/vsock/common.rs @@ -6,7 +6,6 @@ use aster_virtio::device::socket::{ connect::{ConnectionInfo, VsockEvent, VsockEventType}, device::SocketDevice, error::SocketError, - get_device, DEVICE_NAME, }; use super::{ @@ -34,8 +33,7 @@ pub struct VsockSpace { impl VsockSpace { /// Create a new global VsockSpace - pub fn new() -> Self { - let driver = get_device(DEVICE_NAME).unwrap(); + pub fn new(driver: Arc>) -> Self { Self { driver, connecting_sockets: SpinLock::new(BTreeMap::new()), @@ -44,6 +42,7 @@ impl VsockSpace { used_ports: SpinLock::new(BTreeSet::new()), } } + /// Check whether the event is for this socket space fn is_event_for_socket(&self, event: &VsockEvent) -> bool { self.connecting_sockets @@ -58,10 +57,12 @@ impl VsockSpace { .read_irq_disabled() .contains_key(&(*event).into()) } + /// Alloc an unused port range pub fn alloc_ephemeral_port(&self) -> Result { let mut used_ports = self.used_ports.lock_irq_disabled(); - for port in 1024..=u32::MAX { + // FIXME: the maximal port number is not defined by spec + for port in 1024..u32::MAX { if !used_ports.contains(&port) { used_ports.insert(port); return Ok(port); @@ -69,15 +70,20 @@ impl VsockSpace { } return_errno_with_message!(Errno::EAGAIN, "cannot find unused high port"); } - pub fn insert_port(&self, port: u32) -> bool { + + /// Bind a port + pub fn bind_port(&self, port: u32) -> bool { let mut used_ports = self.used_ports.lock_irq_disabled(); used_ports.insert(port) } + + /// Recycle a port pub fn recycle_port(&self, port: &u32) -> bool { let mut used_ports = self.used_ports.lock_irq_disabled(); used_ports.remove(port) } + /// Insert a connected socket pub fn insert_connected_socket( &self, id: ConnectionID, @@ -86,10 +92,14 @@ impl VsockSpace { let mut connected_sockets = self.connected_sockets.write_irq_disabled(); connected_sockets.insert(id, connected) } + + /// Remove a connected socket pub fn remove_connected_socket(&self, id: &ConnectionID) -> Option> { let mut connected_sockets = self.connected_sockets.write_irq_disabled(); connected_sockets.remove(id) } + + /// Insert a connecting socket pub fn insert_connecting_socket( &self, addr: VsockSocketAddr, @@ -98,10 +108,14 @@ impl VsockSpace { let mut connecting_sockets = self.connecting_sockets.lock_irq_disabled(); connecting_sockets.insert(addr, connecting) } + + /// Remove a connecting socket pub fn remove_connecting_socket(&self, addr: &VsockSocketAddr) -> Option> { let mut connecting_sockets = self.connecting_sockets.lock_irq_disabled(); connecting_sockets.remove(addr) } + + /// Insert a listening socket pub fn insert_listen_socket( &self, addr: VsockSocketAddr, @@ -110,6 +124,8 @@ impl VsockSpace { let mut listen_sockets = self.listen_sockets.lock_irq_disabled(); listen_sockets.insert(addr, listen) } + + /// Remove a listening socket pub fn remove_listen_socket(&self, addr: &VsockSocketAddr) -> Option> { let mut listen_sockets = self.listen_sockets.lock_irq_disabled(); listen_sockets.remove(addr) @@ -117,58 +133,66 @@ impl VsockSpace { } impl VsockSpace { + /// Get the CID of the guest pub fn guest_cid(&self) -> u32 { let driver = self.driver.lock_irq_disabled(); driver.guest_cid() as u32 } + /// Send a request packet for initializing a new connection. pub fn request(&self, info: &ConnectionInfo) -> Result<()> { let mut driver = self.driver.lock_irq_disabled(); driver .request(info) - .map_err(|_| Error::with_message(Errno::EIO, "can not send connect packet")) + .map_err(|_| Error::with_message(Errno::EIO, "cannot send connect packet")) } + /// Send a response packet for accepting a new connection. pub fn response(&self, info: &ConnectionInfo) -> Result<()> { let mut driver = self.driver.lock_irq_disabled(); driver .response(info) - .map_err(|_| Error::with_message(Errno::EIO, "can not send response packet")) + .map_err(|_| Error::with_message(Errno::EIO, "cannot send response packet")) } + /// Send a shutdown packet to close a connection pub fn shutdown(&self, info: &ConnectionInfo) -> Result<()> { let mut driver = self.driver.lock_irq_disabled(); driver .shutdown(info) - .map_err(|_| Error::with_message(Errno::EIO, "can not send shutdown packet")) + .map_err(|_| Error::with_message(Errno::EIO, "cannot send shutdown packet")) } + /// Send a reset packet to reset a connection pub fn reset(&self, info: &ConnectionInfo) -> Result<()> { let mut driver = self.driver.lock_irq_disabled(); driver .reset(info) - .map_err(|_| Error::with_message(Errno::EIO, "can not send reset packet")) + .map_err(|_| Error::with_message(Errno::EIO, "cannot send reset packet")) } + /// Send a credit request packet pub fn request_credit(&self, info: &ConnectionInfo) -> Result<()> { let mut driver = self.driver.lock_irq_disabled(); driver .credit_request(info) - .map_err(|_| Error::with_message(Errno::EIO, "can not send credit request packet")) + .map_err(|_| Error::with_message(Errno::EIO, "cannot send credit request packet")) } + /// Send a credit update packet pub fn update_credit(&self, info: &ConnectionInfo) -> Result<()> { let mut driver = self.driver.lock_irq_disabled(); driver .credit_update(info) - .map_err(|_| Error::with_message(Errno::EIO, "can not send credit update packet")) + .map_err(|_| Error::with_message(Errno::EIO, "cannot send credit update packet")) } + /// Send a data packet pub fn send(&self, buffer: &[u8], info: &mut ConnectionInfo) -> Result<()> { let mut driver = self.driver.lock_irq_disabled(); driver .send(buffer, info) - .map_err(|_| Error::with_message(Errno::EIO, "can not send data packet")) + .map_err(|_| Error::with_message(Errno::EIO, "cannot send data packet")) } /// Poll for each event from the driver @@ -233,7 +257,7 @@ impl VsockSpace { listen.push_incoming(connected).unwrap(); listen.update_io_events(); } - VsockEventType::Connected => { + VsockEventType::ConnectionResponse => { let connecting_sockets = self.connecting_sockets.lock_irq_disabled(); let Some(connecting) = connecting_sockets.get(&event.destination.into()) else { return_errno_with_message!( @@ -254,7 +278,7 @@ impl VsockSpace { let Some(connected) = connected_sockets.get(&event.into()) else { return_errno_with_message!(Errno::ENOTCONN, "the socket hasn't connected"); }; - connected.peer_requested_shutdown(); + connected.set_peer_requested_shutdown(); } VsockEventType::Received { length } => {} VsockEventType::CreditRequest => { @@ -277,9 +301,3 @@ impl VsockSpace { Ok(Some(event)) } } - -impl Default for VsockSpace { - fn default() -> Self { - Self::new() - } -} diff --git a/kernel/aster-nix/src/net/socket/vsock/mod.rs b/kernel/aster-nix/src/net/socket/vsock/mod.rs index ced39c12d..72e43bb32 100644 --- a/kernel/aster-nix/src/net/socket/vsock/mod.rs +++ b/kernel/aster-nix/src/net/socket/vsock/mod.rs @@ -16,8 +16,8 @@ pub use stream::VsockStreamSocket; pub static VSOCK_GLOBAL: Once> = Once::new(); pub fn init() { - if get_device(DEVICE_NAME).is_some() { - VSOCK_GLOBAL.call_once(|| Arc::new(VsockSpace::new())); + if let Some(driver) = get_device(DEVICE_NAME) { + VSOCK_GLOBAL.call_once(|| Arc::new(VsockSpace::new(driver))); register_recv_callback(DEVICE_NAME, || { let vsockspace = VSOCK_GLOBAL.get().unwrap(); vsockspace.poll().unwrap(); diff --git a/kernel/aster-nix/src/net/socket/vsock/stream/connected.rs b/kernel/aster-nix/src/net/socket/vsock/stream/connected.rs index c03c32b59..4d8e5152e 100644 --- a/kernel/aster-nix/src/net/socket/vsock/stream/connected.rs +++ b/kernel/aster-nix/src/net/socket/vsock/stream/connected.rs @@ -33,7 +33,7 @@ impl Connected { pub fn from_connecting(connecting: Arc) -> Self { Self { - connection: SpinLock::new(Connection::from_info(connecting.info())), + connection: SpinLock::new(Connection::new_from_info(connecting.info())), id: connecting.id(), pollee: Pollee::new(IoEvents::empty()), } @@ -58,7 +58,7 @@ impl Connected { match bytes_read { 0 => { - if !connection.peer_requested_shutdown { + if !connection.is_peer_requested_shutdown() { return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty"); } else { return_errno_with_message!(Errno::ECONNRESET, "the connection is reset"); @@ -84,17 +84,26 @@ impl Connected { let connection = self.connection.lock_irq_disabled(); // If buffer is now empty and the peer requested shutdown, finish shutting down the // connection. - connection.peer_requested_shutdown && connection.buffer.is_empty() + connection.is_peer_requested_shutdown() && connection.buffer.is_empty() + } + + pub fn is_closed(&self) -> bool { + let connection = self.connection.lock_irq_disabled(); + connection.is_local_shutdown() } pub fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { // TODO: deal with cmd if self.should_close() { - let connection = self.connection.lock_irq_disabled(); + let mut connection = self.connection.lock_irq_disabled(); + if connection.is_local_shutdown() { + return Ok(()); + } let vsockspace = VSOCK_GLOBAL.get().unwrap(); vsockspace.reset(&connection.info).unwrap(); vsockspace.remove_connected_socket(&self.id()); vsockspace.recycle_port(&self.local_addr().port); + connection.set_local_shutdown(); } Ok(()) } @@ -113,8 +122,10 @@ impl Connected { connection.add(bytes) } - pub fn peer_requested_shutdown(&self) { - self.connection.lock_irq_disabled().peer_requested_shutdown = true + pub fn set_peer_requested_shutdown(&self) { + self.connection + .lock_irq_disabled() + .set_peer_requested_shutdown() } pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { @@ -134,41 +145,69 @@ impl Connected { impl Drop for Connected { fn drop(&mut self) { + let connection = self.connection.lock_irq_disabled(); + if connection.is_local_shutdown() { + return; + } let vsockspace = VSOCK_GLOBAL.get().unwrap(); + vsockspace.reset(&connection.info).unwrap(); + vsockspace.remove_connected_socket(&self.id()); vsockspace.recycle_port(&self.local_addr().port); } } -pub struct Connection { +struct Connection { info: ConnectionInfo, buffer: HeapRb, /// The peer sent a SHUTDOWN request, but we haven't yet responded with a RST because there is /// still data in the buffer. - pub peer_requested_shutdown: bool, + peer_requested_shutdown: bool, + local_shutdown: bool, } impl Connection { - pub fn new(peer: VsockSocketAddr, local_port: u32) -> Self { + fn new(peer: VsockSocketAddr, local_port: u32) -> Self { let mut info = ConnectionInfo::new(peer.into(), local_port); info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap(); Self { info, buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY), peer_requested_shutdown: false, + local_shutdown: false, } } - pub fn from_info(mut info: ConnectionInfo) -> Self { + + fn is_peer_requested_shutdown(&self) -> bool { + self.peer_requested_shutdown + } + + fn set_peer_requested_shutdown(&mut self) { + self.peer_requested_shutdown = true + } + + fn is_local_shutdown(&self) -> bool { + self.local_shutdown + } + + fn set_local_shutdown(&mut self) { + self.local_shutdown = true + } + + fn new_from_info(mut info: ConnectionInfo) -> Self { info.buf_alloc = PER_CONNECTION_BUFFER_CAPACITY.try_into().unwrap(); Self { info, buffer: HeapRb::new(PER_CONNECTION_BUFFER_CAPACITY), peer_requested_shutdown: false, + local_shutdown: false, } } - pub fn update_for_event(&mut self, event: &VsockEvent) { + + fn update_for_event(&mut self, event: &VsockEvent) { self.info.update_for_event(event) } - pub fn add(&mut self, bytes: &[u8]) -> bool { + + fn add(&mut self, bytes: &[u8]) -> bool { if bytes.len() > self.buffer.free_len() { return false; } diff --git a/kernel/aster-nix/src/net/socket/vsock/stream/connecting.rs b/kernel/aster-nix/src/net/socket/vsock/stream/connecting.rs index ab9c76dea..cb447be62 100644 --- a/kernel/aster-nix/src/net/socket/vsock/stream/connecting.rs +++ b/kernel/aster-nix/src/net/socket/vsock/stream/connecting.rs @@ -24,6 +24,7 @@ impl Connecting { pollee: Pollee::new(IoEvents::empty()), } } + pub fn peer_addr(&self) -> VsockSocketAddr { self.id.peer_addr } @@ -35,15 +36,19 @@ impl Connecting { pub fn id(&self) -> ConnectionID { self.id } + pub fn info(&self) -> ConnectionInfo { self.info.lock_irq_disabled().clone() } + pub fn update_info(&self, event: &VsockEvent) { self.info.lock_irq_disabled().update_for_event(event) } + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { self.pollee.poll(mask, poller) } + pub fn add_events(&self, events: IoEvents) { self.pollee.add_events(events) } @@ -53,5 +58,6 @@ impl Drop for Connecting { fn drop(&mut self) { let vsockspace = VSOCK_GLOBAL.get().unwrap(); vsockspace.recycle_port(&self.local_addr().port); + vsockspace.remove_connecting_socket(&self.local_addr()); } } diff --git a/kernel/aster-nix/src/net/socket/vsock/stream/init.rs b/kernel/aster-nix/src/net/socket/vsock/stream/init.rs index ee7df3a18..ede7fb48e 100644 --- a/kernel/aster-nix/src/net/socket/vsock/stream/init.rs +++ b/kernel/aster-nix/src/net/socket/vsock/stream/init.rs @@ -44,7 +44,7 @@ impl Init { } else { return_errno_with_message!(Errno::EAGAIN, "cannot find unused high port"); } - } else if !vsockspace.insert_port(new_addr.port) { + } else if !vsockspace.bind_port(new_addr.port) { return_errno_with_message!(Errno::EADDRNOTAVAIL, "the port in address is occupied"); } diff --git a/kernel/aster-nix/src/net/socket/vsock/stream/listen.rs b/kernel/aster-nix/src/net/socket/vsock/stream/listen.rs index 2b3a30bae..f6589e545 100644 --- a/kernel/aster-nix/src/net/socket/vsock/stream/listen.rs +++ b/kernel/aster-nix/src/net/socket/vsock/stream/listen.rs @@ -27,6 +27,7 @@ impl Listen { pub fn addr(&self) -> VsockSocketAddr { self.addr } + pub fn push_incoming(&self, connect: Arc) -> Result<()> { let mut incoming_connections = self.incoming_connection.lock_irq_disabled(); if incoming_connections.len() >= self.backlog { @@ -47,13 +48,14 @@ impl Listen { Ok(connection) } + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { self.pollee.poll(mask, poller) } pub fn update_io_events(&self) { - let can_accept = !self.incoming_connection.lock_irq_disabled().is_empty(); - if can_accept { + let incomming_connection = self.incoming_connection.lock_irq_disabled(); + if !incomming_connection.is_empty() { self.pollee.add_events(IoEvents::IN); } else { self.pollee.del_events(IoEvents::IN); @@ -63,6 +65,8 @@ impl Listen { impl Drop for Listen { fn drop(&mut self) { - VSOCK_GLOBAL.get().unwrap().recycle_port(&self.addr.port); + let vsockspace = VSOCK_GLOBAL.get().unwrap(); + vsockspace.recycle_port(&self.addr.port); + vsockspace.remove_listen_socket(&self.addr); } } diff --git a/kernel/aster-nix/src/net/socket/vsock/stream/socket.rs b/kernel/aster-nix/src/net/socket/vsock/stream/socket.rs index abbe0da19..9163c0a5e 100644 --- a/kernel/aster-nix/src/net/socket/vsock/stream/socket.rs +++ b/kernel/aster-nix/src/net/socket/vsock/stream/socket.rs @@ -35,12 +35,14 @@ impl VsockStreamSocket { is_nonblocking: AtomicBool::new(nonblocking), } } + pub(super) fn new_from_connected(connected: Arc) -> Self { Self { status: RwLock::new(Status::Connected(connected)), is_nonblocking: AtomicBool::new(false), } } + fn is_nonblocking(&self) -> bool { self.is_nonblocking.load(Ordering::Relaxed) } @@ -103,7 +105,8 @@ impl VsockStreamSocket { VSOCK_GLOBAL .get() .unwrap() - .response(&connected.get_info())?; + .response(&connected.get_info()) + .unwrap(); let socket = Arc::new(VsockStreamSocket::new_from_connected(connected)); Ok((socket, peer_addr.into())) @@ -123,7 +126,6 @@ impl VsockStreamSocket { let peer_addr = self.peer_addr()?; // If buffer is now empty and the peer requested shutdown, finish shutting down the // connection. - // TODO: properly place the close request if connected.should_close() { if let Err(e) = self.shutdown(SockShutdownCmd::SHUT_RDWR) { debug!("The error is {:?}", e); @@ -189,7 +191,7 @@ impl Socket for VsockStreamSocket { } // Since blocking mode is supported, there is no need to store the connecting status. - // TODO: Refactor when blocking mode is supported. + // TODO: Refactor when nonblocking mode is supported. fn connect(&self, sockaddr: SocketAddr) -> Result<()> { let init = match &*self.status.read() { Status::Init(init) => init.clone(), @@ -223,7 +225,12 @@ impl Socket for VsockStreamSocket { .poll(IoEvents::IN, Some(&poller)) .contains(IoEvents::IN) { - poller.wait()?; + if let Err(e) = poller.wait() { + vsockspace + .remove_connecting_socket(&connecting.local_addr()) + .unwrap(); + return Err(e); + } } vsockspace diff --git a/kernel/comps/virtio/src/device/socket/connect.rs b/kernel/comps/virtio/src/device/socket/connect.rs index 6fa581bd1..62d90c3d3 100644 --- a/kernel/comps/virtio/src/device/socket/connect.rs +++ b/kernel/comps/virtio/src/device/socket/connect.rs @@ -53,7 +53,7 @@ pub enum VsockEventType { /// The peer requests to establish a connection with us. ConnectionRequest, /// The connection was successfully established. - Connected, + ConnectionResponse, /// The connection was closed. Disconnected { /// The reason for the disconnection. @@ -107,7 +107,7 @@ impl VsockEvent { } VirtioVsockOp::Response => { header.check_data_is_empty()?; - VsockEventType::Connected + VsockEventType::ConnectionResponse } VirtioVsockOp::CreditUpdate => { header.check_data_is_empty()?; diff --git a/kernel/comps/virtio/src/device/socket/device.rs b/kernel/comps/virtio/src/device/socket/device.rs index 36b8f64e9..acb028395 100644 --- a/kernel/comps/virtio/src/device/socket/device.rs +++ b/kernel/comps/virtio/src/device/socket/device.rs @@ -9,7 +9,7 @@ use log::debug; use pod::Pod; use super::{ - buffer::{RxBuffer, RX_BUFFER_LEN}, + buffer::RxBuffer, config::{VirtioVsockConfig, VsockFeatures}, connect::{ConnectionInfo, VsockEvent}, error::SocketError, @@ -99,6 +99,7 @@ impl SocketDevice { fn handle_vsock_event(_: &TrapFrame) { handle_recv_irq(super::DEVICE_NAME); } + // FIXME: handle event virtqueue notification in live migration device .transport @@ -184,6 +185,7 @@ impl SocketDevice { header: &VirtioVsockHdr, buffer: &[u8], ) -> Result<(), SocketError> { + debug!("Sent packet {:?}. Op {:?}", header, header.op()); debug!("buffer in send_packet_to_tx_queue: {:?}", buffer); let tx_buffer = TxBuffer::new(header, buffer); @@ -254,9 +256,8 @@ impl SocketDevice { /// Receive bytes from peer, returns the header pub fn receive( &mut self, - buffer: &mut [u8], // connection_info: &mut ConnectionInfo, - ) -> Result { + ) -> Result { let (token, len) = self.recv_queue.pop_used()?; debug!( "receive packet in rx_queue: token = {}, len = {}", @@ -268,21 +269,10 @@ impl SocketDevice { .ok_or(QueueError::WrongToken)?; rx_buffer.set_packet_len(len as usize); - let mut buf_reader = rx_buffer.buf(); - let mut temp_buffer = vec![0u8; buf_reader.remain()]; - buf_reader.read(&mut VmWriter::from(&mut temp_buffer as &mut [u8])); + let new_rx_buffer = RxBuffer::new(size_of::()); + self.add_rx_buffer(new_rx_buffer, token)?; - let (header, payload) = read_header_and_body(&temp_buffer)?; - // The length written should be equal to len(header)+len(packet) - assert_eq!(len, header.len() + VIRTIO_VSOCK_HDR_LEN as u32); - debug!("Received packet {:?}. Op {:?}", header, header.op()); - debug!("body is {:?}", payload); - - assert!(buffer.len() >= payload.len()); - buffer[..payload.len()].copy_from_slice(payload); - - self.add_rx_buffer(rx_buffer, token)?; - Ok(header) + Ok(rx_buffer) } /// Polls the RX virtqueue for the next event, and calls the given handler function to handle it. @@ -294,10 +284,17 @@ impl SocketDevice { if !self.recv_queue.can_pop() { return Ok(None); } - let mut body = vec![0u8; RX_BUFFER_LEN]; - let header = self.receive(&mut body)?; + let rx_buffer = self.receive()?; - VsockEvent::from_header(&header).and_then(|event| handler(event, &body)) + let mut buf_reader = rx_buffer.buf(); + let mut temp_buffer = vec![0u8; buf_reader.remain()]; + buf_reader.read(&mut VmWriter::from(&mut temp_buffer as &mut [u8])); + + let (header, payload) = read_header_and_body(&temp_buffer)?; + // The length written should be equal to len(header)+len(packet) + debug!("Received packet {:?}. Op {:?}", header, header.op()); + debug!("body is {:?}", payload); + VsockEvent::from_header(&header).and_then(|event| handler(event, payload)) } /// Add a used rx buffer to recv queue,@index is only to check the correctness diff --git a/kernel/comps/virtio/src/device/socket/mod.rs b/kernel/comps/virtio/src/device/socket/mod.rs index 0350195ca..46bb0750d 100644 --- a/kernel/comps/virtio/src/device/socket/mod.rs +++ b/kernel/comps/virtio/src/device/socket/mod.rs @@ -21,18 +21,18 @@ pub fn register_device(name: String, device: Arc>) { VSOCK_DEVICE_TABLE .get() .unwrap() - .lock() + .lock_irq_disabled() .insert(name, (Arc::new(SpinLock::new(Vec::new())), device)); } pub fn get_device(str: &str) -> Option>> { - let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock(); + let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled(); let (_, device) = lock.get(str)?; Some(device.clone()) } pub fn all_devices() -> Vec<(String, Arc>)> { - let vsock_devs = VSOCK_DEVICE_TABLE.get().unwrap().lock(); + let vsock_devs = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled(); vsock_devs .iter() .map(|(name, (_, device))| (name.clone(), device.clone())) @@ -40,20 +40,19 @@ pub fn all_devices() -> Vec<(String, Arc>)> { } pub fn register_recv_callback(name: &str, callback: impl VsockDeviceIrqHandler) { - let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock(); + let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled(); let Some((callbacks, _)) = lock.get(name) else { return; }; - callbacks.lock().push(Arc::new(callback)); + callbacks.lock_irq_disabled().push(Arc::new(callback)); } pub fn handle_recv_irq(name: &str) { - let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock(); + let lock = VSOCK_DEVICE_TABLE.get().unwrap().lock_irq_disabled(); let Some((callbacks, _)) = lock.get(name) else { return; }; - let callbacks = callbacks.clone(); - let lock = callbacks.lock(); + let lock = callbacks.lock_irq_disabled(); for callback in lock.iter() { callback.call(()) } diff --git a/regression/apps/scripts/run_vsock_test.sh b/regression/apps/scripts/run_vsock_test.sh index 87cb0321a..a07f3dc41 100644 --- a/regression/apps/scripts/run_vsock_test.sh +++ b/regression/apps/scripts/run_vsock_test.sh @@ -9,5 +9,5 @@ cd ${VSOCK_DIR} echo "Start vsock test......" ./vsock_client -# ./vsock_server +./vsock_server echo "Vsock test passed." diff --git a/regression/apps/scripts/vsock_commands.sh b/regression/apps/scripts/vsock_commands.sh new file mode 100644 index 000000000..f27b3d04e --- /dev/null +++ b/regression/apps/scripts/vsock_commands.sh @@ -0,0 +1,4 @@ +# SPDX-License-Identifier: MPL-2.0 + +echo 'Line 1: Hello from host' +echo 'Line 2: Hello from host, again' diff --git a/regression/apps/vsock/vsock_client.c b/regression/apps/vsock/vsock_client.c index 0bcbaf02e..d24f0b94d 100644 --- a/regression/apps/vsock/vsock_client.c +++ b/regression/apps/vsock/vsock_client.c @@ -12,36 +12,53 @@ int main() { int sock; - char *hello = "echo 'Hello from host'\n"; char buffer[1024] = { 0 }; + FILE *commandFile; struct sockaddr_vm serv_addr; + commandFile = fopen("../vsock_commands.sh", "r"); + if (commandFile == NULL) { + perror("Failed to open the command file"); + return -1; + } + if ((sock = socket(AF_VSOCK, SOCK_STREAM, 0)) < 0) { - printf("\n Socket creation error\n"); + perror("\n Socket creation error"); + fclose(commandFile); return -1; } printf("\n Create socket successfully!\n"); + serv_addr.svm_family = AF_VSOCK; serv_addr.svm_cid = VMADDR_CID_HOST; serv_addr.svm_port = PORT; if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { - printf("\nConnection Failed \n"); + perror("\nConnection Failed"); + close(sock); + fclose(commandFile); return -1; } - printf("\n Socket connect successfully!\n"); + printf("\n Socket connected successfully!\n"); - // Send message to the server and receive the reply - if (send(sock, hello, strlen(hello), 0) < 0) { - printf("\nSend Failed\n"); - return -1; + char command[1024]; + while (fgets(command, sizeof(command), commandFile) != NULL) { + if (send(sock, command, strlen(command), 0) < 0) { + perror("\nSend Failed"); + break; + } + printf("Command sent: %s", command); + memset(buffer, 0, sizeof(buffer)); + if (read(sock, buffer, sizeof(buffer)) < 0) { + perror("\nRead Failed"); + break; + } + printf("Server: %s\n", buffer); } - printf("Hello message sent\n"); - if (read(sock, buffer, 1024) < 0) { - printf("\nRead Failed\n"); - return -1; - } - printf("Server: %s\n", buffer); + + close(sock); + fclose(commandFile); + return 0; -} \ No newline at end of file +} diff --git a/regression/benchmark/README.md b/regression/benchmark/README.md index f033e8696..27e6c4cb7 100644 --- a/regression/benchmark/README.md +++ b/regression/benchmark/README.md @@ -55,4 +55,4 @@ export HOST_PORT=8888 iperf3 -s -B $HOST_ADDR -p $HOST_PORT -D # Start the server as a daemon iperf3 -c $HOST_ADDR -p $HOST_PORT # Start the client ``` -Note that [a variant of iperf3](https://github.com/stefano-garzarella/iperf-vsock) can measure the performance of `vsock`. \ No newline at end of file +Note that [a variant of iperf3](https://github.com/stefano-garzarella/iperf-vsock) can measure the performance of `vsock`. But the implemented `vsock` has not been verified to work well in it. \ No newline at end of file