Use (fd, file) as the key

This commit is contained in:
Ruihan Li
2024-09-01 20:55:11 +08:00
committed by Tate, Hongliang Tian
parent d70ae181b5
commit 479d98c8b9
4 changed files with 226 additions and 113 deletions

View File

@ -315,6 +315,12 @@ impl<T: ?Sized> From<KeyableWeak<T>> for Weak<T> {
}
}
impl<T: ?Sized> Clone for KeyableWeak<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: ?Sized + fmt::Debug> fmt::Debug for KeyableWeak<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "(KeyableWeak)")

View File

@ -4,14 +4,17 @@
#![allow(unused_variables)]
use core::{
borrow::Borrow,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use keyable_arc::{KeyableArc, KeyableWeak};
use super::*;
use crate::{
events::Observer,
fs::{file_handle::FileLike, file_table::FdEvents, utils::IoctlCmd},
fs::{file_handle::FileLike, utils::IoctlCmd},
process::signal::{Pollable, Pollee, Poller},
};
@ -29,7 +32,7 @@ use crate::{
/// event happens on the file.
pub struct EpollFile {
// All interesting entries.
interest: Mutex<BTreeMap<FileDesc, Arc<EpollEntry>>>,
interest: Mutex<BTreeSet<EpollEntryHolder>>,
// Entries that are probably ready (having events happened).
ready: Mutex<VecDeque<Weak<EpollEntry>>>,
// EpollFile itself is also pollable
@ -42,7 +45,7 @@ impl EpollFile {
/// Creates a new epoll file.
pub fn new() -> Arc<Self> {
Arc::new_cyclic(|me| Self {
interest: Mutex::new(BTreeMap::new()),
interest: Mutex::new(BTreeSet::new()),
ready: Mutex::new(VecDeque::new()),
pollee: Pollee::new(IoEvents::empty()),
weak_self: me.clone(),
@ -51,55 +54,63 @@ impl EpollFile {
/// Control the interest list of the epoll file.
pub fn control(&self, cmd: &EpollCtl) -> Result<()> {
let fd = match cmd {
EpollCtl::Add(fd, ..) => *fd,
EpollCtl::Del(fd) => *fd,
EpollCtl::Mod(fd, ..) => *fd,
};
let file = {
let current = current!();
let file_table = current.file_table().lock();
file_table.get_file(fd)?.clone()
};
match *cmd {
EpollCtl::Add(fd, ep_event, ep_flags) => self.add_interest(fd, ep_event, ep_flags),
EpollCtl::Del(fd) => {
self.del_interest(fd)?;
self.unregister_from_file_table_entry(fd);
Ok(())
EpollCtl::Add(fd, ep_event, ep_flags) => {
self.add_interest(fd, file, ep_event, ep_flags)
}
EpollCtl::Del(fd) => self.del_interest(fd, Arc::downgrade(&file).into()),
EpollCtl::Mod(fd, ep_event, ep_flags) => {
self.mod_interest(fd, file, ep_event, ep_flags)
}
EpollCtl::Mod(fd, ep_event, ep_flags) => self.mod_interest(fd, ep_event, ep_flags),
}
}
fn add_interest(&self, fd: FileDesc, ep_event: EpollEvent, ep_flags: EpollFlags) -> Result<()> {
fn add_interest(
&self,
fd: FileDesc,
file: Arc<dyn FileLike>,
ep_event: EpollEvent,
ep_flags: EpollFlags,
) -> Result<()> {
self.warn_unsupported_flags(&ep_flags);
let current = current!();
let file_table = current.file_table().lock();
let file_table_entry = file_table.get_entry(fd)?;
let file = file_table_entry.file();
let weak_file = Arc::downgrade(file);
let mask = ep_event.events;
let entry = EpollEntry::new(fd, weak_file, ep_event, ep_flags, self.weak_self.clone());
// Add the new entry to the interest list and start monitoring its events
let mut interest = self.interest.lock();
if interest.contains_key(&fd) {
return_errno_with_message!(Errno::EEXIST, "the fd has been added");
}
file.register_observer(entry.self_weak() as _, IoEvents::all())?;
interest.insert(fd, entry.clone());
// Register self to the file table entry
file_table_entry.register_observer(self.weak_self.clone() as _);
let file = file.clone();
drop(file_table);
drop(interest);
let entry = {
let mut interest = self.interest.lock();
if interest.contains(&EpollEntryKey::from((fd, &file))) {
return_errno_with_message!(Errno::EEXIST, "the fd has been added");
}
let entry = EpollEntry::new(fd, &file, ep_event, ep_flags, self.weak_self.clone())?;
let inserted = interest.insert(entry.clone().into());
assert!(inserted);
entry
};
// Add the new entry to the ready list if the file is ready
let events = file.poll(mask, None);
let events = file.poll(ep_event.events, None);
if !events.is_empty() {
self.push_ready(entry);
}
Ok(())
}
fn del_interest(&self, fd: FileDesc) -> Result<()> {
let mut interest = self.interest.lock();
let entry = interest
.remove(&fd)
.ok_or_else(|| Error::with_message(Errno::ENOENT, "fd is not in the interest list"))?;
fn del_interest(&self, fd: FileDesc, file: KeyableWeak<dyn FileLike>) -> Result<()> {
// If this epoll entry is in the ready list, then we should delete it.
// But unfortunately, deleting an entry from the ready list has a
// complexity of O(N).
@ -109,52 +120,47 @@ impl EpollFile {
// because the strong reference count will reach zero and `Weak::upgrade`
// will fail.
let file = match entry.file() {
Some(file) => file,
// TODO: should we warn about it?
None => return Ok(()),
};
if !self
.interest
.lock()
.remove(&EpollEntryKey::from((fd, file)))
{
return_errno_with_message!(Errno::ENOENT, "fd is not in the interest list");
}
file.unregister_observer(&(entry.self_weak() as _)).unwrap();
Ok(())
}
fn mod_interest(
&self,
fd: FileDesc,
file: Arc<dyn FileLike>,
new_ep_event: EpollEvent,
new_ep_flags: EpollFlags,
) -> Result<()> {
self.warn_unsupported_flags(&new_ep_flags);
// Update the epoll entry
let interest = self.interest.lock();
let entry = interest
.get(&fd)
.ok_or_else(|| Error::with_message(Errno::ENOENT, "fd is not in the interest list"))?;
let new_mask = new_ep_event.events;
entry.update(new_ep_event, new_ep_flags);
let entry = entry.clone();
drop(interest);
let entry = {
let interest = self.interest.lock();
let EpollEntryHolder(entry) = interest
.get(&EpollEntryKey::from((fd, &file)))
.ok_or_else(|| {
Error::with_message(Errno::ENOENT, "fd is not in the interest list")
})?;
entry.update(new_ep_event, new_ep_flags);
entry.clone()
};
// Add the updated entry to the ready list if the file is ready
let file = match entry.file() {
Some(file) => file,
None => return Ok(()),
};
let events = file.poll(new_mask, None);
let events = file.poll(new_ep_event.events, None);
if !events.is_empty() {
self.push_ready(entry);
}
Ok(())
}
fn unregister_from_file_table_entry(&self, fd: FileDesc) {
let current = current!();
let file_table = current.file_table().lock();
if let Ok(entry) = file_table.get_entry(fd) {
entry.unregister_observer(&(self.weak_self.clone() as _));
}
Ok(())
}
/// Wait for interesting events happen on the files in the interest list
@ -250,8 +256,8 @@ impl EpollFile {
entry.reset_ready();
// For EPOLLONESHOT flag, this entry should also be removed from the interest list
if ep_flags.intersects(EpollFlags::ONE_SHOT) {
self.del_interest(entry.fd())
.expect("this entry should be in the interest list");
// FIXME: This may fail due to race conditions.
let _ = self.del_interest(entry.fd(), entry.file_weak().clone());
}
}
}
@ -271,35 +277,6 @@ impl EpollFile {
}
}
impl Observer<FdEvents> for EpollFile {
fn on_events(&self, events: &FdEvents) {
// Delete the file from the interest list if it is closed.
if let FdEvents::Close(fd) = events {
let _ = self.del_interest(*fd);
}
}
}
impl Drop for EpollFile {
fn drop(&mut self) {
trace!("EpollFile Drop");
let mut interest = self.interest.lock();
let fds: Vec<_> = interest
.extract_if(|_, _| true)
.map(|(fd, entry)| {
if let Some(file) = entry.file() {
let _ = file.unregister_observer(&(entry.self_weak() as _));
}
fd
})
.collect();
drop(interest);
fds.iter()
.for_each(|&fd| self.unregister_from_file_table_entry(fd));
}
}
impl Pollable for EpollFile {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
@ -337,21 +314,47 @@ impl FileLike for EpollFile {
}
}
/// An epoll entry contained in an epoll file. Each epoll entry is added, modified,
/// or deleted by the `EpollCtl` command.
/// An epoll entry that is contained in an epoll file.
///
/// Each epoll entry can be added, modified, or deleted by the `EpollCtl` command.
pub struct EpollEntry {
fd: FileDesc,
file: Weak<dyn FileLike>,
inner: Mutex<Inner>,
// The file descriptor and the file
key: EpollEntryKey,
// The event masks and flags
inner: Mutex<EpollEntryInner>,
// Whether the entry is in the ready list
is_ready: AtomicBool,
// Refers to the epoll file containing this epoll entry
// The epoll file that contains this epoll entry
weak_epoll: Weak<EpollFile>,
// An EpollEntry is always contained inside Arc
// The epoll entry itself (always inside an `Arc`)
weak_self: Weak<Self>,
}
struct Inner {
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct EpollEntryKey {
fd: FileDesc,
file: KeyableWeak<dyn FileLike>,
}
impl From<(FileDesc, KeyableWeak<dyn FileLike>)> for EpollEntryKey {
fn from(value: (FileDesc, KeyableWeak<dyn FileLike>)) -> Self {
Self {
fd: value.0,
file: value.1,
}
}
}
impl From<(FileDesc, &Arc<dyn FileLike>)> for EpollEntryKey {
fn from(value: (FileDesc, &Arc<dyn FileLike>)) -> Self {
Self {
fd: value.0,
file: KeyableWeak::from(Arc::downgrade(value.1)),
}
}
}
struct EpollEntryInner {
event: EpollEvent,
flags: EpollFlags,
}
@ -360,19 +363,25 @@ impl EpollEntry {
/// Creates a new epoll entry associated with the given epoll file.
pub fn new(
fd: FileDesc,
file: Weak<dyn FileLike>,
file: &Arc<dyn FileLike>,
event: EpollEvent,
flags: EpollFlags,
weak_epoll: Weak<EpollFile>,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
fd,
file,
inner: Mutex::new(Inner { event, flags }),
) -> Result<Arc<Self>> {
let entry = Arc::new_cyclic(|me| Self {
key: EpollEntryKey {
fd,
file: Arc::downgrade(file).into(),
},
inner: Mutex::new(EpollEntryInner { event, flags }),
is_ready: AtomicBool::new(false),
weak_epoll,
weak_self: me.clone(),
})
});
file.register_observer(entry.weak_self.clone(), event.events)?;
Ok(entry)
}
/// Get the epoll file associated with this epoll entry.
@ -395,7 +404,7 @@ impl EpollEntry {
/// Since an epoll entry only holds a weak reference to the file,
/// it is possible (albeit unlikely) that the file has been dropped.
pub fn file(&self) -> Option<Arc<dyn FileLike>> {
self.file.upgrade()
self.key.file.upgrade().map(KeyableArc::into)
}
/// Get the epoll event associated with the epoll entry.
@ -420,7 +429,7 @@ impl EpollEntry {
///
/// If the returned events is not empty, then the file is considered ready.
pub fn poll(&self) -> IoEvents {
match self.file.upgrade() {
match self.file() {
Some(file) => file.poll(IoEvents::all(), None),
None => IoEvents::empty(),
}
@ -429,7 +438,7 @@ impl EpollEntry {
/// Update the epoll entry, most likely to be triggered via `EpollCtl::Mod`.
pub fn update(&self, event: EpollEvent, flags: EpollFlags) {
let mut inner = self.inner.lock();
*inner = Inner { event, flags }
*inner = EpollEntryInner { event, flags }
}
/// Returns whether the epoll entry is in the ready list.
@ -449,7 +458,12 @@ impl EpollEntry {
/// Get the file descriptor associated with the epoll entry.
pub fn fd(&self) -> FileDesc {
self.fd
self.key.fd
}
/// Get the file associated with this epoll entry.
pub fn file_weak(&self) -> &KeyableWeak<dyn FileLike> {
&self.key.file
}
}
@ -460,3 +474,43 @@ impl Observer<IoEvents> for EpollEntry {
}
}
}
struct EpollEntryHolder(pub Arc<EpollEntry>);
impl PartialOrd for EpollEntryHolder {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for EpollEntryHolder {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
self.0.key.cmp(&other.0.key)
}
}
impl PartialEq for EpollEntryHolder {
fn eq(&self, other: &Self) -> bool {
self.0.key.eq(&other.0.key)
}
}
impl Eq for EpollEntryHolder {}
impl Borrow<EpollEntryKey> for EpollEntryHolder {
fn borrow(&self) -> &EpollEntryKey {
&self.0.key
}
}
impl From<Arc<EpollEntry>> for EpollEntryHolder {
fn from(value: Arc<EpollEntry>) -> Self {
Self(value)
}
}
impl Drop for EpollEntryHolder {
fn drop(&mut self) {
let Some(file) = self.file() else {
return;
};
file.unregister_observer(&(self.self_weak() as _)).unwrap();
}
}

View File

@ -0,0 +1,52 @@
// SPDX-License-Identifier: MPL-2.0
#include "../network/test.h"
#include <unistd.h>
#include <sys/epoll.h>
FN_TEST(epoll_add_del)
{
int fildes[2];
int epfd, rfd, wfd, rfd2;
struct epoll_event ev;
// Setup pipes
TEST_SUCC(pipe(fildes));
rfd = fildes[0];
wfd = fildes[1];
TEST_SUCC(write(wfd, "", 1));
// Setup epoll
epfd = TEST_SUCC(epoll_create1(0));
ev.events = EPOLLIN;
ev.data.fd = rfd;
TEST_SUCC(epoll_ctl(epfd, EPOLL_CTL_ADD, rfd, &ev));
// Dup and close
rfd2 = dup(rfd);
close(rfd);
// No way to operate on closed file descriptors
TEST_RES(epoll_wait(epfd, &ev, 1, 0), _ret == 1 && ev.data.fd == rfd);
TEST_ERRNO(epoll_ctl(epfd, EPOLL_CTL_DEL, rfd, NULL), EBADF);
TEST_ERRNO(epoll_ctl(epfd, EPOLL_CTL_DEL, rfd2, NULL), ENOENT);
// Old file descriptor and new file
TEST_RES(pipe(fildes), fildes[0] == rfd);
TEST_ERRNO(epoll_ctl(epfd, EPOLL_CTL_DEL, rfd, NULL), ENOENT);
TEST_SUCC(epoll_ctl(epfd, EPOLL_CTL_ADD, rfd, &ev));
TEST_SUCC(epoll_ctl(epfd, EPOLL_CTL_DEL, rfd, NULL));
TEST_SUCC(close(fildes[0]));
TEST_SUCC(close(fildes[1]));
// Old file descriptor and old file
TEST_RES(dup(rfd2), _ret == rfd);
TEST_SUCC(epoll_ctl(epfd, EPOLL_CTL_DEL, rfd, NULL));
// Clean up
TEST_SUCC(close(epfd));
TEST_SUCC(close(rfd));
TEST_SUCC(close(wfd));
TEST_SUCC(close(rfd2));
}
END_TEST()

View File

@ -62,3 +62,4 @@ test_fdatasync
echo "All fdatasync test passed."
pipe/pipe_err
epoll/epoll_err