Avoid deadlocks due to drops

This commit is contained in:
Ruihan Li
2024-10-24 23:21:19 +08:00
committed by Tate, Hongliang Tian
parent 885950c2a4
commit 60440d1062

View File

@ -34,27 +34,19 @@ use crate::{
pub struct EpollFile { pub struct EpollFile {
// All interesting entries. // All interesting entries.
interest: Mutex<BTreeSet<EpollEntryHolder>>, interest: Mutex<BTreeSet<EpollEntryHolder>>,
// Entries that are probably ready (having events happened). // A set of ready entries.
ready: SpinLock<VecDeque<Weak<EpollEntry>>, LocalIrqDisabled>, //
// A guard to ensure that ready entries can be popped by one thread at a time. // Keep this in a separate `Arc` to avoid dropping `EpollFile` in the observer callback, which
pop_guard: Mutex<PopGuard>, // may cause deadlocks.
// EpollFile itself is also pollable ready: Arc<ReadySet>,
pollee: Pollee,
// Any EpollFile is wrapped with Arc when created.
weak_self: Weak<Self>,
} }
struct PopGuard;
impl EpollFile { impl EpollFile {
/// Creates a new epoll file. /// Creates a new epoll file.
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
Arc::new_cyclic(|me| Self { Arc::new(Self {
interest: Mutex::new(BTreeSet::new()), interest: Mutex::new(BTreeSet::new()),
ready: SpinLock::new(VecDeque::new()), ready: Arc::new(ReadySet::new()),
pop_guard: Mutex::new(PopGuard),
pollee: Pollee::new(IoEvents::empty()),
weak_self: me.clone(),
}) })
} }
@ -103,7 +95,7 @@ impl EpollFile {
); );
} }
let entry = EpollEntry::new(fd, Arc::downgrade(&file).into(), self.weak_self.clone()); let entry = EpollEntry::new(fd, Arc::downgrade(&file).into(), self.ready.clone());
let events = entry.update(ep_event, ep_flags); let events = entry.update(ep_event, ep_flags);
let ready_entry = if !events.is_empty() { let ready_entry = if !events.is_empty() {
@ -120,7 +112,7 @@ impl EpollFile {
// Add the new entry to the ready list if the file is ready // Add the new entry to the ready list if the file is ready
if let Some(entry) = ready_entry { if let Some(entry) = ready_entry {
self.push_ready(entry); self.ready.push(entry.observer());
} }
Ok(()) Ok(())
@ -176,7 +168,7 @@ impl EpollFile {
// Add the updated entry to the ready list if the file is ready // Add the updated entry to the ready list if the file is ready
if let Some(entry) = ready_entry { if let Some(entry) = ready_entry {
self.push_ready(entry); self.ready.push(entry.observer());
} }
Ok(()) Ok(())
@ -211,50 +203,20 @@ impl EpollFile {
Ok(ep_events) Ok(ep_events)
} }
fn push_ready(&self, entry: Arc<EpollEntry>) {
// Note that we cannot take the `EpollEntryInner` lock because we are in the callback of
// the event observer. Doing so will cause dead locks due to inconsistent locking orders.
//
// We don't need to take the lock because
// - We always call `file.poll()` immediately after calling `self.set_enabled()` and
// `file.register_observer()`, so all events are caught either here or by the immediate
// poll; in other words, we don't lose any events.
// - Catching spurious events here is always fine because we always check them later before
// returning events to the user (in `EpollEntry::poll`).
if !entry.is_enabled() {
return;
}
let mut ready = self.ready.lock();
if !entry.is_ready() {
entry.set_ready(&ready);
ready.push_back(Arc::downgrade(&entry));
}
// Even if the entry is already set to ready,
// there might be new events that we are interested in.
// Wake the poller anyway.
self.pollee.add_events(IoEvents::IN);
}
fn pop_multi_ready(&self, max_events: usize, ep_events: &mut Vec<EpollEvent>) { fn pop_multi_ready(&self, max_events: usize, ep_events: &mut Vec<EpollEvent>) {
let pop_guard = self.pop_guard.lock(); let mut pop_iter = self.ready.lock_pop();
let mut limit = None;
loop { loop {
if ep_events.len() >= max_events { if ep_events.len() >= max_events {
break; break;
} }
// Since we're holding `pop_guard`, no one else can pop the entries from the ready // Since we're holding `pop_guard` (in `pop_iter`), no one else can pop the entries
// list. This guarantees that `pop_one_ready` will pop the ready entries we see when // from the ready list. This guarantees that `next` will pop the ready entries we see
// `pop_multi_ready` starts executing, so that such entries are never duplicated. // when `pop_multi_ready` starts executing, so that such entries are never duplicated.
let Some((entry, new_limit)) = self.pop_one_ready(limit, &pop_guard) else { let Some(entry) = pop_iter.next() else {
break; break;
}; };
limit = Some(new_limit);
// Poll the events. If the file is dead, we will remove the entry. // Poll the events. If the file is dead, we will remove the entry.
let Some((ep_event, is_still_ready)) = entry.poll() else { let Some((ep_event, is_still_ready)) = entry.poll() else {
@ -274,50 +236,11 @@ impl EpollFile {
// Add the entry back to the ready list, if necessary. // Add the entry back to the ready list, if necessary.
if is_still_ready { if is_still_ready {
self.push_ready(entry); self.ready.push(entry.observer());
} }
} }
} }
fn pop_one_ready(
&self,
limit: Option<usize>,
_guard: &MutexGuard<PopGuard>,
) -> Option<(Arc<EpollEntry>, usize)> {
if limit == Some(0) {
return None;
}
let mut ready = self.ready.lock();
let mut limit = limit.unwrap_or_else(|| ready.len());
while limit > 0 {
limit -= 1;
// Pop the front entry. Note that `_guard` and `limit` guarantee that this entry must
// exist, so we can just unwrap it.
let weak_entry = ready.pop_front().unwrap();
// Clear the epoll file's events if there are no ready entries.
if ready.len() == 0 {
self.pollee.del_events(IoEvents::IN);
}
let Some(entry) = Weak::upgrade(&weak_entry) else {
// The entry has been deleted.
continue;
};
// Mark the entry as not ready. We can invoke `push_ready` later to add it back to the
// ready list if we need to.
entry.reset_ready(&ready);
return Some((entry, limit));
}
None
}
fn warn_unsupported_flags(&self, flags: &EpollFlags) { fn warn_unsupported_flags(&self, flags: &EpollFlags) {
if flags.intersects(EpollFlags::EXCLUSIVE | EpollFlags::WAKE_UP) { if flags.intersects(EpollFlags::EXCLUSIVE | EpollFlags::WAKE_UP) {
warn!("{:?} contains unsupported flags", flags); warn!("{:?} contains unsupported flags", flags);
@ -327,7 +250,7 @@ impl EpollFile {
impl Pollable for EpollFile { impl Pollable for EpollFile {
fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents { fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.pollee.poll(mask, poller) self.ready.poll(mask, poller)
} }
} }
@ -355,6 +278,115 @@ impl FileLike for EpollFile {
} }
} }
/// A set of ready epoll entries.
pub struct ReadySet {
// Entries that are probably ready (having events happened).
entries: SpinLock<VecDeque<Weak<EpollEntry>>, LocalIrqDisabled>,
// A guard to ensure that ready entries can be popped by one thread at a time.
pop_guard: Mutex<PopGuard>,
// A pollee for the ready set (i.e., for `EpollFile` itself).
pollee: Pollee,
}
struct PopGuard;
impl ReadySet {
pub fn new() -> Self {
Self {
entries: SpinLock::new(VecDeque::new()),
pop_guard: Mutex::new(PopGuard),
pollee: Pollee::new(IoEvents::empty()),
}
}
pub fn push(&self, observer: &EpollEntryObserver) {
// Note that we cannot take the `EpollEntryInner` lock because we are in the callback of
// the event observer. Doing so will cause dead locks due to inconsistent locking orders.
//
// We don't need to take the lock because
// - We always call `file.poll()` immediately after calling `self.set_enabled()` and
// `file.register_observer()`, so all events are caught either here or by the immediate
// poll; in other words, we don't lose any events.
// - Catching spurious events here is always fine because we always check them later before
// returning events to the user (in `EpollEntry::poll`).
if !observer.is_enabled() {
return;
}
let mut entries = self.entries.lock();
if !observer.is_ready() {
observer.set_ready(&entries);
entries.push_back(observer.weak_entry().clone())
}
// Even if the entry is already set to ready,
// there might be new events that we are interested in.
// Wake the poller anyway.
self.pollee.add_events(IoEvents::IN);
}
pub fn lock_pop(&self) -> ReadySetPopIter {
ReadySetPopIter {
ready_set: self,
_pop_guard: self.pop_guard.lock(),
limit: None,
}
}
pub fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}
/// An iterator to pop ready entries from a [`ReadySet`].
pub struct ReadySetPopIter<'a> {
ready_set: &'a ReadySet,
_pop_guard: MutexGuard<'a, PopGuard>,
limit: Option<usize>,
}
impl Iterator for ReadySetPopIter<'_> {
type Item = Arc<EpollEntry>;
fn next(&mut self) -> Option<Self::Item> {
if self.limit == Some(0) {
return None;
}
let mut entries = self.ready_set.entries.lock();
let mut limit = self.limit.unwrap_or_else(|| entries.len());
while limit > 0 {
limit -= 1;
// Pop the front entry. Note that `_pop_guard` and `limit` guarantee that this entry
// must exist, so we can just unwrap it.
let weak_entry = entries.pop_front().unwrap();
// Clear the epoll file's events if there are no ready entries.
if entries.len() == 0 {
self.ready_set.pollee.del_events(IoEvents::IN);
}
let Some(entry) = Weak::upgrade(&weak_entry) else {
// The entry has been deleted.
continue;
};
// Mark the entry as not ready. We can invoke `ReadySet::push` later to add it back to
// the ready list if we need to.
entry.observer().reset_ready(&entries);
self.limit = Some(limit);
return Some(entry);
}
self.limit = None;
None
}
}
/// An epoll entry that is contained in an epoll file. /// An epoll entry that is contained in an epoll file.
/// ///
/// Each epoll entry can be added, modified, or deleted by the `EpollCtl` command. /// Each epoll entry can be added, modified, or deleted by the `EpollCtl` command.
@ -363,14 +395,11 @@ pub struct EpollEntry {
key: EpollEntryKey, key: EpollEntryKey,
// The event masks and flags // The event masks and flags
inner: Mutex<EpollEntryInner>, inner: Mutex<EpollEntryInner>,
// Whether the entry is enabled // The observer that receives events.
is_enabled: AtomicBool, //
// Whether the entry is in the ready list // Keep this in a separate `Arc` to avoid dropping `EpollEntry` in the observer callback, which
is_ready: AtomicBool, // may cause deadlocks.
// The epoll file that contains this epoll entry observer: Arc<EpollEntryObserver>,
weak_epoll: Weak<EpollFile>,
// The epoll entry itself (always inside an `Arc`)
weak_self: Weak<Self>,
} }
#[derive(PartialEq, Eq, PartialOrd, Ord)] #[derive(PartialEq, Eq, PartialOrd, Ord)]
@ -408,39 +437,28 @@ impl EpollEntry {
pub fn new( pub fn new(
fd: FileDesc, fd: FileDesc,
file: KeyableWeak<dyn FileLike>, file: KeyableWeak<dyn FileLike>,
weak_epoll: Weak<EpollFile>, ready_set: Arc<ReadySet>,
) -> Arc<Self> { ) -> Arc<Self> {
Arc::new_cyclic(|me| { Arc::new_cyclic(|me| {
let observer = Arc::new(EpollEntryObserver::new(ready_set, me.clone()));
let inner = EpollEntryInner { let inner = EpollEntryInner {
event: EpollEvent { event: EpollEvent {
events: IoEvents::empty(), events: IoEvents::empty(),
user_data: 0, user_data: 0,
}, },
flags: EpollFlags::empty(), flags: EpollFlags::empty(),
poller: PollHandle::new(me.clone() as _), poller: PollHandle::new(Arc::downgrade(&observer) as _),
}; };
Self { Self {
key: EpollEntryKey { fd, file }, key: EpollEntryKey { fd, file },
inner: Mutex::new(inner), inner: Mutex::new(inner),
is_enabled: AtomicBool::new(false), observer,
is_ready: AtomicBool::new(false),
weak_epoll,
weak_self: me.clone(),
} }
}) })
} }
/// Get the epoll file associated with this epoll entry.
pub fn epoll_file(&self) -> Option<Arc<EpollFile>> {
self.weak_epoll.upgrade()
}
/// Get an instance of `Arc` that refers to this epoll entry.
pub fn self_arc(&self) -> Arc<Self> {
self.weak_self.upgrade().unwrap()
}
/// Get the file associated with this epoll entry. /// Get the file associated with this epoll entry.
/// ///
/// Since an epoll entry only holds a weak reference to the file, /// Since an epoll entry only holds a weak reference to the file,
@ -459,7 +477,7 @@ impl EpollEntry {
let inner = self.inner.lock(); let inner = self.inner.lock();
// There are no events if the entry is disabled. // There are no events if the entry is disabled.
if !self.is_enabled() { if !self.observer.is_enabled() {
return Some((None, false)); return Some((None, false));
} }
@ -483,7 +501,7 @@ impl EpollEntry {
// If there are events and the epoll entry is one-shot, we need to disable the entry until // If there are events and the epoll entry is one-shot, we need to disable the entry until
// the user enables it again via `EpollCtl::Mod`. // the user enables it again via `EpollCtl::Mod`.
if ep_event.is_some() && inner.flags.contains(EpollFlags::ONE_SHOT) { if ep_event.is_some() && inner.flags.contains(EpollFlags::ONE_SHOT) {
self.reset_enabled(&inner); self.observer.reset_enabled(&inner);
} }
Some((ep_event, is_still_ready)) Some((ep_event, is_still_ready))
@ -500,7 +518,7 @@ impl EpollEntry {
inner.event = event; inner.event = event;
inner.flags = flags; inner.flags = flags;
self.set_enabled(&inner); self.observer.set_enabled(&inner);
file.poll(event.events, Some(&mut inner.poller)) file.poll(event.events, Some(&mut inner.poller))
} }
@ -511,10 +529,48 @@ impl EpollEntry {
pub fn shutdown(&self) { pub fn shutdown(&self) {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
self.reset_enabled(&inner); self.observer.reset_enabled(&inner);
inner.poller.reset(); inner.poller.reset();
} }
/// Gets the underlying observer.
pub fn observer(&self) -> &EpollEntryObserver {
&self.observer
}
/// Get the file descriptor associated with the epoll entry.
pub fn fd(&self) -> FileDesc {
self.key.fd
}
/// Get the file associated with this epoll entry.
pub fn file_weak(&self) -> &KeyableWeak<dyn FileLike> {
&self.key.file
}
}
/// A observer for [`EpollEntry`] that can receive events.
pub struct EpollEntryObserver {
// Whether the entry is enabled
is_enabled: AtomicBool,
// Whether the entry is in the ready list
is_ready: AtomicBool,
// The ready set of the epoll file that contains this epoll entry
ready_set: Arc<ReadySet>,
// The epoll entry itself (always inside an `Arc`)
weak_entry: Weak<EpollEntry>,
}
impl EpollEntryObserver {
pub fn new(ready_set: Arc<ReadySet>, weak_entry: Weak<EpollEntry>) -> Self {
Self {
is_enabled: AtomicBool::new(false),
is_ready: AtomicBool::new(false),
ready_set,
weak_entry,
}
}
/// Returns whether the epoll entry is in the ready list. /// Returns whether the epoll entry is in the ready list.
/// ///
/// *Caution:* If this method is called without holding the lock of the ready list, the user /// *Caution:* If this method is called without holding the lock of the ready list, the user
@ -572,22 +628,15 @@ impl EpollEntry {
self.is_enabled.store(false, Ordering::Relaxed) self.is_enabled.store(false, Ordering::Relaxed)
} }
/// Get the file descriptor associated with the epoll entry. /// Gets an instance of `Weak` that refers to the epoll entry.
pub fn fd(&self) -> FileDesc { pub fn weak_entry(&self) -> &Weak<EpollEntry> {
self.key.fd &self.weak_entry
}
/// Get the file associated with this epoll entry.
pub fn file_weak(&self) -> &KeyableWeak<dyn FileLike> {
&self.key.file
} }
} }
impl Observer<IoEvents> for EpollEntry { impl Observer<IoEvents> for EpollEntryObserver {
fn on_events(&self, _events: &IoEvents) { fn on_events(&self, _events: &IoEvents) {
if let Some(epoll_file) = self.epoll_file() { self.ready_set.push(self);
epoll_file.push_ready(self.self_arc());
}
} }
} }