Use Connected to replace Endpoint

This commit is contained in:
Ruihan Li
2024-08-15 12:22:16 +08:00
committed by Tate, Hongliang Tian
parent 639eedb2e1
commit e3307a6945
6 changed files with 106 additions and 173 deletions

View File

@ -1,44 +1,92 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use super::endpoint::Endpoint;
use crate::{ use crate::{
events::{IoEvents, Observer}, events::{IoEvents, Observer},
fs::utils::{Channel, Consumer, Producer},
net::socket::{unix::addr::UnixSocketAddrBound, SockShutdownCmd}, net::socket::{unix::addr::UnixSocketAddrBound, SockShutdownCmd},
prelude::*, prelude::*,
process::signal::Poller, process::signal::Poller,
}; };
pub(super) struct Connected { pub(super) struct Connected {
local_endpoint: Endpoint, addr: Option<UnixSocketAddrBound>,
peer_addr: Option<UnixSocketAddrBound>,
reader: Consumer<u8>,
writer: Producer<u8>,
} }
impl Connected { impl Connected {
pub(super) fn new(local_endpoint: Endpoint) -> Self { pub(super) fn new_pair(
Connected { local_endpoint } addr: Option<UnixSocketAddrBound>,
peer_addr: Option<UnixSocketAddrBound>,
) -> (Connected, Connected) {
let (writer_this, reader_peer) = Channel::new(DAFAULT_BUF_SIZE).split();
let (writer_peer, reader_this) = Channel::new(DAFAULT_BUF_SIZE).split();
let this = Connected {
addr: addr.clone(),
peer_addr: peer_addr.clone(),
reader: reader_this,
writer: writer_this,
};
let peer = Connected {
addr: peer_addr,
peer_addr: addr,
reader: reader_peer,
writer: writer_peer,
};
(this, peer)
} }
pub(super) fn addr(&self) -> Option<&UnixSocketAddrBound> { pub(super) fn addr(&self) -> Option<&UnixSocketAddrBound> {
self.local_endpoint.addr() self.addr.as_ref()
} }
pub(super) fn peer_addr(&self) -> Option<&UnixSocketAddrBound> { pub(super) fn peer_addr(&self) -> Option<&UnixSocketAddrBound> {
self.local_endpoint.peer_addr() self.peer_addr.as_ref()
}
pub(super) fn try_write(&self, buf: &[u8]) -> Result<usize> {
self.local_endpoint.try_write(buf)
} }
pub(super) fn try_read(&self, buf: &mut [u8]) -> Result<usize> { pub(super) fn try_read(&self, buf: &mut [u8]) -> Result<usize> {
self.local_endpoint.try_read(buf) self.reader.try_read(buf)
}
pub(super) fn try_write(&self, buf: &[u8]) -> Result<usize> {
self.writer.try_write(buf)
} }
pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
self.local_endpoint.shutdown(cmd) // FIXME: If the socket has already been shut down, should we return an error code?
if cmd.shut_read() {
self.reader.shutdown();
} }
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents { if cmd.shut_write() {
self.local_endpoint.poll(mask, poller) self.writer.shutdown();
}
Ok(())
}
pub(super) fn poll(&self, mask: IoEvents, mut poller: Option<&mut Poller>) -> IoEvents {
let mut events = IoEvents::empty();
// FIXME: should reader and writer use the same mask?
let reader_events = self.reader.poll(mask, poller.as_deref_mut());
let writer_events = self.writer.poll(mask, poller);
// FIXME: Check this logic later.
if reader_events.contains(IoEvents::HUP) || self.reader.is_shutdown() {
events |= IoEvents::RDHUP | IoEvents::IN;
if writer_events.contains(IoEvents::ERR) || self.writer.is_shutdown() {
events |= IoEvents::HUP | IoEvents::OUT;
}
}
events |= (reader_events & IoEvents::IN) | (writer_events & IoEvents::OUT);
events
} }
pub(super) fn register_observer( pub(super) fn register_observer(
@ -46,13 +94,32 @@ impl Connected {
observer: Weak<dyn Observer<IoEvents>>, observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents, mask: IoEvents,
) -> Result<()> { ) -> Result<()> {
self.local_endpoint.register_observer(observer, mask) if mask.contains(IoEvents::IN) {
self.reader.register_observer(observer.clone(), mask)?
}
if mask.contains(IoEvents::OUT) {
self.writer.register_observer(observer, mask)?
}
Ok(())
} }
pub(super) fn unregister_observer( pub(super) fn unregister_observer(
&self, &self,
observer: &Weak<dyn Observer<IoEvents>>, observer: &Weak<dyn Observer<IoEvents>>,
) -> Option<Weak<dyn Observer<IoEvents>>> { ) -> Option<Weak<dyn Observer<IoEvents>>> {
self.local_endpoint.unregister_observer(observer) let observer0 = self.reader.unregister_observer(observer);
let observer1 = self.writer.unregister_observer(observer);
if observer0.is_some() {
observer0
} else if observer1.is_some() {
observer1
} else {
None
} }
} }
}
const DAFAULT_BUF_SIZE: usize = 65536;

