Store Arc<Backlog> in UNIX listeners

This commit is contained in:
Ruihan Li 2024-07-27 09:27:56 +08:00 committed by Tate, Hongliang Tian
parent 994f3727d8
commit 98e2a99195

View File

@ -12,24 +12,22 @@ use crate::{
}; };
pub(super) struct Listener { pub(super) struct Listener {
addr: UnixSocketAddrBound, backlog: Arc<Backlog>,
} }
impl Listener { impl Listener {
pub(super) fn new(addr: UnixSocketAddrBound, backlog: usize) -> Result<Self> { pub(super) fn new(addr: UnixSocketAddrBound, backlog: usize) -> Result<Self> {
BACKLOG_TABLE.add_backlog(&addr, backlog)?; let backlog = BACKLOG_TABLE.add_backlog(addr, backlog)?;
Ok(Self { addr }) Ok(Self { backlog })
} }
pub(super) fn addr(&self) -> &UnixSocketAddrBound { pub(super) fn addr(&self) -> &UnixSocketAddrBound {
&self.addr self.backlog.addr()
} }
pub(super) fn try_accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> { pub(super) fn try_accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> {
let addr = self.addr().clone();
let connected = { let connected = {
let local_endpoint = BACKLOG_TABLE.pop_incoming(&addr)?; let local_endpoint = self.backlog.pop_incoming()?;
Connected::new(local_endpoint) Connected::new(local_endpoint)
}; };
@ -41,9 +39,7 @@ impl Listener {
} }
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents { pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
let addr = self.addr(); self.backlog.poll(mask, poller)
let backlog = BACKLOG_TABLE.get_backlog(addr).unwrap();
backlog.poll(mask, poller)
} }
pub(super) fn register_observer( pub(super) fn register_observer(
@ -51,18 +47,14 @@ impl Listener {
observer: Weak<dyn Observer<IoEvents>>, observer: Weak<dyn Observer<IoEvents>>,
mask: IoEvents, mask: IoEvents,
) -> Result<()> { ) -> Result<()> {
let addr = self.addr(); self.backlog.register_observer(observer, mask)
let backlog = BACKLOG_TABLE.get_backlog(addr)?;
backlog.register_observer(observer, mask)
} }
pub(super) fn unregister_observer( pub(super) fn unregister_observer(
&self, &self,
observer: &Weak<dyn Observer<IoEvents>>, observer: &Weak<dyn Observer<IoEvents>>,
) -> Option<Weak<dyn Observer<IoEvents>>> { ) -> Option<Weak<dyn Observer<IoEvents>>> {
let addr = self.addr(); self.backlog.unregister_observer(observer)
let backlog = BACKLOG_TABLE.get_backlog(addr).ok()?;
backlog.unregister_observer(observer)
} }
} }
@ -80,9 +72,9 @@ impl BacklogTable {
} }
} }
fn add_backlog(&self, addr: &UnixSocketAddrBound, backlog: usize) -> Result<()> { fn add_backlog(&self, addr: UnixSocketAddrBound, backlog: usize) -> Result<Arc<Backlog>> {
let inode = { let inode = {
let UnixSocketAddrBound::Path(_, dentry) = addr else { let UnixSocketAddrBound::Path(_, ref dentry) = addr else {
todo!() todo!()
}; };
create_keyable_inode(dentry) create_keyable_inode(dentry)
@ -92,9 +84,9 @@ impl BacklogTable {
if backlog_sockets.contains_key(&inode) { if backlog_sockets.contains_key(&inode) {
return_errno_with_message!(Errno::EADDRINUSE, "the addr is already used"); return_errno_with_message!(Errno::EADDRINUSE, "the addr is already used");
} }
let new_backlog = Arc::new(Backlog::new(backlog)); let new_backlog = Arc::new(Backlog::new(addr, backlog));
backlog_sockets.insert(inode, new_backlog); backlog_sockets.insert(inode, new_backlog.clone());
Ok(()) Ok(new_backlog)
} }
fn get_backlog(&self, addr: &UnixSocketAddrBound) -> Result<Arc<Backlog>> { fn get_backlog(&self, addr: &UnixSocketAddrBound) -> Result<Arc<Backlog>> {
@ -112,16 +104,6 @@ impl BacklogTable {
.ok_or_else(|| Error::with_message(Errno::EINVAL, "the socket is not listened")) .ok_or_else(|| Error::with_message(Errno::EINVAL, "the socket is not listened"))
} }
fn pop_incoming(&self, addr: &UnixSocketAddrBound) -> Result<Endpoint> {
let backlog = self.get_backlog(addr)?;
if let Some(endpoint) = backlog.pop_incoming() {
Ok(endpoint)
} else {
return_errno_with_message!(Errno::EAGAIN, "no pending connection is available")
}
}
fn push_incoming(&self, addr: &UnixSocketAddrBound, endpoint: Endpoint) -> Result<()> { fn push_incoming(&self, addr: &UnixSocketAddrBound, endpoint: Endpoint) -> Result<()> {
let backlog = self.get_backlog(addr).map_err(|_| { let backlog = self.get_backlog(addr).map_err(|_| {
Error::with_message( Error::with_message(
@ -144,20 +126,26 @@ impl BacklogTable {
} }
struct Backlog { struct Backlog {
addr: UnixSocketAddrBound,
pollee: Pollee, pollee: Pollee,
backlog: usize, backlog: usize,
incoming_endpoints: Mutex<VecDeque<Endpoint>>, incoming_endpoints: Mutex<VecDeque<Endpoint>>,
} }
impl Backlog { impl Backlog {
fn new(backlog: usize) -> Self { fn new(addr: UnixSocketAddrBound, backlog: usize) -> Self {
Self { Self {
addr,
pollee: Pollee::new(IoEvents::empty()), pollee: Pollee::new(IoEvents::empty()),
backlog, backlog,
incoming_endpoints: Mutex::new(VecDeque::with_capacity(backlog)), incoming_endpoints: Mutex::new(VecDeque::with_capacity(backlog)),
} }
} }
fn addr(&self) -> &UnixSocketAddrBound {
&self.addr
}
fn push_incoming(&self, endpoint: Endpoint) -> Result<()> { fn push_incoming(&self, endpoint: Endpoint) -> Result<()> {
let mut endpoints = self.incoming_endpoints.lock(); let mut endpoints = self.incoming_endpoints.lock();
if endpoints.len() >= self.backlog { if endpoints.len() >= self.backlog {
@ -168,13 +156,14 @@ impl Backlog {
Ok(()) Ok(())
} }
fn pop_incoming(&self) -> Option<Endpoint> { fn pop_incoming(&self) -> Result<Endpoint> {
let mut incoming_endpoints = self.incoming_endpoints.lock(); let mut incoming_endpoints = self.incoming_endpoints.lock();
let endpoint = incoming_endpoints.pop_front(); let endpoint = incoming_endpoints.pop_front();
if incoming_endpoints.is_empty() { if incoming_endpoints.is_empty() {
self.pollee.del_events(IoEvents::IN); self.pollee.del_events(IoEvents::IN);
} }
endpoint endpoint
.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available"))
} }
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents { fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {