Don't poll ifaces if not necessary

This commit is contained in:
Ruihan Li
2024-11-16 23:09:20 +08:00
committed by Tate, Hongliang Tian
parent fab61f5f66
commit 1c1da8ea06
11 changed files with 217 additions and 87 deletions

View File

@ -256,8 +256,25 @@ pub enum ConnectState {
Refused, Refused,
} }
#[derive(Debug, Clone, Copy)]
pub struct NeedIfacePoll(bool);
impl NeedIfacePoll {
pub const FALSE: Self = Self(false);
}
impl Deref for NeedIfacePoll {
type Target = bool;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<E> BoundTcpSocket<E> { impl<E> BoundTcpSocket<E> {
/// Connects to a remote endpoint. /// Connects to a remote endpoint.
///
/// Polling the iface is _always_ required after this method succeeds.
pub fn connect( pub fn connect(
&self, &self,
remote_endpoint: IpEndpoint, remote_endpoint: IpEndpoint,
@ -270,8 +287,7 @@ impl<E> BoundTcpSocket<E> {
socket.connect(iface.context(), remote_endpoint, self.0.port)?; socket.connect(iface.context(), remote_endpoint, self.0.port)?;
socket.has_connected = false; socket.has_connected = false;
self.0 self.0.update_next_poll_at_ms(PollAt::Now);
.update_next_poll_at_ms(socket.poll_at(iface.context()));
Ok(()) Ok(())
} }
@ -290,6 +306,8 @@ impl<E> BoundTcpSocket<E> {
} }
/// Listens at a specified endpoint. /// Listens at a specified endpoint.
///
/// Polling the iface is _not_ required after this method succeeds.
pub fn listen( pub fn listen(
&self, &self,
local_endpoint: IpEndpoint, local_endpoint: IpEndpoint,
@ -299,30 +317,49 @@ impl<E> BoundTcpSocket<E> {
socket.listen(local_endpoint) socket.listen(local_endpoint)
} }
pub fn send<F, R>(&self, f: F) -> Result<R, smoltcp::socket::tcp::SendError> /// Sends some data.
///
/// Polling the iface _may_ be required after this method succeeds.
pub fn send<F, R>(&self, f: F) -> Result<(R, NeedIfacePoll), smoltcp::socket::tcp::SendError>
where where
F: FnOnce(&mut [u8]) -> (usize, R), F: FnOnce(&mut [u8]) -> (usize, R),
{ {
let common = self.iface().common();
let mut iface = common.interface();
let mut socket = self.0.socket.lock(); let mut socket = self.0.socket.lock();
let result = socket.send(f); let result = socket.send(f)?;
self.0.update_next_poll_at_ms(PollAt::Now); let need_poll = self
.0
.update_next_poll_at_ms(socket.poll_at(iface.context()));
result Ok((result, need_poll))
} }
pub fn recv<F, R>(&self, f: F) -> Result<R, smoltcp::socket::tcp::RecvError> /// Receives some data.
///
/// Polling the iface _may_ be required after this method succeeds.
pub fn recv<F, R>(&self, f: F) -> Result<(R, NeedIfacePoll), smoltcp::socket::tcp::RecvError>
where where
F: FnOnce(&mut [u8]) -> (usize, R), F: FnOnce(&mut [u8]) -> (usize, R),
{ {
let common = self.iface().common();
let mut iface = common.interface();
let mut socket = self.0.socket.lock(); let mut socket = self.0.socket.lock();
let result = socket.recv(f); let result = socket.recv(f)?;
self.0.update_next_poll_at_ms(PollAt::Now); let need_poll = self
.0
.update_next_poll_at_ms(socket.poll_at(iface.context()));
result Ok((result, need_poll))
} }
/// Closes the connection.
///
/// Polling the iface is _always_ required after this method succeeds.
pub fn close(&self) { pub fn close(&self) {
let mut socket = self.0.socket.lock(); let mut socket = self.0.socket.lock();
@ -345,12 +382,17 @@ impl<E> BoundTcpSocket<E> {
impl<E> BoundUdpSocket<E> { impl<E> BoundUdpSocket<E> {
/// Binds to a specified endpoint. /// Binds to a specified endpoint.
///
/// Polling the iface is _not_ required after this method succeeds.
pub fn bind(&self, local_endpoint: IpEndpoint) -> Result<(), smoltcp::socket::udp::BindError> { pub fn bind(&self, local_endpoint: IpEndpoint) -> Result<(), smoltcp::socket::udp::BindError> {
let mut socket = self.0.socket.lock(); let mut socket = self.0.socket.lock();
socket.bind(local_endpoint) socket.bind(local_endpoint)
} }
/// Sends some data.
///
/// Polling the iface is _always_ required after this method succeeds.
pub fn send<F, R>( pub fn send<F, R>(
&self, &self,
size: usize, size: usize,
@ -381,6 +423,9 @@ impl<E> BoundUdpSocket<E> {
Ok(result) Ok(result)
} }
/// Receives some data.
///
/// Polling the iface is _not_ required after this method succeeds.
pub fn recv<F, R>(&self, f: F) -> Result<R, smoltcp::socket::udp::RecvError> pub fn recv<F, R>(&self, f: F) -> Result<R, smoltcp::socket::udp::RecvError>
where where
F: FnOnce(&[u8], UdpMetadata) -> R, F: FnOnce(&[u8], UdpMetadata) -> R,
@ -389,7 +434,6 @@ impl<E> BoundUdpSocket<E> {
let (data, meta) = socket.recv()?; let (data, meta) = socket.recv()?;
let result = f(data, meta); let result = f(data, meta);
self.0.update_next_poll_at_ms(PollAt::Now);
Ok(result) Ok(result)
} }
@ -448,13 +492,25 @@ impl<T, E> BoundSocketInner<T, E> {
/// The update is typically needed after new network or user events have been handled, so this /// The update is typically needed after new network or user events have been handled, so this
/// method also marks that there may be new events, so that the event observer provided by /// method also marks that there may be new events, so that the event observer provided by
/// [`BoundSocket::set_observer`] can be notified later. /// [`BoundSocket::set_observer`] can be notified later.
fn update_next_poll_at_ms(&self, poll_at: PollAt) { fn update_next_poll_at_ms(&self, poll_at: PollAt) -> NeedIfacePoll {
match poll_at { match poll_at {
PollAt::Now => self.next_poll_at_ms.store(0, Ordering::Relaxed), PollAt::Now => {
PollAt::Time(instant) => self self.next_poll_at_ms.store(0, Ordering::Relaxed);
.next_poll_at_ms NeedIfacePoll(true)
.store(instant.total_millis() as u64, Ordering::Relaxed), }
PollAt::Ingress => self.next_poll_at_ms.store(u64::MAX, Ordering::Relaxed), PollAt::Time(instant) => {
let old_total_millis = self.next_poll_at_ms.load(Ordering::Relaxed);
let new_total_millis = instant.total_millis() as u64;
self.next_poll_at_ms
.store(new_total_millis, Ordering::Relaxed);
NeedIfacePoll(new_total_millis < old_total_millis)
}
PollAt::Ingress => {
self.next_poll_at_ms.store(u64::MAX, Ordering::Relaxed);
NeedIfacePoll(false)
}
} }
} }
} }

View File

@ -5,7 +5,7 @@ mod event;
mod state; mod state;
mod unbound; mod unbound;
pub use bound::{BoundTcpSocket, BoundUdpSocket, ConnectState}; pub use bound::{BoundTcpSocket, BoundUdpSocket, ConnectState, NeedIfacePoll};
pub(crate) use bound::{BoundTcpSocketInner, BoundUdpSocketInner, TcpProcessResult}; pub(crate) use bound::{BoundTcpSocketInner, BoundUdpSocketInner, TcpProcessResult};
pub use event::{SocketEventObserver, SocketEvents}; pub use event::{SocketEventObserver, SocketEvents};
pub use state::TcpStateCheck; pub use state::TcpStateCheck;

View File

@ -6,7 +6,7 @@ use aster_bigtcp::device::WithDevice;
use ostd::sync::LocalIrqDisabled; use ostd::sync::LocalIrqDisabled;
use spin::Once; use spin::Once;
use super::{poll_ifaces, Iface}; use super::{poll::poll_ifaces, Iface};
use crate::{ use crate::{
net::iface::ext::{IfaceEx, IfaceExt}, net::iface::ext::{IfaceEx, IfaceExt},
prelude::*, prelude::*,

View File

@ -4,8 +4,9 @@ mod ext;
mod init; mod init;
mod poll; mod poll;
pub use ext::IfaceEx;
pub use init::{init, IFACES}; pub use init::{init, IFACES};
pub use poll::{lazy_init, poll_ifaces}; pub use poll::lazy_init;
pub type Iface = dyn aster_bigtcp::iface::Iface<ext::IfaceExt>; pub type Iface = dyn aster_bigtcp::iface::Iface<ext::IfaceExt>;
pub type BoundTcpSocket = aster_bigtcp::socket::BoundTcpSocket<ext::IfaceExt>; pub type BoundTcpSocket = aster_bigtcp::socket::BoundTcpSocket<ext::IfaceExt>;

View File

@ -19,7 +19,7 @@ pub fn lazy_init() {
} }
} }
pub fn poll_ifaces() { pub(super) fn poll_ifaces() {
let ifaces = IFACES.get().unwrap(); let ifaces = IFACES.get().unwrap();
for iface in ifaces.iter() { for iface in ifaces.iter() {

View File

@ -7,7 +7,10 @@ use aster_bigtcp::{
use crate::{ use crate::{
events::IoEvents, events::IoEvents,
net::{iface::BoundUdpSocket, socket::util::send_recv_flags::SendRecvFlags}, net::{
iface::{BoundUdpSocket, Iface},
socket::util::send_recv_flags::SendRecvFlags,
},
prelude::*, prelude::*,
util::{MultiRead, MultiWrite}, util::{MultiRead, MultiWrite},
}; };
@ -37,6 +40,10 @@ impl BoundDatagram {
self.remote_endpoint = Some(*endpoint) self.remote_endpoint = Some(*endpoint)
} }
pub fn iface(&self) -> &Arc<Iface> {
self.bound_socket.iface()
}
pub fn try_recv( pub fn try_recv(
&self, &self,
writer: &mut dyn MultiWrite, writer: &mut dyn MultiWrite,

View File

@ -18,7 +18,7 @@ use crate::{
}, },
match_sock_option_mut, match_sock_option_mut,
net::{ net::{
iface::poll_ifaces, iface::IfaceEx,
socket::{ socket::{
options::{Error as SocketError, SocketOption}, options::{Error as SocketError, SocketOption},
util::{ util::{
@ -157,14 +157,9 @@ impl DatagramSocket {
return_errno_with_message!(Errno::EAGAIN, "the socket is not bound"); return_errno_with_message!(Errno::EAGAIN, "the socket is not bound");
}; };
let received = bound_datagram bound_datagram
.try_recv(writer, flags) .try_recv(writer, flags)
.map(|(recv_bytes, remote_endpoint)| (recv_bytes, remote_endpoint.into())); .map(|(recv_bytes, remote_endpoint)| (recv_bytes, remote_endpoint.into()))
drop(inner);
poll_ifaces();
received
} }
fn recv( fn recv(
@ -191,12 +186,13 @@ impl DatagramSocket {
return_errno_with_message!(Errno::EAGAIN, "the socket is not bound") return_errno_with_message!(Errno::EAGAIN, "the socket is not bound")
}; };
let sent_bytes = bound_datagram.try_send(reader, remote, flags); let sent_bytes = bound_datagram.try_send(reader, remote, flags)?;
let iface_to_poll = bound_datagram.iface().clone();
drop(inner); drop(inner);
poll_ifaces(); iface_to_poll.poll();
sent_bytes Ok(sent_bytes)
} }
fn check_io_events(&self) -> IoEvents { fn check_io_events(&self) -> IoEvents {

View File

@ -5,14 +5,14 @@ use core::sync::atomic::{AtomicBool, Ordering};
use aster_bigtcp::{ use aster_bigtcp::{
errors::tcp::{RecvError, SendError}, errors::tcp::{RecvError, SendError},
socket::{SocketEventObserver, TcpStateCheck}, socket::{NeedIfacePoll, SocketEventObserver, TcpStateCheck},
wire::IpEndpoint, wire::IpEndpoint,
}; };
use crate::{ use crate::{
events::IoEvents, events::IoEvents,
net::{ net::{
iface::BoundTcpSocket, iface::{BoundTcpSocket, Iface},
socket::util::{send_recv_flags::SendRecvFlags, shutdown_cmd::SockShutdownCmd}, socket::util::{send_recv_flags::SendRecvFlags, shutdown_cmd::SockShutdownCmd},
}, },
prelude::*, prelude::*,
@ -79,7 +79,11 @@ impl ConnectedStream {
Ok(()) Ok(())
} }
pub fn try_recv(&self, writer: &mut dyn MultiWrite, _flags: SendRecvFlags) -> Result<usize> { pub fn try_recv(
&self,
writer: &mut dyn MultiWrite,
_flags: SendRecvFlags,
) -> Result<(usize, NeedIfacePoll)> {
let result = self.bound_socket.recv(|socket_buffer| { let result = self.bound_socket.recv(|socket_buffer| {
match writer.write(&mut VmReader::from(&*socket_buffer)) { match writer.write(&mut VmReader::from(&*socket_buffer)) {
Ok(len) => (len, Ok(len)), Ok(len) => (len, Ok(len)),
@ -88,18 +92,30 @@ impl ConnectedStream {
}); });
match result { match result {
Ok(Ok(0)) if self.is_receiving_closed.load(Ordering::Relaxed) => Ok(0), Ok((Ok(0), need_poll)) if self.is_receiving_closed.load(Ordering::Relaxed) => {
Ok(Ok(0)) => return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty"), Ok((0, need_poll))
Ok(Ok(recv_bytes)) => Ok(recv_bytes), }
Ok(Err(e)) => Err(e), Ok((Ok(0), need_poll)) => {
Err(RecvError::Finished) => Ok(0), debug_assert!(!*need_poll);
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty")
}
Ok((Ok(recv_bytes), need_poll)) => Ok((recv_bytes, need_poll)),
Ok((Err(e), need_poll)) => {
debug_assert!(!*need_poll);
Err(e)
}
Err(RecvError::Finished) => Ok((0, NeedIfacePoll::FALSE)),
Err(RecvError::InvalidState) => { Err(RecvError::InvalidState) => {
return_errno_with_message!(Errno::ECONNRESET, "the connection is reset") return_errno_with_message!(Errno::ECONNRESET, "the connection is reset")
} }
} }
} }
pub fn try_send(&self, reader: &mut dyn MultiRead, _flags: SendRecvFlags) -> Result<usize> { pub fn try_send(
&self,
reader: &mut dyn MultiRead,
_flags: SendRecvFlags,
) -> Result<(usize, NeedIfacePoll)> {
let result = self.bound_socket.send(|socket_buffer| { let result = self.bound_socket.send(|socket_buffer| {
match reader.read(&mut VmWriter::from(socket_buffer)) { match reader.read(&mut VmWriter::from(socket_buffer)) {
Ok(len) => (len, Ok(len)), Ok(len) => (len, Ok(len)),
@ -108,9 +124,15 @@ impl ConnectedStream {
}); });
match result { match result {
Ok(Ok(0)) => return_errno_with_message!(Errno::EAGAIN, "the send buffer is full"), Ok((Ok(0), need_poll)) => {
Ok(Ok(sent_bytes)) => Ok(sent_bytes), debug_assert!(!*need_poll);
Ok(Err(e)) => Err(e), return_errno_with_message!(Errno::EAGAIN, "the send buffer is full")
}
Ok((Ok(sent_bytes), need_poll)) => Ok((sent_bytes, need_poll)),
Ok((Err(e), need_poll)) => {
debug_assert!(!*need_poll);
Err(e)
}
Err(SendError::InvalidState) => { Err(SendError::InvalidState) => {
// FIXME: `EPIPE` is another possibility, which means that the socket is shut down // FIXME: `EPIPE` is another possibility, which means that the socket is shut down
// for writing. In that case, we should also trigger a `SIGPIPE` if `MSG_NOSIGNAL` // for writing. In that case, we should also trigger a `SIGPIPE` if `MSG_NOSIGNAL`
@ -128,6 +150,10 @@ impl ConnectedStream {
self.remote_endpoint self.remote_endpoint
} }
pub fn iface(&self) -> &Arc<Iface> {
self.bound_socket.iface()
}
pub fn check_new(&mut self) -> Result<()> { pub fn check_new(&mut self) -> Result<()> {
if !self.is_new_connection { if !self.is_new_connection {
return_errno_with_message!(Errno::EISCONN, "the socket is already connected"); return_errno_with_message!(Errno::EISCONN, "the socket is already connected");

View File

@ -3,7 +3,11 @@
use aster_bigtcp::{socket::ConnectState, wire::IpEndpoint}; use aster_bigtcp::{socket::ConnectState, wire::IpEndpoint};
use super::{connected::ConnectedStream, init::InitStream}; use super::{connected::ConnectedStream, init::InitStream};
use crate::{events::IoEvents, net::iface::BoundTcpSocket, prelude::*}; use crate::{
events::IoEvents,
net::iface::{BoundTcpSocket, Iface},
prelude::*,
};
pub struct ConnectingStream { pub struct ConnectingStream {
bound_socket: BoundTcpSocket, bound_socket: BoundTcpSocket,
@ -72,6 +76,10 @@ impl ConnectingStream {
self.remote_endpoint self.remote_endpoint
} }
pub fn iface(&self) -> &Arc<Iface> {
self.bound_socket.iface()
}
pub(super) fn check_io_events(&self) -> IoEvents { pub(super) fn check_io_events(&self) -> IoEvents {
IoEvents::empty() IoEvents::empty()
} }

View File

@ -5,7 +5,11 @@ use aster_bigtcp::{
}; };
use super::connected::ConnectedStream; use super::connected::ConnectedStream;
use crate::{events::IoEvents, net::iface::BoundTcpSocket, prelude::*}; use crate::{
events::IoEvents,
net::iface::{BoundTcpSocket, Iface},
prelude::*,
};
pub struct ListenStream { pub struct ListenStream {
backlog: usize, backlog: usize,
@ -80,6 +84,10 @@ impl ListenStream {
self.bound_socket.local_endpoint().unwrap() self.bound_socket.local_endpoint().unwrap()
} }
pub fn iface(&self) -> &Arc<Iface> {
self.bound_socket.iface()
}
pub(super) fn check_io_events(&self) -> IoEvents { pub(super) fn check_io_events(&self) -> IoEvents {
let backlog_sockets = self.backlog_sockets.read(); let backlog_sockets = self.backlog_sockets.read();

View File

@ -24,7 +24,7 @@ use crate::{
}, },
match_sock_option_mut, match_sock_option_ref, match_sock_option_mut, match_sock_option_ref,
net::{ net::{
iface::poll_ifaces, iface::IfaceEx,
socket::{ socket::{
options::{Error as SocketError, SocketOption}, options::{Error as SocketError, SocketOption},
util::{ util::{
@ -190,32 +190,36 @@ impl StreamSocket {
let is_nonblocking = self.is_nonblocking(); let is_nonblocking = self.is_nonblocking();
let mut state = self.write_updated_state(); let mut state = self.write_updated_state();
let result_or_block = state.borrow_result(|mut owned_state| { let (result_or_block, iface_to_poll) = state.borrow_result(|mut owned_state| {
let init_stream = match owned_state { let init_stream = match owned_state {
State::Init(init_stream) => init_stream, State::Init(init_stream) => init_stream,
State::Connecting(_) if is_nonblocking => { State::Connecting(_) if is_nonblocking => {
return ( return (
owned_state, owned_state,
Some(Err(Error::with_message( (
Errno::EALREADY, Some(Err(Error::with_message(
"the socket is connecting", Errno::EALREADY,
))), "the socket is connecting",
))),
None,
),
); );
} }
State::Connecting(_) => { State::Connecting(_) => return (owned_state, (None, None)),
return (owned_state, None);
}
State::Connected(ref mut connected_stream) => { State::Connected(ref mut connected_stream) => {
let err = connected_stream.check_new(); let err = connected_stream.check_new();
return (owned_state, Some(err)); return (owned_state, (Some(err), None));
} }
State::Listen(_) => { State::Listen(_) => {
return ( return (
owned_state, owned_state,
Some(Err(Error::with_message( (
Errno::EISCONN, Some(Err(Error::with_message(
"the socket is listening", Errno::EISCONN,
))), "the socket is listening",
))),
None,
),
); );
} }
}; };
@ -223,25 +227,30 @@ impl StreamSocket {
let connecting_stream = match init_stream.connect(remote_endpoint) { let connecting_stream = match init_stream.connect(remote_endpoint) {
Ok(connecting_stream) => connecting_stream, Ok(connecting_stream) => connecting_stream,
Err((err, init_stream)) => { Err((err, init_stream)) => {
return (State::Init(init_stream), Some(Err(err))); return (State::Init(init_stream), (Some(Err(err)), None));
} }
}; };
let result_or_block = if is_nonblocking {
Some(Err(Error::with_message(
Errno::EINPROGRESS,
"the socket is connecting",
)))
} else {
None
};
let iface_to_poll = connecting_stream.iface().clone();
( (
State::Connecting(connecting_stream), State::Connecting(connecting_stream),
if is_nonblocking { (result_or_block, Some(iface_to_poll)),
Some(Err(Error::with_message(
Errno::EINPROGRESS,
"the socket is connecting",
)))
} else {
None
},
) )
}); });
drop(state); drop(state);
poll_ifaces(); if let Some(iface) = iface_to_poll {
iface.poll();
}
result_or_block result_or_block
} }
@ -274,9 +283,10 @@ impl StreamSocket {
let accepted_socket = Self::new_connected(connected_stream); let accepted_socket = Self::new_connected(connected_stream);
(accepted_socket as _, remote_endpoint.into()) (accepted_socket as _, remote_endpoint.into())
}); });
let iface_to_poll = listen_stream.iface().clone();
drop(state); drop(state);
poll_ifaces(); iface_to_poll.poll();
accepted accepted
} }
@ -298,15 +308,16 @@ impl StreamSocket {
} }
}; };
let received = connected_stream.try_recv(writer, flags).map(|recv_bytes| { let (recv_bytes, need_poll) = connected_stream.try_recv(writer, flags)?;
let remote_endpoint = connected_stream.remote_endpoint(); let iface_to_poll = need_poll.then(|| connected_stream.iface().clone());
(recv_bytes, remote_endpoint.into()) let remote_endpoint = connected_stream.remote_endpoint();
});
drop(state); drop(state);
poll_ifaces(); if let Some(iface) = iface_to_poll {
iface.poll();
}
received Ok((recv_bytes, remote_endpoint.into()))
} }
fn recv( fn recv(
@ -337,12 +348,15 @@ impl StreamSocket {
} }
}; };
let sent_bytes = connected_stream.try_send(reader, flags); let (sent_bytes, need_poll) = connected_stream.try_send(reader, flags)?;
let iface_to_poll = need_poll.then(|| connected_stream.iface().clone());
drop(state); drop(state);
poll_ifaces(); if let Some(iface) = iface_to_poll {
iface.poll();
}
sent_bytes Ok(sent_bytes)
} }
fn send(&self, reader: &mut dyn MultiRead, flags: SendRecvFlags) -> Result<usize> { fn send(&self, reader: &mut dyn MultiRead, flags: SendRecvFlags) -> Result<usize> {
@ -498,16 +512,20 @@ impl Socket for StreamSocket {
fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
let state = self.read_updated_state(); let state = self.read_updated_state();
let res = match state.as_ref() {
State::Connected(connected_stream) => connected_stream.shutdown(cmd, &self.pollee), let (result, iface_to_poll) = match state.as_ref() {
State::Connected(connected_stream) => (
connected_stream.shutdown(cmd, &self.pollee),
connected_stream.iface().clone(),
),
// TODO: shutdown listening stream // TODO: shutdown listening stream
_ => return_errno_with_message!(Errno::EINVAL, "cannot shutdown"), _ => return_errno_with_message!(Errno::EINVAL, "cannot shutdown"),
}; };
drop(state); drop(state);
poll_ifaces(); iface_to_poll.poll();
res result
} }
fn addr(&self) -> Result<SocketAddr> { fn addr(&self) -> Result<SocketAddr> {
@ -692,8 +710,18 @@ impl SocketEventObserver for StreamSocket {
impl Drop for StreamSocket { impl Drop for StreamSocket {
fn drop(&mut self) { fn drop(&mut self) {
self.state.write().take(); let state = self.state.write().take();
poll_ifaces(); let iface_to_poll = match state {
State::Init(_) => None,
State::Connecting(ref connecting_stream) => Some(connecting_stream.iface().clone()),
State::Connected(ref connected_stream) => Some(connected_stream.iface().clone()),
State::Listen(ref listen_stream) => Some(listen_stream.iface().clone()),
};
drop(state);
if let Some(iface) = iface_to_poll {
iface.poll();
}
} }
} }