Add proper IO events for unbound sockets

This commit is contained in:
Ruihan Li
2024-01-09 23:42:59 +08:00
committed by Tate, Hongliang Tian
parent 8628543067
commit 9211061181
6 changed files with 23 additions and 14 deletions

View File

@ -72,6 +72,7 @@ impl DatagramSocket {
Arc::new_cyclic(|me| { Arc::new_cyclic(|me| {
let unbound_datagram = UnboundDatagram::new(me.clone() as _); let unbound_datagram = UnboundDatagram::new(me.clone() as _);
let pollee = Pollee::new(IoEvents::empty()); let pollee = Pollee::new(IoEvents::empty());
unbound_datagram.init_pollee(&pollee);
Self { Self {
inner: RwLock::new(Takeable::new(Inner::Unbound(unbound_datagram))), inner: RwLock::new(Takeable::new(Inner::Unbound(unbound_datagram))),
nonblocking: AtomicBool::new(nonblocking), nonblocking: AtomicBool::new(nonblocking),

View File

@ -4,12 +4,13 @@ use alloc::sync::Weak;
use super::bound::BoundDatagram; use super::bound::BoundDatagram;
use crate::{ use crate::{
events::Observer, events::{IoEvents, Observer},
net::{ net::{
iface::{AnyUnboundSocket, IpEndpoint, RawUdpSocket}, iface::{AnyUnboundSocket, IpEndpoint, RawUdpSocket},
socket::ip::common::bind_socket, socket::ip::common::bind_socket,
}, },
prelude::*, prelude::*,
process::signal::Pollee,
}; };
pub struct UnboundDatagram { pub struct UnboundDatagram {
@ -36,4 +37,9 @@ impl UnboundDatagram {
Ok(BoundDatagram::new(bound_socket)) Ok(BoundDatagram::new(bound_socket))
} }
pub(super) fn init_pollee(&self, pollee: &Pollee) {
pollee.reset_events();
pollee.add_events(IoEvents::OUT);
}
} }

View File

@ -4,12 +4,13 @@ use alloc::sync::Weak;
use super::{connecting::ConnectingStream, listen::ListenStream}; use super::{connecting::ConnectingStream, listen::ListenStream};
use crate::{ use crate::{
events::Observer, events::{IoEvents, Observer},
net::{ net::{
iface::{AnyBoundSocket, AnyUnboundSocket, IpEndpoint}, iface::{AnyBoundSocket, AnyUnboundSocket, IpEndpoint},
socket::ip::common::{bind_socket, get_ephemeral_endpoint}, socket::ip::common::{bind_socket, get_ephemeral_endpoint},
}, },
prelude::*, prelude::*,
process::signal::Pollee,
}; };
pub enum InitStream { pub enum InitStream {
@ -18,9 +19,6 @@ pub enum InitStream {
} }
impl InitStream { impl InitStream {
// FIXME: In Linux we have the `POLLOUT` event for a newly created socket, while calling
// `write()` on it triggers `SIGPIPE`/`EPIPE`. No documentation found yet, but confirmed by
// experimentation and Linux source code.
pub fn new(observer: Weak<dyn Observer<()>>) -> Self { pub fn new(observer: Weak<dyn Observer<()>>) -> Self {
InitStream::Unbound(Box::new(AnyUnboundSocket::new_tcp(observer))) InitStream::Unbound(Box::new(AnyUnboundSocket::new_tcp(observer)))
} }
@ -91,4 +89,9 @@ impl InitStream {
InitStream::Bound(bound_socket) => Some(bound_socket.local_endpoint().unwrap()), InitStream::Bound(bound_socket) => Some(bound_socket.local_endpoint().unwrap()),
} }
} }
pub(super) fn init_pollee(&self, pollee: &Pollee) {
pollee.reset_events();
pollee.add_events(IoEvents::OUT);
}
} }

View File

@ -82,6 +82,7 @@ impl StreamSocket {
Arc::new_cyclic(|me| { Arc::new_cyclic(|me| {
let init_stream = InitStream::new(me.clone() as _); let init_stream = InitStream::new(me.clone() as _);
let pollee = Pollee::new(IoEvents::empty()); let pollee = Pollee::new(IoEvents::empty());
init_stream.init_pollee(&pollee);
Self { Self {
options: RwLock::new(OptionSet::new()), options: RwLock::new(OptionSet::new()),
state: RwLock::new(Takeable::new(State::Init(init_stream))), state: RwLock::new(Takeable::new(State::Init(init_stream))),
@ -154,6 +155,7 @@ impl StreamSocket {
let connected_stream = match connecting_stream.into_result() { let connected_stream = match connecting_stream.into_result() {
Ok(connected_stream) => connected_stream, Ok(connected_stream) => connected_stream,
Err((err, NonConnectedStream::Init(init_stream))) => { Err((err, NonConnectedStream::Init(init_stream))) => {
init_stream.init_pollee(&self.pollee);
return (State::Init(init_stream), Err(err)); return (State::Init(init_stream), Err(err));
} }
Err((err, NonConnectedStream::Connecting(connecting_stream))) => { Err((err, NonConnectedStream::Connecting(connecting_stream))) => {

View File

@ -234,14 +234,12 @@ FN_TEST(poll)
struct pollfd pfd = { .events = POLLIN | POLLOUT }; struct pollfd pfd = { .events = POLLIN | POLLOUT };
pfd.fd = sk_unbound; pfd.fd = sk_unbound;
// FIXME: Uncomment this TEST_RES(poll(&pfd, 1, 0),
// TEST_RES(poll(&pfd, 1, 0), (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
// (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_bound; pfd.fd = sk_bound;
// FIXME: Uncomment this TEST_RES(poll(&pfd, 1, 0),
// TEST_RES(poll(&pfd, 1, 0), (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
// (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_listen; pfd.fd = sk_listen;
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & (POLLIN | POLLOUT)) == 0); TEST_RES(poll(&pfd, 1, 0), (pfd.revents & (POLLIN | POLLOUT)) == 0);

View File

@ -177,9 +177,8 @@ FN_TEST(poll)
struct pollfd pfd = { .events = POLLIN | POLLOUT }; struct pollfd pfd = { .events = POLLIN | POLLOUT };
pfd.fd = sk_unbound; pfd.fd = sk_unbound;
// FIXME: Uncomment this TEST_RES(poll(&pfd, 1, 0),
// TEST_RES(poll(&pfd, 1, 0), (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
// (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_bound; pfd.fd = sk_bound;
TEST_RES(poll(&pfd, 1, 0), TEST_RES(poll(&pfd, 1, 0),