Implement (un)register_observer for TCP/UDP

This commit is contained in:
Ruihan Li 2024-01-08 23:35:29 +08:00 committed by Tate, Hongliang Tian
parent 29ebf8e60c
commit 8628543067
8 changed files with 82 additions and 108 deletions

View File

@ -210,6 +210,24 @@ impl FileLike for DatagramSocket {
} }
Ok(()) Ok(())
} }
fn register_observer(
&self,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
self.pollee.register_observer(observer, mask);
Ok(())
}
fn unregister_observer(
&self,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
self.pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::ENOENT, "observer is not registered"))
}
} }
impl Socket for DatagramSocket { impl Socket for DatagramSocket {

View File

@ -78,26 +78,6 @@ impl ConnectedStream {
self.update_io_events(pollee); self.update_io_events(pollee);
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
pub(super) fn update_io_events(&self, pollee: &Pollee) { pub(super) fn update_io_events(&self, pollee: &Pollee) {
self.bound_socket.raw_with(|socket: &mut RawTcpSocket| { self.bound_socket.raw_with(|socket: &mut RawTcpSocket| {
if socket.can_recv() { if socket.can_recv() {

View File

@ -2,7 +2,7 @@
use super::{connected::ConnectedStream, init::InitStream}; use super::{connected::ConnectedStream, init::InitStream};
use crate::{ use crate::{
events::{IoEvents, Observer}, events::IoEvents,
net::iface::{AnyBoundSocket, IpEndpoint, RawTcpSocket}, net::iface::{AnyBoundSocket, IpEndpoint, RawTcpSocket},
prelude::*, prelude::*,
process::signal::Pollee, process::signal::Pollee,
@ -71,26 +71,6 @@ impl ConnectingStream {
self.update_io_events(pollee); self.update_io_events(pollee);
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
pub(super) fn update_io_events(&self, pollee: &Pollee) { pub(super) fn update_io_events(&self, pollee: &Pollee) {
if self.conn_result.read().is_some() { if self.conn_result.read().is_some() {
return; return;

View File

@ -4,13 +4,12 @@ use alloc::sync::Weak;
use super::{connecting::ConnectingStream, listen::ListenStream}; use super::{connecting::ConnectingStream, listen::ListenStream};
use crate::{ use crate::{
events::{IoEvents, Observer}, events::Observer,
net::{ net::{
iface::{AnyBoundSocket, AnyUnboundSocket, IpEndpoint}, iface::{AnyBoundSocket, AnyUnboundSocket, IpEndpoint},
socket::ip::common::{bind_socket, get_ephemeral_endpoint}, socket::ip::common::{bind_socket, get_ephemeral_endpoint},
}, },
prelude::*, prelude::*,
process::signal::Pollee,
}; };
pub enum InitStream { pub enum InitStream {
@ -92,24 +91,4 @@ impl InitStream {
InitStream::Bound(bound_socket) => Some(bound_socket.local_endpoint().unwrap()), InitStream::Bound(bound_socket) => Some(bound_socket.local_endpoint().unwrap()),
} }
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
} }

View File

@ -3,7 +3,7 @@ use smoltcp::socket::tcp::ListenError;
use super::connected::ConnectedStream; use super::connected::ConnectedStream;
use crate::{ use crate::{
events::{IoEvents, Observer}, events::IoEvents,
net::iface::{AnyBoundSocket, AnyUnboundSocket, BindPortConfig, IpEndpoint, RawTcpSocket}, net::iface::{AnyBoundSocket, AnyUnboundSocket, BindPortConfig, IpEndpoint, RawTcpSocket},
prelude::*, prelude::*,
process::signal::Pollee, process::signal::Pollee,
@ -95,26 +95,6 @@ impl ListenStream {
pollee.del_events(IoEvents::IN); pollee.del_events(IoEvents::IN);
} }
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
} }
struct BacklogSocket { struct BacklogSocket {

View File

@ -296,32 +296,20 @@ impl FileLike for StreamSocket {
fn register_observer( fn register_observer(
&self, &self,
observer: Weak<dyn crate::events::Observer<IoEvents>>, observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents, mask: IoEvents,
) -> Result<()> { ) -> Result<()> {
let state = self.state.read(); self.pollee.register_observer(observer, mask);
match state.as_ref() { Ok(())
State::Init(init) => init.register_observer(&self.pollee, observer, mask),
State::Connecting(connecting) => {
connecting.register_observer(&self.pollee, observer, mask)
}
State::Connected(connected) => {
connected.register_observer(&self.pollee, observer, mask)
}
State::Listen(listen) => listen.register_observer(&self.pollee, observer, mask),
}
} }
fn unregister_observer( fn unregister_observer(
&self, &self,
observer: &Weak<dyn crate::events::Observer<IoEvents>>, observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn crate::events::Observer<IoEvents>>> { ) -> Result<Weak<dyn Observer<IoEvents>>> {
match self.state.read().as_ref() { self.pollee
State::Init(init) => init.unregister_observer(&self.pollee, observer), .unregister_observer(observer)
State::Connecting(connecting) => connecting.unregister_observer(&self.pollee, observer), .ok_or_else(|| Error::with_message(Errno::ENOENT, "observer is not registered"))
State::Connected(connected) => connected.unregister_observer(&self.pollee, observer),
State::Listen(listen) => listen.unregister_observer(&self.pollee, observer),
}
} }
} }

View File

@ -3,6 +3,7 @@
#include <unistd.h> #include <unistd.h>
#include <sys/signal.h> #include <sys/signal.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/poll.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>
@ -70,11 +71,12 @@ FN_SETUP(accpected)
{ {
struct sockaddr addr; struct sockaddr addr;
socklen_t addrlen = sizeof(addr); socklen_t addrlen = sizeof(addr);
struct pollfd pfd = { .fd = sk_listen, .events = POLLIN };
do { CHECK_WITH(poll(&pfd, 1, 1000),
sk_accepted = CHECK_WITH(accept(sk_listen, &addr, &addrlen), _ret >= 0 && ((pfd.revents & (POLLIN | POLLOUT)) & POLLIN));
_ret >= 0 || errno == EAGAIN);
} while (sk_accepted < 0); sk_accepted = CHECK(accept(sk_listen, &addr, &addrlen));
} }
END_SETUP() END_SETUP()
@ -226,3 +228,30 @@ FN_TEST(accept)
TEST_ERRNO(accept(sk_accepted, psaddr, &addrlen), EINVAL); TEST_ERRNO(accept(sk_accepted, psaddr, &addrlen), EINVAL);
} }
END_TEST() END_TEST()
FN_TEST(poll)
{
struct pollfd pfd = { .events = POLLIN | POLLOUT };
pfd.fd = sk_unbound;
// FIXME: Uncomment this
// TEST_RES(poll(&pfd, 1, 0),
// (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_bound;
// FIXME: Uncomment this
// TEST_RES(poll(&pfd, 1, 0),
// (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_listen;
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & (POLLIN | POLLOUT)) == 0);
pfd.fd = sk_connected;
TEST_RES(poll(&pfd, 1, 0),
(pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_accepted;
TEST_RES(poll(&pfd, 1, 0),
(pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
}
END_TEST()

View File

@ -3,6 +3,7 @@
#include <unistd.h> #include <unistd.h>
#include <sys/signal.h> #include <sys/signal.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/poll.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>
@ -170,3 +171,22 @@ FN_TEST(accept)
TEST_ERRNO(accept(sk_connected, psaddr, &addrlen), EOPNOTSUPP); TEST_ERRNO(accept(sk_connected, psaddr, &addrlen), EOPNOTSUPP);
} }
END_TEST() END_TEST()
FN_TEST(poll)
{
struct pollfd pfd = { .events = POLLIN | POLLOUT };
pfd.fd = sk_unbound;
// FIXME: Uncomment this
// TEST_RES(poll(&pfd, 1, 0),
// (pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_bound;
TEST_RES(poll(&pfd, 1, 0),
(pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
pfd.fd = sk_connected;
TEST_RES(poll(&pfd, 1, 0),
(pfd.revents & (POLLIN | POLLOUT)) == POLLOUT);
}
END_TEST()