Don't lock Mutex in EpollEntry::on_events

This commit is contained in:
Ruihan Li
2024-10-03 01:00:39 +08:00
committed by Tate, Hongliang Tian
parent 6e7b12c3c4
commit b9b09b8142

View File

@ -7,6 +7,7 @@ use core::{
}; };
use keyable_arc::{KeyableArc, KeyableWeak}; use keyable_arc::{KeyableArc, KeyableWeak};
use ostd::sync::LocalIrqDisabled;
use super::*; use super::*;
use crate::{ use crate::{
@ -31,19 +32,24 @@ pub struct EpollFile {
// All interesting entries. // All interesting entries.
interest: Mutex<BTreeSet<EpollEntryHolder>>, interest: Mutex<BTreeSet<EpollEntryHolder>>,
// Entries that are probably ready (having events happened). // Entries that are probably ready (having events happened).
ready: Mutex<VecDeque<Weak<EpollEntry>>>, ready: SpinLock<VecDeque<Weak<EpollEntry>>, LocalIrqDisabled>,
// A guard to ensure that ready entries can be popped by one thread at a time.
pop_guard: Mutex<PopGuard>,
// EpollFile itself is also pollable // EpollFile itself is also pollable
pollee: Pollee, pollee: Pollee,
// Any EpollFile is wrapped with Arc when created. // Any EpollFile is wrapped with Arc when created.
weak_self: Weak<Self>, weak_self: Weak<Self>,
} }
struct PopGuard;
impl EpollFile { impl EpollFile {
/// Creates a new epoll file. /// Creates a new epoll file.
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
Arc::new_cyclic(|me| Self { Arc::new_cyclic(|me| Self {
interest: Mutex::new(BTreeSet::new()), interest: Mutex::new(BTreeSet::new()),
ready: Mutex::new(VecDeque::new()), ready: SpinLock::new(VecDeque::new()),
pop_guard: Mutex::new(PopGuard),
pollee: Pollee::new(IoEvents::empty()), pollee: Pollee::new(IoEvents::empty()),
weak_self: me.clone(), weak_self: me.clone(),
}) })
@ -188,7 +194,7 @@ impl EpollFile {
let mut poller = None; let mut poller = None;
loop { loop {
// Try to pop some ready entries // Try to pop some ready entries
self.pop_ready(max_events, &mut ep_events); self.pop_multi_ready(max_events, &mut ep_events);
if !ep_events.is_empty() { if !ep_events.is_empty() {
return Ok(ep_events); return Ok(ep_events);
} }
@ -242,27 +248,32 @@ impl EpollFile {
self.pollee.add_events(IoEvents::IN); self.pollee.add_events(IoEvents::IN);
} }
fn pop_ready(&self, max_events: usize, ep_events: &mut Vec<EpollEvent>) { fn pop_multi_ready(&self, max_events: usize, ep_events: &mut Vec<EpollEvent>) {
let mut ready = self.ready.lock(); let pop_guard = self.pop_guard.lock();
let mut dead_entries = Vec::new();
for _ in 0..ready.len() { let mut limit = None;
loop {
if ep_events.len() >= max_events { if ep_events.len() >= max_events {
break; break;
} }
let weak_entry = ready.pop_front().unwrap(); // Since we're holding `pop_guard`, no one else can pop the entries from the ready
let Some(entry) = Weak::upgrade(&weak_entry) else { // list. This guarantees that `pop_one_ready` will pop the ready entries we see when
// The entry has been deleted. // `pop_multi_ready` starts executing, so that such entries are never duplicated.
continue; let Some((entry, new_limit)) = self.pop_one_ready(limit, &pop_guard) else {
break;
}; };
limit = Some(new_limit);
// Mark the entry as not ready. We will (re)mark it as ready later if we need to. // Poll the events. If the file is dead, we will remove the entry.
entry.reset_ready(&ready);
// Poll the events. If the file is dead, we will remove the entry later.
let Some((ep_event, is_still_ready)) = entry.poll() else { let Some((ep_event, is_still_ready)) = entry.poll() else {
dead_entries.push((entry.fd(), entry.file_weak().clone())); // We're removing entries whose files are dead. This can only fail if user programs
// remove the entry at the same time, and we run into some race conditions.
//
// However, this has very limited impact because we will never remove a wrong entry. So
// the error can be silently ignored.
let _ = self.del_interest(entry.fd(), entry.file_weak().clone());
continue; continue;
}; };
@ -273,32 +284,48 @@ impl EpollFile {
// Add the entry back to the ready list, if necessary. // Add the entry back to the ready list, if necessary.
if is_still_ready { if is_still_ready {
entry.set_ready(&ready); self.push_ready(entry);
ready.push_back(weak_entry);
} }
} }
}
// Clear the epoll file's events if there are no ready entries. fn pop_one_ready(
if ready.len() == 0 { &self,
self.pollee.del_events(IoEvents::IN); limit: Option<usize>,
_guard: &MutexGuard<PopGuard>,
) -> Option<(Arc<EpollEntry>, usize)> {
if limit == Some(0) {
return None;
} }
// Remove entries whose files are dead. let mut ready = self.ready.lock();
// let mut limit = limit.unwrap_or_else(|| ready.len());
// We must do this after unlocking the ready list. The ready list is locked in the event
// observer's callback, so we cannot unregister the observer while holding the lock. while limit > 0 {
// Otherwise we may get dead locks due to inconsistent locking orders. limit -= 1;
drop(ready);
for (fd, file) in dead_entries { // Pop the front entry. Note that `_guard` and `limit` guarantee that this entry must
// We're removing entries whose files are dead. This can only fail if there are events // exist, so we can just unwrap it.
// generated for dead files (even though files are dead, their pollees can still be let weak_entry = ready.pop_front().unwrap();
// alive) and they hit the race conditions (since we have released the lock of the
// ready list). // Clear the epoll file's events if there are no ready entries.
// if ready.len() == 0 {
// However, this has very limited impact because we will never remove a wrong entry. So self.pollee.del_events(IoEvents::IN);
// the error can be silently ignored. }
let _ = self.del_interest(fd, file);
let Some(entry) = Weak::upgrade(&weak_entry) else {
// The entry has been deleted.
continue;
};
// Mark the entry as not ready. We can invoke `push_ready` later to add it back to the
// ready list if we need to.
entry.reset_ready(&ready);
return Some((entry, limit));
} }
None
} }
fn warn_unsupported_flags(&self, flags: &EpollFlags) { fn warn_unsupported_flags(&self, flags: &EpollFlags) {
@ -527,7 +554,7 @@ impl EpollEntry {
/// This method must be called while holding the lock of the ready list. This is the only way /// This method must be called while holding the lock of the ready list. This is the only way
/// to ensure that the "is ready" state matches the fact that the entry is actually in the /// to ensure that the "is ready" state matches the fact that the entry is actually in the
/// ready list. /// ready list.
pub fn set_ready(&self, _guard: &MutexGuard<VecDeque<Weak<EpollEntry>>>) { pub fn set_ready(&self, _guard: &SpinLockGuard<VecDeque<Weak<EpollEntry>>, LocalIrqDisabled>) {
self.is_ready.store(true, Ordering::Relaxed); self.is_ready.store(true, Ordering::Relaxed);
} }
@ -536,7 +563,10 @@ impl EpollEntry {
/// This method must be called while holding the lock of the ready list. This is the only way /// This method must be called while holding the lock of the ready list. This is the only way
/// to ensure that the "is ready" state matches the fact that the entry is actually in the /// to ensure that the "is ready" state matches the fact that the entry is actually in the
/// ready list. /// ready list.
pub fn reset_ready(&self, _guard: &MutexGuard<VecDeque<Weak<EpollEntry>>>) { pub fn reset_ready(
&self,
_guard: &SpinLockGuard<VecDeque<Weak<EpollEntry>>, LocalIrqDisabled>,
) {
self.is_ready.store(false, Ordering::Relaxed) self.is_ready.store(false, Ordering::Relaxed)
} }