Set keepalive and tcp_nodelay on underlying sockets

This commit is contained in:
jiangjianfeng
2024-12-05 08:40:20 +00:00
committed by Tate, Hongliang Tian
parent 8b07a68e9e
commit 58cf8ea681
13 changed files with 293 additions and 35 deletions

View File

@ -2,7 +2,10 @@
use core::sync::atomic::{AtomicBool, Ordering};
use aster_bigtcp::wire::IpEndpoint;
use aster_bigtcp::{
socket::{NeedIfacePoll, RawTcpSetOption},
wire::IpEndpoint,
};
use connected::ConnectedStream;
use connecting::{ConnResult, ConnectingStream};
use init::InitStream;
@ -20,13 +23,19 @@ use crate::{
utils::{InodeMode, Metadata, StatusFlags},
},
match_sock_option_mut, match_sock_option_ref,
net::socket::{
options::{Error as SocketError, SocketOption},
util::{
options::SocketOptionSet, send_recv_flags::SendRecvFlags,
shutdown_cmd::SockShutdownCmd, socket_addr::SocketAddr, MessageHeader,
net::{
iface::Iface,
socket::{
options::{Error as SocketError, SocketOption},
util::{
options::{SetSocketLevelOption, SocketOptionSet},
send_recv_flags::SendRecvFlags,
shutdown_cmd::SockShutdownCmd,
socket_addr::SocketAddr,
MessageHeader,
},
Socket,
},
Socket,
},
prelude::*,
process::signal::{PollHandle, Pollable, Pollee},
@ -87,11 +96,28 @@ impl StreamSocket {
})
}
fn new_connected(connected_stream: ConnectedStream) -> Arc<Self> {
fn new_accepted(connected_stream: ConnectedStream) -> Arc<Self> {
let options = connected_stream.raw_with(|raw_tcp_socket| {
let mut options = OptionSet::new();
if raw_tcp_socket.keep_alive().is_some() {
options.socket.set_keep_alive(true);
}
if !raw_tcp_socket.nagle_enabled() {
options.tcp.set_no_delay(true);
}
// TODO: Update other options for a newly-accepted socket
options
});
let pollee = Pollee::new();
connected_stream.set_observer(StreamObserver::new(pollee.clone()));
Arc::new(Self {
options: RwLock::new(OptionSet::new()),
options: RwLock::new(options),
state: RwLock::new(Takeable::new(State::Connected(connected_stream))),
is_nonblocking: AtomicBool::new(false),
pollee,
@ -276,7 +302,7 @@ impl StreamSocket {
.try_accept(&self.pollee)
.map(|connected_stream| {
let remote_endpoint = connected_stream.remote_endpoint();
let accepted_socket = Self::new_connected(connected_stream);
let accepted_socket = Self::new_accepted(connected_stream);
(accepted_socket as _, remote_endpoint.into())
});
let iface_to_poll = listen_stream.iface().clone();
@ -650,11 +676,23 @@ impl Socket for StreamSocket {
}
fn set_option(&self, option: &dyn SocketOption) -> Result<()> {
let mut options = self.options.write();
let (mut options, mut state) = self.update_connecting();
match options.socket.set_option(option) {
match options.socket.set_option(option, state.as_mut()) {
Err(err) if err.error() == Errno::ENOPROTOOPT => (),
res => return res,
Err(err) => return Err(err),
Ok(need_iface_poll) => {
let iface_to_poll = need_iface_poll.then(|| state.iface().cloned()).flatten();
drop(state);
drop(options);
if let Some(iface) = iface_to_poll {
iface.poll();
}
return Ok(());
}
}
// FIXME: Here we have only set the value of the option, without actually
@ -663,6 +701,7 @@ impl Socket for StreamSocket {
tcp_no_delay: NoDelay => {
let no_delay = tcp_no_delay.get().unwrap();
options.tcp.set_no_delay(*no_delay);
state.set_raw_option(|raw_socket: &mut dyn RawTcpSetOption| raw_socket.set_nagle_enabled(!no_delay));
},
tcp_congestion: Congestion => {
let congestion = tcp_congestion.get().unwrap();
@ -694,18 +733,62 @@ impl Socket for StreamSocket {
}
}
impl State {
/// Calls `f` to set raw socket option.
///
/// Note that for listening socket, `f` is called on all backlog sockets in `Listen` State.
/// That is to say, `f` won't be called on backlog sockets in `SynReceived` or `Established` state.
fn set_raw_option<R>(&mut self, set_option: impl Fn(&mut dyn RawTcpSetOption) -> R) -> R {
match self {
State::Init(init_stream) => init_stream.set_raw_option(set_option),
State::Connecting(connecting_stream) => connecting_stream.set_raw_option(set_option),
State::Connected(connected_stream) => connected_stream.set_raw_option(set_option),
State::Listen(listen_stream) => listen_stream.set_raw_option(set_option),
}
}
fn iface(&self) -> Option<&Arc<Iface>> {
match self {
State::Init(_) => None,
State::Connecting(ref connecting_stream) => Some(connecting_stream.iface()),
State::Connected(ref connected_stream) => Some(connected_stream.iface()),
State::Listen(ref listen_stream) => Some(listen_stream.iface()),
}
}
}
impl SetSocketLevelOption for State {
fn set_keep_alive(&mut self, keep_alive: bool) -> NeedIfacePoll {
/// The keepalive interval.
///
/// The linux value can be found at `/proc/sys/net/ipv4/tcp_keepalive_intvl`,
/// which is by default 75 seconds for most Linux distributions.
const KEEPALIVE_INTERVAL: aster_bigtcp::time::Duration =
aster_bigtcp::time::Duration::from_secs(75);
let interval = if keep_alive {
Some(KEEPALIVE_INTERVAL)
} else {
None
};
let set_keepalive =
|raw_socket: &mut dyn RawTcpSetOption| raw_socket.set_keep_alive(interval);
self.set_raw_option(set_keepalive)
}
}
impl Drop for StreamSocket {
fn drop(&mut self) {
let state = self.state.get_mut().take();
let iface_to_poll = match state {
State::Init(_) => None,
State::Connecting(ref connecting_stream) => Some(connecting_stream.iface().clone()),
State::Connected(ref connected_stream) => Some(connected_stream.iface().clone()),
State::Listen(ref listen_stream) => Some(listen_stream.iface().clone()),
};
let iface_to_poll = state.iface().cloned();
// Dropping the state will drop the sockets. This will trigger the socket close process (if
// needed) and require immediate iface polling afterwards.
drop(state);
if let Some(iface) = iface_to_poll {
iface.poll();
}