View File

@ -1,125 +0,0 @@
// SPDX-License-Identifier: MPL-2.0
use crate::{
events::{IoEvents, Observer},
fs::utils::{Channel, Consumer, Producer},
net::socket::{unix::addr::UnixSocketAddrBound, SockShutdownCmd},
prelude::*,
process::signal::Poller,
};
pub(super) struct Endpoint {
addr: Option<UnixSocketAddrBound>,
peer_addr: Option<UnixSocketAddrBound>,
reader: Consumer<u8>,
writer: Producer<u8>,
}
impl Endpoint {
pub(super) fn new_pair(
addr: Option<UnixSocketAddrBound>,
peer_addr: Option<UnixSocketAddrBound>,
) -> (Endpoint, Endpoint) {
let (writer_this, reader_peer) = Channel::new(DAFAULT_BUF_SIZE).split();
let (writer_peer, reader_this) = Channel::new(DAFAULT_BUF_SIZE).split();
let this = Endpoint {
addr: addr.clone(),
peer_addr: peer_addr.clone(),
reader: reader_this,
writer: writer_this,
};
let peer = Endpoint {
addr: peer_addr,
peer_addr: addr,
reader: reader_peer,
writer: writer_peer,
};
(this, peer)
}
pub(super) fn addr(&self) -> Option<&UnixSocketAddrBound> {
self.addr.as_ref()
}
pub(super) fn peer_addr(&self) -> Option<&UnixSocketAddrBound> {
self.peer_addr.as_ref()
}
pub(super) fn try_read(&self, buf: &mut [u8]) -> Result<usize> {
self.reader.try_read(buf)
}
pub(super) fn try_write(&self, buf: &[u8]) -> Result<usize> {
self.writer.try_write(buf)
}
pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
// FIXME: If the socket has already been shut down, should we return an error code?
if cmd.shut_read() {
self.reader.shutdown();
}
if cmd.shut_write() {
self.writer.shutdown();
}
Ok(())
}
pub(super) fn poll(&self, mask: IoEvents, mut poller: Option<&mut Poller>) -> IoEvents {
let mut events = IoEvents::empty();
// FIXME: should reader and writer use the same mask?
let reader_events = self.reader.poll(mask, poller.as_deref_mut());
let writer_events = self.writer.poll(mask, poller);
// FIXME: Check this logic later.
if reader_events.contains(IoEvents::HUP) || self.reader.is_shutdown() {
events |= IoEvents::RDHUP | IoEvents::IN;
if writer_events.contains(IoEvents::ERR) || self.writer.is_shutdown() {
events |= IoEvents::HUP | IoEvents::OUT;
}
}
events |= (reader_events & IoEvents::IN) | (writer_events & IoEvents::OUT);
events
}
pub(super) fn register_observer(
&self,
observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents,
) -> Result<()> {
if mask.contains(IoEvents::IN) {
self.reader.register_observer(observer.clone(), mask)?
}
if mask.contains(IoEvents::OUT) {
self.writer.register_observer(observer, mask)?
}
Ok(())
}
pub(super) fn unregister_observer(
&self,
observer: &Weak<dyn Observer<IoEvents>>,
) -> Option<Weak<dyn Observer<IoEvents>>> {
let observer0 = self.reader.unregister_observer(observer);
let observer1 = self.writer.unregister_observer(observer);
if observer0.is_some() {
observer0
} else if observer1.is_some() {
observer1
} else {
None
}
}
}
const DAFAULT_BUF_SIZE: usize = 65536;

