mirror of
https://github.com/asterinas/asterinas.git
synced 2025-06-24 01:43:22 +00:00
Implement blocking connect
properly
This commit is contained in:
committed by
Tate, Hongliang Tian
parent
6f915133b5
commit
320092eda2
@ -15,7 +15,7 @@ use crate::{
|
|||||||
SockShutdownCmd, SocketAddr,
|
SockShutdownCmd, SocketAddr,
|
||||||
},
|
},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
process::signal::{Pollee, Poller},
|
process::signal::{Pauser, Pollee, Poller},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(super) struct Listener {
|
pub(super) struct Listener {
|
||||||
@ -105,6 +105,8 @@ impl Listener {
|
|||||||
|
|
||||||
impl Drop for Listener {
|
impl Drop for Listener {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
self.backlog.shutdown();
|
||||||
|
|
||||||
unregister_backlog(&self.backlog.addr().to_key())
|
unregister_backlog(&self.backlog.addr().to_key())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -147,37 +149,17 @@ impl BacklogTable {
|
|||||||
self.backlog_sockets.read().get(addr).cloned()
|
self.backlog_sockets.read().get(addr).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn push_incoming(
|
|
||||||
&self,
|
|
||||||
server_key: &UnixSocketAddrKey,
|
|
||||||
init: Init,
|
|
||||||
) -> core::result::Result<Connected, (Error, Init)> {
|
|
||||||
let backlog = match self.get_backlog(server_key) {
|
|
||||||
Some(backlog) => backlog,
|
|
||||||
None => {
|
|
||||||
return Err((
|
|
||||||
Error::with_message(
|
|
||||||
Errno::ECONNREFUSED,
|
|
||||||
"no socket is listening at the remote address",
|
|
||||||
),
|
|
||||||
init,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
backlog.push_incoming(init)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_backlog(&self, addr_key: &UnixSocketAddrKey) {
|
fn remove_backlog(&self, addr_key: &UnixSocketAddrKey) {
|
||||||
self.backlog_sockets.write().remove(addr_key);
|
self.backlog_sockets.write().remove(addr_key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Backlog {
|
pub(super) struct Backlog {
|
||||||
addr: UnixSocketAddrBound,
|
addr: UnixSocketAddrBound,
|
||||||
pollee: Pollee,
|
pollee: Pollee,
|
||||||
backlog: AtomicUsize,
|
backlog: AtomicUsize,
|
||||||
incoming_conns: Mutex<Option<VecDeque<Connected>>>,
|
incoming_conns: Mutex<Option<VecDeque<Connected>>>,
|
||||||
|
pauser: Arc<Pauser>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Backlog {
|
impl Backlog {
|
||||||
@ -193,6 +175,7 @@ impl Backlog {
|
|||||||
pollee,
|
pollee,
|
||||||
backlog: AtomicUsize::new(backlog),
|
backlog: AtomicUsize::new(backlog),
|
||||||
incoming_conns: Mutex::new(incoming_sockets),
|
incoming_conns: Mutex::new(incoming_sockets),
|
||||||
|
pauser: Pauser::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,7 +183,73 @@ impl Backlog {
|
|||||||
&self.addr
|
&self.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
fn push_incoming(&self, init: Init) -> core::result::Result<Connected, (Error, Init)> {
|
fn pop_incoming(&self) -> Result<Connected> {
|
||||||
|
let mut locked_incoming_conns = self.incoming_conns.lock();
|
||||||
|
|
||||||
|
let Some(incoming_conns) = &mut *locked_incoming_conns else {
|
||||||
|
return_errno_with_message!(Errno::EINVAL, "the socket is shut down for reading");
|
||||||
|
};
|
||||||
|
|
||||||
|
let conn = incoming_conns.pop_front();
|
||||||
|
if incoming_conns.is_empty() {
|
||||||
|
self.pollee.del_events(IoEvents::IN);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(locked_incoming_conns);
|
||||||
|
|
||||||
|
if conn.is_some() {
|
||||||
|
self.pauser.resume_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_backlog(&self, backlog: usize) {
|
||||||
|
let old_backlog = self.backlog.swap(backlog, Ordering::Relaxed);
|
||||||
|
|
||||||
|
if old_backlog < backlog {
|
||||||
|
self.pauser.resume_all();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn shutdown(&self) {
|
||||||
|
let mut incoming_conns = self.incoming_conns.lock();
|
||||||
|
|
||||||
|
*incoming_conns = None;
|
||||||
|
self.pollee.add_events(IoEvents::HUP);
|
||||||
|
self.pollee.del_events(IoEvents::IN);
|
||||||
|
|
||||||
|
drop(incoming_conns);
|
||||||
|
|
||||||
|
self.pauser.resume_all();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
||||||
|
self.pollee.poll(mask, poller)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_observer(
|
||||||
|
&self,
|
||||||
|
observer: Weak<dyn Observer<IoEvents>>,
|
||||||
|
mask: IoEvents,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.pollee.register_observer(observer, mask);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unregister_observer(
|
||||||
|
&self,
|
||||||
|
observer: &Weak<dyn Observer<IoEvents>>,
|
||||||
|
) -> Option<Weak<dyn Observer<IoEvents>>> {
|
||||||
|
self.pollee.unregister_observer(observer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Backlog {
|
||||||
|
pub(super) fn push_incoming(
|
||||||
|
&self,
|
||||||
|
init: Init,
|
||||||
|
) -> core::result::Result<Connected, (Error, Init)> {
|
||||||
let mut locked_incoming_conns = self.incoming_conns.lock();
|
let mut locked_incoming_conns = self.incoming_conns.lock();
|
||||||
|
|
||||||
let Some(incoming_conns) = &mut *locked_incoming_conns else {
|
let Some(incoming_conns) = &mut *locked_incoming_conns else {
|
||||||
@ -231,51 +280,14 @@ impl Backlog {
|
|||||||
Ok(client_conn)
|
Ok(client_conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pop_incoming(&self) -> Result<Connected> {
|
pub(super) fn pause_until<F>(&self, mut cond: F) -> Result<()>
|
||||||
let mut locked_incoming_conns = self.incoming_conns.lock();
|
where
|
||||||
|
F: FnMut() -> Result<()>,
|
||||||
let Some(incoming_conns) = &mut *locked_incoming_conns else {
|
{
|
||||||
return_errno_with_message!(Errno::EINVAL, "the socket is shut down for reading");
|
self.pauser.pause_until(|| match cond() {
|
||||||
};
|
Err(err) if err.error() == Errno::EAGAIN => None,
|
||||||
|
result => Some(result),
|
||||||
let conn = incoming_conns.pop_front();
|
})?
|
||||||
if incoming_conns.is_empty() {
|
|
||||||
self.pollee.del_events(IoEvents::IN);
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available"))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_backlog(&self, backlog: usize) {
|
|
||||||
self.backlog.store(backlog, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn shutdown(&self) {
|
|
||||||
let mut incoming_conns = self.incoming_conns.lock();
|
|
||||||
|
|
||||||
*incoming_conns = None;
|
|
||||||
self.pollee.add_events(IoEvents::HUP);
|
|
||||||
self.pollee.del_events(IoEvents::IN);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
|
||||||
self.pollee.poll(mask, poller)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn register_observer(
|
|
||||||
&self,
|
|
||||||
observer: Weak<dyn Observer<IoEvents>>,
|
|
||||||
mask: IoEvents,
|
|
||||||
) -> Result<()> {
|
|
||||||
self.pollee.register_observer(observer, mask);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn unregister_observer(
|
|
||||||
&self,
|
|
||||||
observer: &Weak<dyn Observer<IoEvents>>,
|
|
||||||
) -> Option<Weak<dyn Observer<IoEvents>>> {
|
|
||||||
self.pollee.unregister_observer(observer)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,9 +295,11 @@ fn unregister_backlog(addr: &UnixSocketAddrKey) {
|
|||||||
BACKLOG_TABLE.remove_backlog(addr);
|
BACKLOG_TABLE.remove_backlog(addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn push_incoming(
|
pub(super) fn get_backlog(server_key: &UnixSocketAddrKey) -> Result<Arc<Backlog>> {
|
||||||
server_key: &UnixSocketAddrKey,
|
BACKLOG_TABLE.get_backlog(server_key).ok_or_else(|| {
|
||||||
init: Init,
|
Error::with_message(
|
||||||
) -> core::result::Result<Connected, (Error, Init)> {
|
Errno::ECONNREFUSED,
|
||||||
BACKLOG_TABLE.push_incoming(server_key, init)
|
"no socket is listening at the remote address",
|
||||||
|
)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
@ -8,13 +8,13 @@ use takeable::Takeable;
|
|||||||
use super::{
|
use super::{
|
||||||
connected::Connected,
|
connected::Connected,
|
||||||
init::Init,
|
init::Init,
|
||||||
listener::{push_incoming, Listener},
|
listener::{get_backlog, Backlog, Listener},
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
events::{IoEvents, Observer},
|
events::{IoEvents, Observer},
|
||||||
fs::{file_handle::FileLike, utils::StatusFlags},
|
fs::{file_handle::FileLike, utils::StatusFlags},
|
||||||
net::socket::{
|
net::socket::{
|
||||||
unix::{addr::UnixSocketAddrKey, UnixSocketAddr},
|
unix::UnixSocketAddr,
|
||||||
util::{
|
util::{
|
||||||
copy_message_from_user, copy_message_to_user, create_message_buffer,
|
copy_message_from_user, copy_message_to_user, create_message_buffer,
|
||||||
send_recv_flags::SendRecvFlags, socket_addr::SocketAddr, MessageHeader,
|
send_recv_flags::SendRecvFlags, socket_addr::SocketAddr, MessageHeader,
|
||||||
@ -23,7 +23,6 @@ use crate::{
|
|||||||
},
|
},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
process::signal::{Pollable, Poller},
|
process::signal::{Pollable, Poller},
|
||||||
thread::Thread,
|
|
||||||
util::IoVec,
|
util::IoVec,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -101,7 +100,7 @@ impl UnixStreamSocket {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_connect(&self, remote_addr: &UnixSocketAddrKey) -> Result<()> {
|
fn try_connect(&self, backlog: &Arc<Backlog>) -> Result<()> {
|
||||||
let mut state = self.state.write();
|
let mut state = self.state.write();
|
||||||
|
|
||||||
state.borrow_result(|owned_state| {
|
state.borrow_result(|owned_state| {
|
||||||
@ -127,7 +126,7 @@ impl UnixStreamSocket {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let connected = match push_incoming(remote_addr, init) {
|
let connected = match backlog.push_incoming(init) {
|
||||||
Ok(connected) => connected,
|
Ok(connected) => connected,
|
||||||
Err((err, init)) => return (State::Init(init), Err(err)),
|
Err((err, init)) => return (State::Init(init), Err(err)),
|
||||||
};
|
};
|
||||||
@ -239,29 +238,23 @@ impl Socket for UnixStreamSocket {
|
|||||||
|
|
||||||
fn connect(&self, socket_addr: SocketAddr) -> Result<()> {
|
fn connect(&self, socket_addr: SocketAddr) -> Result<()> {
|
||||||
let remote_addr = UnixSocketAddr::try_from(socket_addr)?.connect()?;
|
let remote_addr = UnixSocketAddr::try_from(socket_addr)?.connect()?;
|
||||||
|
let backlog = get_backlog(&remote_addr)?;
|
||||||
|
|
||||||
// Note that the Linux kernel implementation locks the remote socket and checks to see if
|
if self.is_nonblocking() {
|
||||||
// it is listening first. This is different from our implementation, which locks the local
|
self.try_connect(&backlog)
|
||||||
// socket and checks the state of the local socket first.
|
} else {
|
||||||
//
|
backlog.pause_until(|| self.try_connect(&backlog))
|
||||||
// The difference may result in different error codes, but it's doubtful that this will
|
|
||||||
// ever lead to real problems.
|
|
||||||
//
|
|
||||||
// See also <https://elixir.bootlin.com/linux/v6.10.4/source/net/unix/af_unix.c#L1527>.
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let res = self.try_connect(&remote_addr);
|
|
||||||
|
|
||||||
if !res.is_err_and(|err| err.error() == Errno::EAGAIN) {
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: Add `Pauser` in `Backlog` and use it to avoid this `Thread::yield_now`.
|
|
||||||
Thread::yield_now();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn listen(&self, backlog: usize) -> Result<()> {
|
fn listen(&self, backlog: usize) -> Result<()> {
|
||||||
|
const SOMAXCONN: usize = 4096;
|
||||||
|
|
||||||
|
// Linux allows a maximum of `backlog + 1` sockets in the backlog queue. Although this
|
||||||
|
// seems to be mostly an implementation detail, we follow the exact Linux behavior to
|
||||||
|
// ensure that our regression tests pass with the Linux kernel.
|
||||||
|
let backlog = backlog.saturating_add(1).min(SOMAXCONN);
|
||||||
|
|
||||||
let mut state = self.state.write();
|
let mut state = self.state.write();
|
||||||
|
|
||||||
state.borrow_result(|owned_state| {
|
state.borrow_result(|owned_state| {
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
#include <sys/poll.h>
|
#include <sys/poll.h>
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
#include <sys/wait.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
@ -339,6 +340,67 @@ FN_TEST(recv)
|
|||||||
}
|
}
|
||||||
END_TEST()
|
END_TEST()
|
||||||
|
|
||||||
|
FN_TEST(blocking_connect)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
int sk, sks[4];
|
||||||
|
int pid;
|
||||||
|
|
||||||
|
// Setup
|
||||||
|
|
||||||
|
sk = TEST_SUCC(socket(PF_UNIX, SOCK_STREAM, 0));
|
||||||
|
TEST_SUCC(
|
||||||
|
bind(sk, (struct sockaddr *)&UNIX_ADDR("\0"), PATH_OFFSET + 1));
|
||||||
|
TEST_SUCC(listen(sk, 2));
|
||||||
|
|
||||||
|
for (i = 0; i < 3; ++i) {
|
||||||
|
sks[i] = TEST_SUCC(
|
||||||
|
socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0));
|
||||||
|
TEST_SUCC(connect(sks[i], (struct sockaddr *)&UNIX_ADDR("\0"),
|
||||||
|
PATH_OFFSET + 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
#define MAKE_TEST(child, parent, errno) \
|
||||||
|
sks[i] = TEST_SUCC(socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0)); \
|
||||||
|
TEST_ERRNO(connect(sks[i], (struct sockaddr *)&UNIX_ADDR("\0"), \
|
||||||
|
PATH_OFFSET + 1), \
|
||||||
|
EAGAIN); \
|
||||||
|
TEST_SUCC(close(sks[i])); \
|
||||||
|
\
|
||||||
|
pid = TEST_SUCC(fork()); \
|
||||||
|
if (pid == 0) { \
|
||||||
|
usleep(300 * 1000); \
|
||||||
|
CHECK(child); \
|
||||||
|
exit(0); \
|
||||||
|
} \
|
||||||
|
TEST_SUCC(parent); \
|
||||||
|
\
|
||||||
|
sks[i] = TEST_SUCC(socket(PF_UNIX, SOCK_STREAM, 0)); \
|
||||||
|
TEST_ERRNO(connect(sks[i], (struct sockaddr *)&UNIX_ADDR("\0"), \
|
||||||
|
PATH_OFFSET + 1), \
|
||||||
|
errno); \
|
||||||
|
\
|
||||||
|
TEST_SUCC(close(sks[i])); \
|
||||||
|
TEST_SUCC(wait(NULL));
|
||||||
|
|
||||||
|
// Test 1: Accepting a connection resumes the blocked connection request
|
||||||
|
MAKE_TEST(accept(sk, NULL, NULL), 0, 0);
|
||||||
|
|
||||||
|
// Test 2: Resetting the backlog resumes the blocked connection request
|
||||||
|
MAKE_TEST(listen(sk, 3), 0, 0);
|
||||||
|
|
||||||
|
// Test 3: Closing the listener resumes the blocked connection request
|
||||||
|
MAKE_TEST(close(sk), close(sk), ECONNREFUSED);
|
||||||
|
|
||||||
|
#undef MAKE_TEST
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
|
||||||
|
for (i = 0; i < 3; ++i)
|
||||||
|
TEST_SUCC(close(sks[i]));
|
||||||
|
}
|
||||||
|
END_TEST()
|
||||||
|
|
||||||
FN_TEST(ns_path)
|
FN_TEST(ns_path)
|
||||||
{
|
{
|
||||||
int fd;
|
int fd;
|
||||||
|
Reference in New Issue
Block a user