mirror of
https://github.com/asterinas/asterinas.git
synced 2025-06-23 01:13:23 +00:00
Send RST packets when appropriate
This commit is contained in:
committed by
Tate, Hongliang Tian
parent
eef56c770b
commit
aa29640ed7
@ -23,7 +23,7 @@ use super::{
|
|||||||
use crate::{
|
use crate::{
|
||||||
errors::BindError,
|
errors::BindError,
|
||||||
ext::Ext,
|
ext::Ext,
|
||||||
socket::{TcpConnectionBg, TcpListenerBg, UdpSocketBg},
|
socket::{TcpListenerBg, UdpSocketBg},
|
||||||
socket_table::SocketTable,
|
socket_table::SocketTable,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -156,11 +156,6 @@ impl<E: Ext> IfaceCommon<E> {
|
|||||||
debug_assert!(removed.is_some());
|
debug_assert!(removed.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn remove_dead_tcp_connection(&self, socket: &Arc<TcpConnectionBg<E>>) {
|
|
||||||
let mut sockets = self.sockets.lock();
|
|
||||||
sockets.remove_dead_tcp_connection(socket.connection_key());
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn remove_udp_socket(&self, socket: &Arc<UdpSocketBg<E>>) {
|
pub(crate) fn remove_udp_socket(&self, socket: &Arc<UdpSocketBg<E>>) {
|
||||||
let mut sockets = self.sockets.lock();
|
let mut sockets = self.sockets.lock();
|
||||||
let removed = sockets.remove_udp_socket(socket);
|
let removed = sockets.remove_udp_socket(socket);
|
||||||
|
@ -40,6 +40,8 @@ pub struct RawTcpSocketExt<E: Ext> {
|
|||||||
socket: Box<RawTcpSocket>,
|
socket: Box<RawTcpSocket>,
|
||||||
pub(super) listener: Option<Arc<TcpListenerBg<E>>>,
|
pub(super) listener: Option<Arc<TcpListenerBg<E>>>,
|
||||||
has_connected: bool,
|
has_connected: bool,
|
||||||
|
/// Indicates if the receiving side of this socket is shut down by the user.
|
||||||
|
is_recv_shut: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: Ext> Deref for RawTcpSocketExt<E> {
|
impl<E: Ext> Deref for RawTcpSocketExt<E> {
|
||||||
@ -73,6 +75,23 @@ impl<E: Ext> RawTcpSocketExt<E> {
|
|||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks if the socket is closing.
|
||||||
|
///
|
||||||
|
/// More specifically, we say a socket is closing if and only if it has sent its FIN packet but
|
||||||
|
/// is still waiting for an ACK packet from the peer to acknowledge the FIN it sent.
|
||||||
|
pub fn is_closing(&self) -> bool {
|
||||||
|
let state = self.state();
|
||||||
|
matches!(state, State::FinWait1 | State::Closing | State::LastAck)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns whether the receiving half of the socket is shut down.
|
||||||
|
///
|
||||||
|
/// This method will return true if and only if [`TcpConnection::shut_recv`] or
|
||||||
|
/// [`TcpConnection::close`] is called.
|
||||||
|
pub fn is_recv_shut(&self) -> bool {
|
||||||
|
self.is_recv_shut
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
define_boolean_value!(
|
define_boolean_value!(
|
||||||
@ -81,10 +100,40 @@ define_boolean_value!(
|
|||||||
);
|
);
|
||||||
|
|
||||||
impl<E: Ext> RawTcpSocketExt<E> {
|
impl<E: Ext> RawTcpSocketExt<E> {
|
||||||
fn on_new_state(
|
/// Checks the TCP state for additional events and whether the connection is dead.
|
||||||
|
fn check_state(
|
||||||
&mut self,
|
&mut self,
|
||||||
this: &Arc<TcpConnectionBg<E>>,
|
this: &Arc<TcpConnectionBg<E>>,
|
||||||
|
old_state: State,
|
||||||
|
old_recv_queue: usize,
|
||||||
) -> (SocketEvents, TcpConnBecameDead) {
|
) -> (SocketEvents, TcpConnBecameDead) {
|
||||||
|
let became_dead = if self.state() != State::Established {
|
||||||
|
// After the connection is closed by the user, no new data can be read, and such unread
|
||||||
|
// data will immediately cause the connection to be reset.
|
||||||
|
// Note that "closed" here means that either (1) `close()` or (2) both `shut_send()`
|
||||||
|
// and `shut_recv()` are called. In the latter case, there may be some buffered data.
|
||||||
|
if self.is_recv_shut
|
||||||
|
// These are states where the sending half is closed but new data can come in.
|
||||||
|
&& matches!(old_state, State::FinWait1 | State::FinWait2)
|
||||||
|
&& self.recv_queue() > old_recv_queue
|
||||||
|
{
|
||||||
|
self.abort();
|
||||||
|
}
|
||||||
|
self.check_dead(this)
|
||||||
|
} else {
|
||||||
|
TcpConnBecameDead::FALSE
|
||||||
|
};
|
||||||
|
|
||||||
|
let events = if self.state() != old_state {
|
||||||
|
self.on_new_state(this)
|
||||||
|
} else {
|
||||||
|
SocketEvents::empty()
|
||||||
|
};
|
||||||
|
|
||||||
|
(events, became_dead)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_new_state(&mut self, this: &Arc<TcpConnectionBg<E>>) -> SocketEvents {
|
||||||
let may_send = self.may_send();
|
let may_send = self.may_send();
|
||||||
|
|
||||||
if may_send && !self.has_connected {
|
if may_send && !self.has_connected {
|
||||||
@ -99,8 +148,6 @@ impl<E: Ext> RawTcpSocketExt<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let became_dead = self.check_dead(this);
|
|
||||||
|
|
||||||
let mut events = SocketEvents::empty();
|
let mut events = SocketEvents::empty();
|
||||||
if !self.may_recv_new() {
|
if !self.may_recv_new() {
|
||||||
events |= SocketEvents::CLOSED_RECV;
|
events |= SocketEvents::CLOSED_RECV;
|
||||||
@ -109,7 +156,7 @@ impl<E: Ext> RawTcpSocketExt<E> {
|
|||||||
events |= SocketEvents::CLOSED_SEND;
|
events |= SocketEvents::CLOSED_SEND;
|
||||||
}
|
}
|
||||||
|
|
||||||
(events, became_dead)
|
events
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks whether the TCP connection becomes dead.
|
/// Checks whether the TCP connection becomes dead.
|
||||||
@ -123,15 +170,20 @@ impl<E: Ext> RawTcpSocketExt<E> {
|
|||||||
/// dead if it is not closed.
|
/// dead if it is not closed.
|
||||||
fn check_dead(&self, this: &Arc<TcpConnectionBg<E>>) -> TcpConnBecameDead {
|
fn check_dead(&self, this: &Arc<TcpConnectionBg<E>>) -> TcpConnBecameDead {
|
||||||
// FIXME: This is a temporary workaround to mark TimeWait socket as dead.
|
// FIXME: This is a temporary workaround to mark TimeWait socket as dead.
|
||||||
if self.state() == smoltcp::socket::tcp::State::Closed
|
if self.state() == State::TimeWait {
|
||||||
|| self.state() == smoltcp::socket::tcp::State::TimeWait
|
return TcpConnBecameDead::TRUE;
|
||||||
{
|
}
|
||||||
|
|
||||||
|
// According to the current smoltcp implementation, a socket in the CLOSED state with the
|
||||||
|
// remote endpoint set means that an outgoing RST packet is pending. We cannot simply mark
|
||||||
|
// such a socket as dead.
|
||||||
|
if self.state() == State::Closed && self.remote_endpoint().is_none() {
|
||||||
return TcpConnBecameDead::TRUE;
|
return TcpConnBecameDead::TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// According to the current smoltcp implementation, a backlog socket will return back to
|
// According to the current smoltcp implementation, a backlog socket will return back to
|
||||||
// the `Listen` state if the connection is RSTed before its establishment.
|
// the `Listen` state if the connection is RSTed before its establishment.
|
||||||
if self.state() == smoltcp::socket::tcp::State::Listen {
|
if self.state() == State::Listen {
|
||||||
if let Some(ref listener) = self.listener {
|
if let Some(ref listener) = self.listener {
|
||||||
let mut backlog = listener.inner.backlog.lock();
|
let mut backlog = listener.inner.backlog.lock();
|
||||||
// This may fail due to race conditions, but it's fine.
|
// This may fail due to race conditions, but it's fine.
|
||||||
@ -158,6 +210,7 @@ impl<E: Ext> TcpConnectionInner<E> {
|
|||||||
socket,
|
socket,
|
||||||
listener,
|
listener,
|
||||||
has_connected: false,
|
has_connected: false,
|
||||||
|
is_recv_shut: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
TcpConnectionInner {
|
TcpConnectionInner {
|
||||||
@ -175,25 +228,26 @@ impl<E: Ext> Inner<E> for TcpConnectionInner<E> {
|
|||||||
type Observer = E::TcpEventObserver;
|
type Observer = E::TcpEventObserver;
|
||||||
|
|
||||||
fn on_drop(this: &Arc<SocketBg<Self, E>>) {
|
fn on_drop(this: &Arc<SocketBg<Self, E>>) {
|
||||||
let became_dead = {
|
debug_assert!(
|
||||||
let mut socket = this.inner.lock();
|
{
|
||||||
|
let socket = this.inner.lock();
|
||||||
// FIXME: Send RSTs when there is unread data.
|
if socket.state() == State::Closed {
|
||||||
socket.close();
|
// (1) The socket is fully closed.
|
||||||
|
|
||||||
if *socket.check_dead(this) {
|
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
// A TCP connection may not be appropriate for immediate removal. We leave the removal
|
// (2) The receiving half is closed and the sending half is closing.
|
||||||
// decision to the polling logic.
|
socket.is_recv_shut
|
||||||
this.update_next_poll_at_ms(PollAt::Now);
|
&& !matches!(
|
||||||
false
|
socket.state(),
|
||||||
}
|
State::SynSent
|
||||||
};
|
| State::SynReceived
|
||||||
|
| State::Established
|
||||||
if became_dead {
|
| State::CloseWait,
|
||||||
this.bound.iface().common().remove_dead_tcp_connection(this);
|
)
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"a connection must be either closed or reset before dropping"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -319,6 +373,10 @@ impl<E: Ext> TcpConnection<E> {
|
|||||||
|
|
||||||
let mut socket = self.0.inner.lock();
|
let mut socket = self.0.inner.lock();
|
||||||
|
|
||||||
|
if socket.is_recv_shut && socket.recv_queue() == 0 {
|
||||||
|
return Err(smoltcp::socket::tcp::RecvError::Finished);
|
||||||
|
}
|
||||||
|
|
||||||
let result = socket.recv(f)?;
|
let result = socket.recv(f)?;
|
||||||
let need_poll = self
|
let need_poll = self
|
||||||
.0
|
.0
|
||||||
@ -327,17 +385,15 @@ impl<E: Ext> TcpConnection<E> {
|
|||||||
Ok((result, need_poll))
|
Ok((result, need_poll))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Closes the connection.
|
/// Shuts down the sending half of the connection.
|
||||||
///
|
///
|
||||||
/// This method returns `false` if the socket is closed _before_ calling this method.
|
/// This method will return `false` if the socket is in the CLOSED or TIME_WAIT state.
|
||||||
///
|
///
|
||||||
/// Polling the iface is _always_ required after this method succeeds.
|
/// Polling the iface is _always_ required after this method succeeds.
|
||||||
pub fn close(&self) -> bool {
|
pub fn shut_send(&self) -> bool {
|
||||||
let mut socket = self.0.inner.lock();
|
let mut socket = self.0.inner.lock();
|
||||||
|
|
||||||
socket.listener = None;
|
if matches!(socket.state(), State::Closed | State::TimeWait) {
|
||||||
|
|
||||||
if socket.state() == State::Closed {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -347,6 +403,56 @@ impl<E: Ext> TcpConnection<E> {
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Shuts down the receiving half of the connection.
|
||||||
|
///
|
||||||
|
/// This method will return `false` if the socket is in the CLOSED or TIME_WAIT state.
|
||||||
|
///
|
||||||
|
/// Polling the iface is _not_ required after this method succeeds.
|
||||||
|
pub fn shut_recv(&self) -> bool {
|
||||||
|
let mut socket = self.0.inner.lock();
|
||||||
|
|
||||||
|
if matches!(socket.state(), State::Closed | State::TimeWait) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.is_recv_shut = true;
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Closes the connection.
|
||||||
|
///
|
||||||
|
/// Polling the iface is _always_ required after this method succeeds.
|
||||||
|
///
|
||||||
|
/// Note that either this method or [`Self::reset`] must be called before dropping the TCP
|
||||||
|
/// connection to avoid resource leakage.
|
||||||
|
pub fn close(&self) {
|
||||||
|
let mut socket = self.0.inner.lock();
|
||||||
|
|
||||||
|
socket.is_recv_shut = true;
|
||||||
|
|
||||||
|
if socket.recv_queue() != 0 {
|
||||||
|
// If there is unread data, reset the connection immediately.
|
||||||
|
socket.abort();
|
||||||
|
} else {
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
self.0.update_next_poll_at_ms(PollAt::Now);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resets the connection.
|
||||||
|
///
|
||||||
|
/// Polling the iface is _always_ required after this method succeeds.
|
||||||
|
///
|
||||||
|
/// Note that either this method or [`Self::close`] must be called before dropping the TCP
|
||||||
|
/// connection to avoid resource leakage.
|
||||||
|
pub fn reset(&self) {
|
||||||
|
let mut socket = self.0.inner.lock();
|
||||||
|
|
||||||
|
socket.abort();
|
||||||
|
self.0.update_next_poll_at_ms(PollAt::Now);
|
||||||
|
}
|
||||||
|
|
||||||
/// Calls `f` with an immutable reference to the associated [`RawTcpSocket`].
|
/// Calls `f` with an immutable reference to the associated [`RawTcpSocket`].
|
||||||
//
|
//
|
||||||
// NOTE: If a mutable reference is required, add a method above that correctly updates the next
|
// NOTE: If a mutable reference is required, add a method above that correctly updates the next
|
||||||
@ -427,6 +533,7 @@ impl<E: Ext> TcpConnectionBg<E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let old_state = socket.state();
|
let old_state = socket.state();
|
||||||
|
let old_recv_queue = socket.recv_queue();
|
||||||
// For TCP, receiving an ACK packet can free up space in the queue, allowing more packets
|
// For TCP, receiving an ACK packet can free up space in the queue, allowing more packets
|
||||||
// to be queued.
|
// to be queued.
|
||||||
let mut events = SocketEvents::CAN_RECV | SocketEvents::CAN_SEND;
|
let mut events = SocketEvents::CAN_RECV | SocketEvents::CAN_SEND;
|
||||||
@ -436,13 +543,8 @@ impl<E: Ext> TcpConnectionBg<E> {
|
|||||||
Some((ip_repr, tcp_repr)) => TcpProcessResult::ProcessedWithReply(ip_repr, tcp_repr),
|
Some((ip_repr, tcp_repr)) => TcpProcessResult::ProcessedWithReply(ip_repr, tcp_repr),
|
||||||
};
|
};
|
||||||
|
|
||||||
let became_dead = if socket.state() != old_state {
|
let (state_events, became_dead) = socket.check_state(self, old_state, old_recv_queue);
|
||||||
let (new_events, became_dead) = socket.on_new_state(self);
|
events |= state_events;
|
||||||
events |= new_events;
|
|
||||||
became_dead
|
|
||||||
} else {
|
|
||||||
TcpConnBecameDead::FALSE
|
|
||||||
};
|
|
||||||
|
|
||||||
self.add_events(events);
|
self.add_events(events);
|
||||||
self.update_next_poll_at_ms(socket.poll_at(cx));
|
self.update_next_poll_at_ms(socket.poll_at(cx));
|
||||||
@ -452,16 +554,17 @@ impl<E: Ext> TcpConnectionBg<E> {
|
|||||||
|
|
||||||
/// Tries to generate an outgoing packet and dispatches the generated packet.
|
/// Tries to generate an outgoing packet and dispatches the generated packet.
|
||||||
pub(crate) fn dispatch<D>(
|
pub(crate) fn dispatch<D>(
|
||||||
this: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
cx: &mut Context,
|
cx: &mut Context,
|
||||||
dispatch: D,
|
dispatch: D,
|
||||||
) -> (Option<(IpRepr, TcpRepr<'static>)>, TcpConnBecameDead)
|
) -> (Option<(IpRepr, TcpRepr<'static>)>, TcpConnBecameDead)
|
||||||
where
|
where
|
||||||
D: FnOnce(&mut Context, &IpRepr, &TcpRepr) -> Option<(IpRepr, TcpRepr<'static>)>,
|
D: FnOnce(&mut Context, &IpRepr, &TcpRepr) -> Option<(IpRepr, TcpRepr<'static>)>,
|
||||||
{
|
{
|
||||||
let mut socket = this.inner.lock();
|
let mut socket = self.inner.lock();
|
||||||
|
|
||||||
let old_state = socket.state();
|
let old_state = socket.state();
|
||||||
|
let old_recv_queue = socket.recv_queue();
|
||||||
let mut events = SocketEvents::empty();
|
let mut events = SocketEvents::empty();
|
||||||
|
|
||||||
let mut reply = None;
|
let mut reply = None;
|
||||||
@ -482,16 +585,11 @@ impl<E: Ext> TcpConnectionBg<E> {
|
|||||||
events |= SocketEvents::CAN_RECV | SocketEvents::CAN_SEND;
|
events |= SocketEvents::CAN_RECV | SocketEvents::CAN_SEND;
|
||||||
}
|
}
|
||||||
|
|
||||||
let became_dead = if socket.state() != old_state {
|
let (state_events, became_dead) = socket.check_state(self, old_state, old_recv_queue);
|
||||||
let (new_events, became_dead) = socket.on_new_state(this);
|
events |= state_events;
|
||||||
events |= new_events;
|
|
||||||
became_dead
|
|
||||||
} else {
|
|
||||||
TcpConnBecameDead::FALSE
|
|
||||||
};
|
|
||||||
|
|
||||||
this.add_events(events);
|
self.add_events(events);
|
||||||
this.update_next_poll_at_ms(socket.poll_at(cx));
|
self.update_next_poll_at_ms(socket.poll_at(cx));
|
||||||
|
|
||||||
(reply, became_dead)
|
(reply, became_dead)
|
||||||
}
|
}
|
||||||
|
@ -53,23 +53,11 @@ impl<E: Ext> Inner<E> for TcpListenerInner<E> {
|
|||||||
type Observer = E::TcpEventObserver;
|
type Observer = E::TcpEventObserver;
|
||||||
|
|
||||||
fn on_drop(this: &Arc<SocketBg<Self, E>>) {
|
fn on_drop(this: &Arc<SocketBg<Self, E>>) {
|
||||||
// A TCP listener can be removed immediately.
|
debug_assert_eq!(
|
||||||
this.bound.iface().common().remove_tcp_listener(this);
|
Arc::strong_count(this),
|
||||||
|
1,
|
||||||
let (connecting, connected) = {
|
"a listener must be closed before dropping"
|
||||||
let mut socket = this.inner.backlog.lock();
|
);
|
||||||
(
|
|
||||||
core::mem::take(&mut socket.connecting),
|
|
||||||
core::mem::take(&mut socket.connected),
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
// The lock on `connecting`/`connected` cannot be locked after locking `self`, otherwise we
|
|
||||||
// might get a deadlock. due to inconsistent lock order problems.
|
|
||||||
//
|
|
||||||
// FIXME: Send RSTs instead of going through the normal socket close process.
|
|
||||||
drop(connecting);
|
|
||||||
drop(connected);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,7 +128,7 @@ impl<E: Ext> TcpListener<E> {
|
|||||||
|
|
||||||
let remote_endpoint = {
|
let remote_endpoint = {
|
||||||
// The lock on `accepted` cannot be locked after locking `self`, otherwise we might get
|
// The lock on `accepted` cannot be locked after locking `self`, otherwise we might get
|
||||||
// a deadlock. due to inconsistent lock order problems.
|
// a deadlock due to inconsistent lock order problems.
|
||||||
let mut socket = accepted.0.inner.lock();
|
let mut socket = accepted.0.inner.lock();
|
||||||
|
|
||||||
socket.listener = None;
|
socket.listener = None;
|
||||||
@ -156,6 +144,30 @@ impl<E: Ext> TcpListener<E> {
|
|||||||
pub fn can_accept(&self) -> bool {
|
pub fn can_accept(&self) -> bool {
|
||||||
!self.0.inner.backlog.lock().connected.is_empty()
|
!self.0.inner.backlog.lock().connected.is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Closes the listener.
|
||||||
|
///
|
||||||
|
/// Polling the iface is _always_ required after this method succeeds.
|
||||||
|
///
|
||||||
|
/// Note that this method must be called before dropping the TCP listener to avoid resource
|
||||||
|
/// leakage.
|
||||||
|
pub fn close(&self) {
|
||||||
|
// A TCP listener can be removed immediately.
|
||||||
|
self.0.bound.iface().common().remove_tcp_listener(&self.0);
|
||||||
|
|
||||||
|
let (connecting, connected) = {
|
||||||
|
let mut socket = self.0.inner.backlog.lock();
|
||||||
|
(
|
||||||
|
core::mem::take(&mut socket.connecting),
|
||||||
|
core::mem::take(&mut socket.connected),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
// The lock on `connecting`/`connected` cannot be locked after locking `self`, otherwise we
|
||||||
|
// might get a deadlock. due to inconsistent lock order problems.
|
||||||
|
connecting.values().for_each(|socket| socket.reset());
|
||||||
|
connected.iter().for_each(|socket| socket.reset());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: Ext> RawTcpSetOption for TcpListener<E> {
|
impl<E: Ext> RawTcpSetOption for TcpListener<E> {
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
use core::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
|
|
||||||
use aster_bigtcp::{
|
use aster_bigtcp::{
|
||||||
errors::tcp::{RecvError, SendError},
|
errors::tcp::{RecvError, SendError},
|
||||||
socket::{NeedIfacePoll, RawTcpSetOption},
|
socket::{NeedIfacePoll, RawTcpSetOption},
|
||||||
@ -13,10 +11,13 @@ use crate::{
|
|||||||
events::IoEvents,
|
events::IoEvents,
|
||||||
net::{
|
net::{
|
||||||
iface::{Iface, RawTcpSocketExt, TcpConnection},
|
iface::{Iface, RawTcpSocketExt, TcpConnection},
|
||||||
socket::util::{send_recv_flags::SendRecvFlags, shutdown_cmd::SockShutdownCmd},
|
socket::{
|
||||||
|
util::{send_recv_flags::SendRecvFlags, shutdown_cmd::SockShutdownCmd},
|
||||||
|
LingerOption,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
process::signal::Pollee,
|
process::signal::{Pollee, Poller},
|
||||||
util::{MultiRead, MultiWrite},
|
util::{MultiRead, MultiWrite},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -34,8 +35,6 @@ pub struct ConnectedStream {
|
|||||||
/// connection is established asynchronously will succeed and any subsequent `connect()` will
|
/// connection is established asynchronously will succeed and any subsequent `connect()` will
|
||||||
/// fail.
|
/// fail.
|
||||||
is_new_connection: bool,
|
is_new_connection: bool,
|
||||||
/// Indicates if the receiving side of this socket is shut down by the user.
|
|
||||||
is_receiving_shut: AtomicBool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectedStream {
|
impl ConnectedStream {
|
||||||
@ -48,7 +47,6 @@ impl ConnectedStream {
|
|||||||
tcp_conn,
|
tcp_conn,
|
||||||
remote_endpoint,
|
remote_endpoint,
|
||||||
is_new_connection,
|
is_new_connection,
|
||||||
is_receiving_shut: AtomicBool::new(false),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,12 +54,14 @@ impl ConnectedStream {
|
|||||||
let mut events = IoEvents::empty();
|
let mut events = IoEvents::empty();
|
||||||
|
|
||||||
if cmd.shut_read() {
|
if cmd.shut_read() {
|
||||||
self.is_receiving_shut.store(true, Ordering::Relaxed);
|
if !self.tcp_conn.shut_recv() {
|
||||||
|
return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected");
|
||||||
|
}
|
||||||
events |= IoEvents::IN | IoEvents::RDHUP;
|
events |= IoEvents::IN | IoEvents::RDHUP;
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.shut_write() {
|
if cmd.shut_write() {
|
||||||
if !self.tcp_conn.close() {
|
if !self.tcp_conn.shut_send() {
|
||||||
return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected");
|
return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected");
|
||||||
}
|
}
|
||||||
events |= IoEvents::OUT | IoEvents::HUP;
|
events |= IoEvents::OUT | IoEvents::HUP;
|
||||||
@ -85,9 +85,6 @@ impl ConnectedStream {
|
|||||||
});
|
});
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok((Ok(0), need_poll)) if self.is_receiving_shut.load(Ordering::Relaxed) => {
|
|
||||||
Ok((0, need_poll))
|
|
||||||
}
|
|
||||||
Ok((Ok(0), need_poll)) => {
|
Ok((Ok(0), need_poll)) => {
|
||||||
debug_assert!(!*need_poll);
|
debug_assert!(!*need_poll);
|
||||||
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty")
|
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty")
|
||||||
@ -166,8 +163,7 @@ impl ConnectedStream {
|
|||||||
|
|
||||||
pub(super) fn check_io_events(&self) -> IoEvents {
|
pub(super) fn check_io_events(&self) -> IoEvents {
|
||||||
self.tcp_conn.raw_with(|socket| {
|
self.tcp_conn.raw_with(|socket| {
|
||||||
let is_receiving_closed =
|
let is_receiving_closed = socket.is_recv_shut() || !socket.may_recv_new();
|
||||||
self.is_receiving_shut.load(Ordering::Relaxed) || !socket.may_recv_new();
|
|
||||||
let is_sending_closed = !socket.may_send();
|
let is_sending_closed = !socket.may_send();
|
||||||
|
|
||||||
let mut events = IoEvents::empty();
|
let mut events = IoEvents::empty();
|
||||||
@ -205,4 +201,42 @@ impl ConnectedStream {
|
|||||||
pub(super) fn raw_with<R>(&self, f: impl FnOnce(&RawTcpSocketExt) -> R) -> R {
|
pub(super) fn raw_with<R>(&self, f: impl FnOnce(&RawTcpSocketExt) -> R) -> R {
|
||||||
self.tcp_conn.raw_with(f)
|
self.tcp_conn.raw_with(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn into_connection(self) -> TcpConnection {
|
||||||
|
self.tcp_conn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn close_and_linger(tcp_conn: TcpConnection, linger: LingerOption, pollee: &Pollee) {
|
||||||
|
let timeout = match (linger.is_on(), linger.timeout()) {
|
||||||
|
// No linger. Drain the send buffer in the background.
|
||||||
|
(false, _) => {
|
||||||
|
tcp_conn.close();
|
||||||
|
tcp_conn.iface().poll();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Linger with a zero timeout. Reset the connection immediately.
|
||||||
|
(true, duration) if duration.is_zero() => {
|
||||||
|
tcp_conn.reset();
|
||||||
|
tcp_conn.iface().poll();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Linger with a non-zero timeout. See below.
|
||||||
|
(true, duration) => {
|
||||||
|
tcp_conn.close();
|
||||||
|
tcp_conn.iface().poll();
|
||||||
|
duration
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut poller = Poller::new(Some(&timeout));
|
||||||
|
pollee.register_poller(poller.as_handle_mut(), IoEvents::HUP);
|
||||||
|
|
||||||
|
// Now wait for the ACK packet to acknowledge the FIN packet we sent. If the timeout expires or
|
||||||
|
// we are interrupted by signals, the remaining task is done in the background.
|
||||||
|
while tcp_conn.raw_with(|socket| socket.is_closing()) {
|
||||||
|
if poller.wait().is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -110,4 +110,8 @@ impl ConnectingStream {
|
|||||||
) -> R {
|
) -> R {
|
||||||
set_option(&self.tcp_conn)
|
set_option(&self.tcp_conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn into_connection(self) -> TcpConnection {
|
||||||
|
self.tcp_conn
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,4 +73,8 @@ impl ListenStream {
|
|||||||
) -> R {
|
) -> R {
|
||||||
set_option(&self.tcp_listener)
|
set_option(&self.tcp_listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn into_listener(self) -> TcpListener {
|
||||||
|
self.tcp_listener
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ use aster_bigtcp::{
|
|||||||
socket::{NeedIfacePoll, RawTcpOption, RawTcpSetOption},
|
socket::{NeedIfacePoll, RawTcpOption, RawTcpSetOption},
|
||||||
wire::IpEndpoint,
|
wire::IpEndpoint,
|
||||||
};
|
};
|
||||||
use connected::ConnectedStream;
|
use connected::{close_and_linger, ConnectedStream};
|
||||||
use connecting::{ConnResult, ConnectingStream};
|
use connecting::{ConnResult, ConnectingStream};
|
||||||
use init::InitStream;
|
use init::InitStream;
|
||||||
use listen::ListenStream;
|
use listen::ListenStream;
|
||||||
@ -825,14 +825,19 @@ impl Drop for StreamSocket {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let state = self.state.get_mut().take();
|
let state = self.state.get_mut().take();
|
||||||
|
|
||||||
let iface_to_poll = state.iface().cloned();
|
let conn = match state {
|
||||||
|
State::Init(_) => return,
|
||||||
// Dropping the state will drop the sockets. This will trigger the socket close process (if
|
State::Connecting(connecting_stream) => connecting_stream.into_connection(),
|
||||||
// needed) and require immediate iface polling afterwards.
|
State::Connected(connected_stream) => connected_stream.into_connection(),
|
||||||
drop(state);
|
State::Listen(listen_stream) => {
|
||||||
|
let listener = listen_stream.into_listener();
|
||||||
if let Some(iface) = iface_to_poll {
|
listener.close();
|
||||||
iface.poll();
|
listener.iface().poll();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let linger = self.options.get_mut().socket.linger();
|
||||||
|
close_and_linger(conn, linger, &self.pollee);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -151,7 +151,12 @@ impl Pollee {
|
|||||||
new_events & mask
|
new_events & mask
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_poller(&self, poller: &mut PollHandle, mask: IoEvents) {
|
/// Registers a poller to listen notification for new events.
|
||||||
|
///
|
||||||
|
/// The functionality of this method is a subset of calling [`Self::poll_with`] and providing
|
||||||
|
/// the same poller. Unlike [`Self::poll_with`], this method performs poller registration
|
||||||
|
/// without checking (and perhaps caching) the current events.
|
||||||
|
pub fn register_poller(&self, poller: &mut PollHandle, mask: IoEvents) {
|
||||||
self.inner
|
self.inner
|
||||||
.subject
|
.subject
|
||||||
.register_observer(poller.observer.clone(), mask);
|
.register_observer(poller.observer.clone(), mask);
|
||||||
|
@ -611,3 +611,40 @@ FN_TEST(bind_and_connect_same_address)
|
|||||||
TEST_SUCC(close(sk_connect2));
|
TEST_SUCC(close(sk_connect2));
|
||||||
}
|
}
|
||||||
END_TEST()
|
END_TEST()
|
||||||
|
|
||||||
|
#define SETUP_CONN \
|
||||||
|
sk_addr.sin_port = S_PORT; \
|
||||||
|
\
|
||||||
|
sk_connect = TEST_SUCC(socket(PF_INET, SOCK_STREAM, 0)); \
|
||||||
|
TEST_SUCC(connect(sk_connect, (struct sockaddr *)&sk_addr, \
|
||||||
|
sizeof(sk_addr))); \
|
||||||
|
\
|
||||||
|
len = sizeof(sk_addr); \
|
||||||
|
sk_accept = TEST_SUCC( \
|
||||||
|
accept(sk_listen, (struct sockaddr *)&sk_addr, &len));
|
||||||
|
|
||||||
|
FN_TEST(shutdown_shutdown)
|
||||||
|
{
|
||||||
|
int sk_accept;
|
||||||
|
int sk_connect;
|
||||||
|
socklen_t len;
|
||||||
|
|
||||||
|
SETUP_CONN;
|
||||||
|
|
||||||
|
// Test 1: Perform `shutdown` multiple times
|
||||||
|
TEST_SUCC(shutdown(sk_accept, SHUT_RDWR));
|
||||||
|
TEST_SUCC(shutdown(sk_accept, SHUT_RDWR));
|
||||||
|
|
||||||
|
// Test 2: Perform `shutdown` after the connection is closed
|
||||||
|
TEST_SUCC(shutdown(sk_connect, SHUT_RDWR));
|
||||||
|
TEST_ERRNO(shutdown(sk_connect, SHUT_RD), ENOTCONN);
|
||||||
|
TEST_ERRNO(shutdown(sk_connect, SHUT_WR), ENOTCONN);
|
||||||
|
TEST_ERRNO(shutdown(sk_accept, SHUT_RD), ENOTCONN);
|
||||||
|
TEST_ERRNO(shutdown(sk_accept, SHUT_WR), ENOTCONN);
|
||||||
|
|
||||||
|
TEST_SUCC(close(sk_accept));
|
||||||
|
TEST_SUCC(close(sk_connect));
|
||||||
|
}
|
||||||
|
END_TEST()
|
||||||
|
|
||||||
|
#undef SETUP_CONN
|
||||||
|
Reference in New Issue
Block a user