View File

@ -45,8 +45,7 @@ impl Init {
} }
pub(super) fn connect(&self, remote_addr: &UnixSocketAddrBound) -> Result<Connected> { pub(super) fn connect(&self, remote_addr: &UnixSocketAddrBound) -> Result<Connected> {
let endpoint = push_incoming(remote_addr, self.addr.clone())?; push_incoming(remote_addr, self.addr.clone())
Ok(Connected::new(endpoint))
} }
pub(super) fn addr(&self) -> Option<&UnixSocketAddrBound> { pub(super) fn addr(&self) -> Option<&UnixSocketAddrBound> {

View File

@ -4,7 +4,7 @@ use core::sync::atomic::{AtomicUsize, Ordering};
use keyable_arc::KeyableWeak; use keyable_arc::KeyableWeak;
use super::{connected::Connected, endpoint::Endpoint, UnixStreamSocket}; use super::{connected::Connected, UnixStreamSocket};
use crate::{ use crate::{
events::{IoEvents, Observer}, events::{IoEvents, Observer},
fs::{file_handle::FileLike, path::Dentry, utils::Inode}, fs::{file_handle::FileLike, path::Dentry, utils::Inode},
@ -28,15 +28,10 @@ impl Listener {
} }
pub(super) fn try_accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> { pub(super) fn try_accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> {
let connected = { let connected = self.backlog.pop_incoming()?;
let local_endpoint = self.backlog.pop_incoming()?;
Connected::new(local_endpoint)
};
let peer_addr = connected.peer_addr().cloned().into(); let peer_addr = connected.peer_addr().cloned().into();
let socket = UnixStreamSocket::new_connected(connected, false); let socket = UnixStreamSocket::new_connected(connected, false);
Ok((socket, peer_addr)) Ok((socket, peer_addr))
} }
@ -119,7 +114,7 @@ impl BacklogTable {
&self, &self,
server_addr: &UnixSocketAddrBound, server_addr: &UnixSocketAddrBound,
client_addr: Option<UnixSocketAddrBound>, client_addr: Option<UnixSocketAddrBound>,
) -> Result<Endpoint> { ) -> Result<Connected> {
let backlog = self.get_backlog(server_addr).ok_or_else(|| { let backlog = self.get_backlog(server_addr).ok_or_else(|| {
Error::with_message( Error::with_message(
Errno::ECONNREFUSED, Errno::ECONNREFUSED,
@ -144,7 +139,7 @@ struct Backlog {
addr: UnixSocketAddrBound, addr: UnixSocketAddrBound,
pollee: Pollee, pollee: Pollee,
backlog: AtomicUsize, backlog: AtomicUsize,
incoming_endpoints: Mutex<VecDeque<Endpoint>>, incoming_conns: Mutex<VecDeque<Connected>>,
} }
impl Backlog { impl Backlog {
@ -153,7 +148,7 @@ impl Backlog {
addr, addr,
pollee: Pollee::new(IoEvents::empty()), pollee: Pollee::new(IoEvents::empty()),
backlog: AtomicUsize::new(backlog), backlog: AtomicUsize::new(backlog),
incoming_endpoints: Mutex::new(VecDeque::with_capacity(backlog)), incoming_conns: Mutex::new(VecDeque::with_capacity(backlog)),
} }
} }
@ -161,33 +156,31 @@ impl Backlog {
&self.addr &self.addr
} }
fn push_incoming(&self, client_addr: Option<UnixSocketAddrBound>) -> Result<Endpoint> { fn push_incoming(&self, client_addr: Option<UnixSocketAddrBound>) -> Result<Connected> {
let mut endpoints = self.incoming_endpoints.lock(); let mut incoming_conns = self.incoming_conns.lock();
if endpoints.len() >= self.backlog.load(Ordering::Relaxed) { if incoming_conns.len() >= self.backlog.load(Ordering::Relaxed) {
return_errno_with_message!( return_errno_with_message!(
Errno::ECONNREFUSED, Errno::ECONNREFUSED,
"the pending connection queue on the listening socket is full" "the pending connection queue on the listening socket is full"
); );
} }
let (server_endpoint, client_endpoint) = let (server_conn, client_conn) = Connected::new_pair(Some(self.addr.clone()), client_addr);
Endpoint::new_pair(Some(self.addr.clone()), client_addr); incoming_conns.push_back(server_conn);
endpoints.push_back(server_endpoint);
self.pollee.add_events(IoEvents::IN); self.pollee.add_events(IoEvents::IN);
Ok(client_endpoint) Ok(client_conn)
} }
fn pop_incoming(&self) -> Result<Endpoint> { fn pop_incoming(&self) -> Result<Connected> {
let mut incoming_endpoints = self.incoming_endpoints.lock(); let mut incoming_conns = self.incoming_conns.lock();
let endpoint = incoming_endpoints.pop_front(); let conn = incoming_conns.pop_front();
if incoming_endpoints.is_empty() { if incoming_conns.is_empty() {
self.pollee.del_events(IoEvents::IN); self.pollee.del_events(IoEvents::IN);
} }
endpoint conn.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available"))
.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available"))
} }
fn set_backlog(&self, backlog: usize) { fn set_backlog(&self, backlog: usize) {
@ -196,7 +189,7 @@ impl Backlog {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents { fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
// Lock to avoid any events may change pollee state when we poll // Lock to avoid any events may change pollee state when we poll
let _lock = self.incoming_endpoints.lock(); let _lock = self.incoming_conns.lock();
self.pollee.poll(mask, poller) self.pollee.poll(mask, poller)
} }
@ -229,6 +222,6 @@ fn unregister_backlog(addr: &UnixSocketAddrBound) {
pub(super) fn push_incoming( pub(super) fn push_incoming(
server_addr: &UnixSocketAddrBound, server_addr: &UnixSocketAddrBound,
client_addr: Option<UnixSocketAddrBound>, client_addr: Option<UnixSocketAddrBound>,
) -> Result<Endpoint> { ) -> Result<Connected> {
BACKLOG_TABLE.push_incoming(server_addr, client_addr) BACKLOG_TABLE.push_incoming(server_addr, client_addr)
} }

View File

@ -1,7 +1,6 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
mod connected; mod connected;
mod endpoint;
mod init; mod init;
mod listener; mod listener;
mod socket; mod socket;

View File

@ -4,7 +4,7 @@ use core::sync::atomic::AtomicBool;
use atomic::Ordering; use atomic::Ordering;
use super::{connected::Connected, endpoint::Endpoint, init::Init, listener::Listener}; use super::{connected::Connected, init::Init, listener::Listener};
use crate::{ use crate::{
events::{IoEvents, Observer}, events::{IoEvents, Observer},
fs::{ fs::{
@ -59,10 +59,10 @@ impl UnixStreamSocket {
} }
pub fn new_pair(is_nonblocking: bool) -> (Arc<Self>, Arc<Self>) { pub fn new_pair(is_nonblocking: bool) -> (Arc<Self>, Arc<Self>) {
let (end_a, end_b) = Endpoint::new_pair(None, None); let (conn_a, conn_b) = Connected::new_pair(None, None);
( (
Self::new_connected(Connected::new(end_a), is_nonblocking), Self::new_connected(conn_a, is_nonblocking),
Self::new_connected(Connected::new(end_b), is_nonblocking), Self::new_connected(conn_b, is_nonblocking),
) )
} }