Use IoVec-based reader/writer to refactor network APIs

This commit is contained in:
Jianfeng Jiang
2024-09-11 02:36:24 +00:00
committed by Tate, Hongliang Tian
parent ea8327af0f
commit 985813c7f9
24 changed files with 484 additions and 453 deletions

View File

@ -21,7 +21,6 @@ use crate::{
socket::{
options::{Error as SocketError, SocketOption},
util::{
copy_message_from_user, copy_message_to_user, create_message_buffer,
options::SocketOptionSet, send_recv_flags::SendRecvFlags,
shutdown_cmd::SockShutdownCmd, socket_addr::SocketAddr, MessageHeader,
},
@ -30,7 +29,7 @@ use crate::{
},
prelude::*,
process::signal::{Pollable, Pollee, Poller},
util::IoVec,
util::{MultiRead, MultiWrite},
};
mod connected;
@ -245,7 +244,11 @@ impl StreamSocket {
accepted
}
fn try_recv(&self, buf: &mut [u8], flags: SendRecvFlags) -> Result<(usize, SocketAddr)> {
fn try_recv(
&self,
writer: &mut dyn MultiWrite,
flags: SendRecvFlags,
) -> Result<(usize, SocketAddr)> {
let state = self.state.read();
let connected_stream = match state.as_ref() {
@ -258,7 +261,7 @@ impl StreamSocket {
}
};
let received = connected_stream.try_recv(buf, flags).map(|recv_bytes| {
let received = connected_stream.try_recv(writer, flags).map(|recv_bytes| {
connected_stream.update_io_events(&self.pollee);
let remote_endpoint = connected_stream.remote_endpoint();
@ -271,15 +274,19 @@ impl StreamSocket {
received
}
fn recv(&self, buf: &mut [u8], flags: SendRecvFlags) -> Result<(usize, SocketAddr)> {
fn recv(
&self,
writer: &mut dyn MultiWrite,
flags: SendRecvFlags,
) -> Result<(usize, SocketAddr)> {
if self.is_nonblocking() {
self.try_recv(buf, flags)
self.try_recv(writer, flags)
} else {
self.wait_events(IoEvents::IN, || self.try_recv(buf, flags))
self.wait_events(IoEvents::IN, || self.try_recv(writer, flags))
}
}
fn try_send(&self, buf: &[u8], flags: SendRecvFlags) -> Result<usize> {
fn try_send(&self, reader: &mut dyn MultiRead, flags: SendRecvFlags) -> Result<usize> {
let state = self.state.read();
let connected_stream = match state.as_ref() {
@ -295,7 +302,7 @@ impl StreamSocket {
}
};
let sent_bytes = connected_stream.try_send(buf, flags).map(|sent_bytes| {
let sent_bytes = connected_stream.try_send(reader, flags).map(|sent_bytes| {
connected_stream.update_io_events(&self.pollee);
sent_bytes
});
@ -306,11 +313,11 @@ impl StreamSocket {
sent_bytes
}
fn send(&self, buf: &[u8], flags: SendRecvFlags) -> Result<usize> {
fn send(&self, reader: &mut dyn MultiRead, flags: SendRecvFlags) -> Result<usize> {
if self.is_nonblocking() {
self.try_send(buf, flags)
self.try_send(reader, flags)
} else {
self.wait_events(IoEvents::OUT, || self.try_send(buf, flags))
self.wait_events(IoEvents::OUT, || self.try_send(reader, flags))
}
}
@ -340,19 +347,15 @@ impl Pollable for StreamSocket {
impl FileLike for StreamSocket {
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
let mut buf = vec![0u8; writer.avail()];
// TODO: Set correct flags
let flags = SendRecvFlags::empty();
let read_len = self.recv(&mut buf, flags).map(|(len, _)| len)?;
writer.write_fallible(&mut buf.as_slice().into())?;
Ok(read_len)
self.recv(writer, flags).map(|(len, _)| len)
}
fn write(&self, reader: &mut VmReader) -> Result<usize> {
let buf = reader.collect()?;
// TODO: Set correct flags
let flags = SendRecvFlags::empty();
self.send(&buf, flags)
self.send(reader, flags)
}
fn status_flags(&self) -> StatusFlags {
@ -509,7 +512,7 @@ impl Socket for StreamSocket {
fn sendmsg(
&self,
io_vecs: &[IoVec],
reader: &mut dyn MultiRead,
message_header: MessageHeader,
flags: SendRecvFlags,
) -> Result<usize> {
@ -529,23 +532,18 @@ impl Socket for StreamSocket {
warn!("sending control message is not supported");
}
let buf = copy_message_from_user(io_vecs);
self.send(&buf, flags)
self.send(reader, flags)
}
fn recvmsg(&self, io_vecs: &[IoVec], flags: SendRecvFlags) -> Result<(usize, MessageHeader)> {
fn recvmsg(
&self,
writer: &mut dyn MultiWrite,
flags: SendRecvFlags,
) -> Result<(usize, MessageHeader)> {
// TODO: Deal with flags
debug_assert!(flags.is_all_supported());
let mut buf = create_message_buffer(io_vecs);
let (received_bytes, _) = self.recv(&mut buf, flags)?;
let copied_bytes = {
let message = &buf[..received_bytes];
copy_message_to_user(io_vecs, message)
};
let (received_bytes, _) = self.recv(writer, flags)?;
// TODO: Receive control message
@ -553,7 +551,7 @@ impl Socket for StreamSocket {
// peer address is ignored for connected socket.
let message_header = MessageHeader::new(None, None);
Ok((copied_bytes, message_header))
Ok((received_bytes, message_header))
}
fn get_option(&self, option: &mut dyn SocketOption) -> Result<()> {