Fix observer missing in socket for mongoose support

This commit is contained in:
Fabing Li
2024-03-25 16:01:26 +08:00
committed by Tate, Hongliang Tian
parent a2e9b0aaae
commit 30a2553a70
8 changed files with 122 additions and 4 deletions

View File

@ -68,6 +68,26 @@ impl ConnectedStream {
self.update_io_events(pollee); self.update_io_events(pollee);
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
pub(super) fn update_io_events(&self, pollee: &Pollee) { pub(super) fn update_io_events(&self, pollee: &Pollee) {
self.bound_socket.raw_with(|socket: &mut RawTcpSocket| { self.bound_socket.raw_with(|socket: &mut RawTcpSocket| {
if socket.can_recv() { if socket.can_recv() {

View File

@ -2,7 +2,7 @@
use super::{connected::ConnectedStream, init::InitStream}; use super::{connected::ConnectedStream, init::InitStream};
use crate::{ use crate::{
events::IoEvents, events::{IoEvents, Observer},
net::iface::{AnyBoundSocket, IpEndpoint, RawTcpSocket}, net::iface::{AnyBoundSocket, IpEndpoint, RawTcpSocket},
prelude::*, prelude::*,
process::signal::Pollee, process::signal::Pollee,
@ -71,6 +71,26 @@ impl ConnectingStream {
self.update_io_events(pollee); self.update_io_events(pollee);
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
pub(super) fn update_io_events(&self, pollee: &Pollee) { pub(super) fn update_io_events(&self, pollee: &Pollee) {
if self.conn_result.read().is_some() { if self.conn_result.read().is_some() {
return; return;

View File

@ -4,12 +4,13 @@ use alloc::sync::Weak;
use super::{connecting::ConnectingStream, listen::ListenStream}; use super::{connecting::ConnectingStream, listen::ListenStream};
use crate::{ use crate::{
events::Observer, events::{IoEvents, Observer},
net::{ net::{
iface::{AnyBoundSocket, AnyUnboundSocket, IpEndpoint}, iface::{AnyBoundSocket, AnyUnboundSocket, IpEndpoint},
socket::ip::common::{bind_socket, get_ephemeral_endpoint}, socket::ip::common::{bind_socket, get_ephemeral_endpoint},
}, },
prelude::*, prelude::*,
process::signal::Pollee,
}; };
pub enum InitStream { pub enum InitStream {
@ -90,4 +91,24 @@ impl InitStream {
InitStream::Bound(bound_socket) => Ok(bound_socket.local_endpoint().unwrap()), InitStream::Bound(bound_socket) => Ok(bound_socket.local_endpoint().unwrap()),
} }
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
} }

View File

@ -2,7 +2,7 @@
use super::connected::ConnectedStream; use super::connected::ConnectedStream;
use crate::{ use crate::{
events::IoEvents, events::{IoEvents, Observer},
net::iface::{AnyBoundSocket, AnyUnboundSocket, BindPortConfig, IpEndpoint, RawTcpSocket}, net::iface::{AnyBoundSocket, AnyUnboundSocket, BindPortConfig, IpEndpoint, RawTcpSocket},
prelude::*, prelude::*,
process::signal::Pollee, process::signal::Pollee,
@ -92,6 +92,26 @@ impl ListenStream {
pollee.del_events(IoEvents::IN); pollee.del_events(IoEvents::IN);
} }
} }
pub fn register_observer(
&self,
pollee: &Pollee,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
pollee.register_observer(observer, mask);
Ok(())
}
pub fn unregister_observer(
&self,
pollee: &Pollee,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Result<Weak<dyn Observer<IoEvents>>> {
pollee
.unregister_observer(observer)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "fails to unregister observer"))
}
} }
struct BacklogSocket { struct BacklogSocket {

View File

@ -275,6 +275,36 @@ impl FileLike for StreamSocket {
fn as_socket(self: Arc<Self>) -> Option<Arc<dyn Socket>> { fn as_socket(self: Arc<Self>) -> Option<Arc<dyn Socket>> {
Some(self) Some(self)
} }
fn register_observer(
&self,
observer: Weak<dyn crate::events::Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
let state = self.state.read();
match state.as_ref() {
State::Init(init) => init.register_observer(&self.pollee, observer, mask),
State::Connecting(connecting) => {
connecting.register_observer(&self.pollee, observer, mask)
}
State::Connected(connected) => {
connected.register_observer(&self.pollee, observer, mask)
}
State::Listen(listen) => listen.register_observer(&self.pollee, observer, mask),
}
}
fn unregister_observer(
&self,
observer: &Weak<dyn crate::events::Observer<IoEvents>>,
) -> Result<Weak<dyn crate::events::Observer<IoEvents>>> {
match self.state.read().as_ref() {
State::Init(init) => init.unregister_observer(&self.pollee, observer),
State::Connecting(connecting) => connecting.unregister_observer(&self.pollee, observer),
State::Connected(connected) => connected.unregister_observer(&self.pollee, observer),
State::Listen(listen) => listen.unregister_observer(&self.pollee, observer),
}
}
} }
impl Socket for StreamSocket { impl Socket for StreamSocket {

View File

@ -19,4 +19,5 @@ impl_socket_options!(
pub struct RecvBuf(u32); pub struct RecvBuf(u32);
pub struct Error(Option<crate::error::Error>); pub struct Error(Option<crate::error::Error>);
pub struct Linger(LingerOption); pub struct Linger(LingerOption);
pub struct KeepAlive(bool);
); );

View File

@ -17,6 +17,7 @@ pub struct SocketOptionSet {
send_buf: u32, send_buf: u32,
recv_buf: u32, recv_buf: u32,
linger: LingerOption, linger: LingerOption,
keep_alive: bool,
} }
impl SocketOptionSet { impl SocketOptionSet {
@ -29,6 +30,7 @@ impl SocketOptionSet {
send_buf: SEND_BUF_LEN as u32, send_buf: SEND_BUF_LEN as u32,
recv_buf: RECV_BUF_LEN as u32, recv_buf: RECV_BUF_LEN as u32,
linger: LingerOption::default(), linger: LingerOption::default(),
keep_alive: false,
} }
} }
} }

View File

@ -5,7 +5,9 @@ use aster_rights::Full;
use super::RawSocketOption; use super::RawSocketOption;
use crate::{ use crate::{
impl_raw_sock_option_get_only, impl_raw_socket_option, impl_raw_sock_option_get_only, impl_raw_socket_option,
net::socket::options::{Error, Linger, RecvBuf, ReuseAddr, ReusePort, SendBuf, SocketOption}, net::socket::options::{
Error, KeepAlive, Linger, RecvBuf, ReuseAddr, ReusePort, SendBuf, SocketOption,
},
prelude::*, prelude::*,
vm::vmar::Vmar, vm::vmar::Vmar,
}; };
@ -48,6 +50,7 @@ pub fn new_socket_option(name: i32) -> Result<Box<dyn RawSocketOption>> {
CSocketOptionName::ERROR => Ok(Box::new(Error::new())), CSocketOptionName::ERROR => Ok(Box::new(Error::new())),
CSocketOptionName::REUSEPORT => Ok(Box::new(ReusePort::new())), CSocketOptionName::REUSEPORT => Ok(Box::new(ReusePort::new())),
CSocketOptionName::LINGER => Ok(Box::new(Linger::new())), CSocketOptionName::LINGER => Ok(Box::new(Linger::new())),
CSocketOptionName::KEEPALIVE => Ok(Box::new(KeepAlive::new())),
_ => todo!(), _ => todo!(),
} }
} }
@ -58,3 +61,4 @@ impl_raw_socket_option!(ReuseAddr);
impl_raw_sock_option_get_only!(Error); impl_raw_sock_option_get_only!(Error);
impl_raw_socket_option!(ReusePort); impl_raw_socket_option!(ReusePort);
impl_raw_socket_option!(Linger); impl_raw_socket_option!(Linger);
impl_raw_socket_option!(KeepAlive);