Refine the event lock for pipes

Authored by Ruihan Li on 2024-06-30 23:47:01 +08:00; committed by Tate, Hongliang Tian
parent ab9941263b
commit 1c0d865373


@@ -114,21 +114,24 @@ impl<T> Producer<T> {
     }
 
     fn update_pollee(&self) {
+        // In theory, `rb.is_full()`/`rb.is_empty()`, where the `rb` is taken from either
+        // `this_end` or `peer_end`, should reflect the same state. However, we need to take the
+        // correct lock when updating the events to avoid races between the state check and the
+        // event update.
+
         let this_end = self.this_end();
-        let peer_end = self.peer_end();
-        // Update the event of pollee in a critical region so that pollee
-        // always reflects the _true_ state of the underlying ring buffer
-        // regardless of any race conditions.
-        let _guard = self.0.common.lock_event();
         let rb = this_end.rb();
         if rb.is_full() {
             this_end.pollee.del_events(IoEvents::OUT);
         }
+        drop(rb);
 
+        let peer_end = self.peer_end();
         let rb = peer_end.rb();
         if !rb.is_empty() {
             peer_end.pollee.add_events(IoEvents::IN);
         }
+        drop(rb);
     }
 
     impl_common_methods_for_channel!();
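The new comment captures the heart of the change: instead of serializing every event update behind a channel-wide `event_lock`, each end now holds its own ring-buffer lock across both the state check and the pollee update. Below is a minimal sketch of the race being prevented, using plain standard-library types rather than the Asterinas `Pollee`/`rb()` machinery; the names `End`, `IN`, and `update_events*` are invented for illustration.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

const IN: u32 = 0b01; // stand-in for IoEvents::IN

struct End {
    rb: Mutex<VecDeque<u8>>, // stand-in for one end's ring buffer
    events: Mutex<u32>,      // stand-in for the pollee's event set
}

impl End {
    // Racy variant: the buffer lock is released between the check and the
    // event update, so another thread can drain the buffer in between and
    // leave a stale IN bit set on an empty buffer.
    fn update_events_racy(&self) {
        let is_empty = self.rb.lock().unwrap().is_empty(); // guard dropped here
        if !is_empty {
            *self.events.lock().unwrap() |= IN;
        }
    }

    // What the patch does: hold the buffer guard across both the check and
    // the update, so the event bits always match a state the buffer really had.
    fn update_events(&self) {
        let rb = self.rb.lock().unwrap();
        if !rb.is_empty() {
            *self.events.lock().unwrap() |= IN;
        }
        drop(rb); // mirrors the explicit `drop(rb)` in the patch
    }
}
```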
@@ -235,8 +238,6 @@ impl<T> Drop for Producer<T> {
     fn drop(&mut self) {
         self.shutdown();
 
-        let _guard = self.0.common.lock_event();
-
         // When reading from a channel such as a pipe or a stream socket,
         // POLLHUP merely indicates that the peer closed its end of the channel.
         self.peer_end().pollee.add_events(IoEvents::HUP);
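No replacement lock appears in this `Drop` path. A plausible reading is that `IoEvents::HUP` is sticky here: it is only added once the producer is gone and is never cleared again, so there is no state check for the update to race with. In miniature, assuming the event bits live in an atomic word (an assumption about `Pollee` internals, not something this diff shows):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

const HUP: u32 = 0b100; // stand-in for IoEvents::HUP

// A sticky, add-only event needs no surrounding lock: one atomic OR
// publishes it, and nothing ever clears the bit afterwards.
fn mark_peer_closed(events: &AtomicU32) {
    events.fetch_or(HUP, Ordering::Relaxed);
}
```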
@@ -253,21 +254,24 @@ impl<T> Consumer<T> {
     }
 
     fn update_pollee(&self) {
+        // In theory, `rb.is_full()`/`rb.is_empty()`, where the `rb` is taken from either
+        // `this_end` or `peer_end`, should reflect the same state. However, we need to take the
+        // correct lock when updating the events to avoid races between the state check and the
+        // event update.
+
         let this_end = self.this_end();
-        let peer_end = self.peer_end();
-        // Update the event of pollee in a critical region so that pollee
-        // always reflects the _true_ state of the underlying ring buffer
-        // regardless of any race conditions.
-        let _guard = self.0.common.lock_event();
         let rb = this_end.rb();
         if rb.is_empty() {
             this_end.pollee.del_events(IoEvents::IN);
         }
+        drop(rb);
 
+        let peer_end = self.peer_end();
         let rb = peer_end.rb();
         if !rb.is_full() {
             peer_end.pollee.add_events(IoEvents::OUT);
         }
+        drop(rb);
     }
 
     impl_common_methods_for_channel!();
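Note the symmetry: the producer locks its own buffer first and the consumer's second, while the consumer does the opposite. One plausible reason for the explicit `drop(rb)` before touching the peer end is exactly this opposite ordering; holding the first guard while acquiring the second would be a textbook ABBA deadlock. A standalone sketch of the pattern, with plain `Mutex<()>`s standing in for the two ring-buffer locks:

```rust
use std::sync::Mutex;
use std::thread;

// The producer-side order is A then B; the consumer-side order is B then A.
// If each thread held its first lock while acquiring the second, the two
// threads below could deadlock. Releasing the first guard before taking the
// second (as the `drop(rb)` calls in the patch do) makes both orders harmless.
fn main() {
    static A: Mutex<()> = Mutex::new(());
    static B: Mutex<()> = Mutex::new(());

    let t1 = thread::spawn(|| {
        let ga = A.lock().unwrap(); // check/update this end's events
        drop(ga);                   // like `drop(rb)` in Producer::update_pollee
        let _gb = B.lock().unwrap(); // then the peer end's
    });
    let t2 = thread::spawn(|| {
        let gb = B.lock().unwrap(); // check/update this end's events
        drop(gb);                   // like `drop(rb)` in Consumer::update_pollee
        let _ga = A.lock().unwrap(); // then the peer end's
    });
    t1.join().unwrap();
    t2.join().unwrap();
}
```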
@@ -375,8 +379,6 @@ impl<T> Drop for Consumer<T> {
     fn drop(&mut self) {
         self.shutdown();
 
-        let _guard = self.0.common.lock_event();
-
         // POLLERR is also set for a file descriptor referring to the write end of a pipe
         // when the read end has been closed.
         self.peer_end().pollee.add_events(IoEvents::ERR);
@@ -432,7 +434,6 @@ impl<T, R: TRights> EndPoint<T, R> {
 struct Common<T> {
     producer: EndPointInner<HeapRbProducer<T>>,
     consumer: EndPointInner<HeapRbConsumer<T>>,
-    event_lock: Mutex<()>,
 }
 
 impl<T> Common<T> {
@@ -448,17 +449,8 @@ impl<T> Common<T> {
 
         let producer = EndPointInner::new(rb_producer, IoEvents::OUT, flags);
         let consumer = EndPointInner::new(rb_consumer, IoEvents::empty(), flags);
-        let event_lock = Mutex::new(());
 
-        Ok(Self {
-            producer,
-            consumer,
-            event_lock,
-        })
-    }
-
-    pub fn lock_event(&self) -> MutexGuard<()> {
-        self.event_lock.lock()
+        Ok(Self { producer, consumer })
     }
 
     pub fn capacity(&self) -> usize {
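With the per-buffer locks doing the work, nothing uses the channel-wide lock any more, so the `event_lock` field, its initialization, and the `lock_event()` accessor all disappear, and `Common<T>` shrinks by one `Mutex<()>` per channel. The deleted pattern in miniature: a data-less mutex that serializes a critical region rather than protecting any data.

```rust
use std::sync::{Mutex, MutexGuard};

// The removed pattern: a Mutex<()> held purely for mutual exclusion.
// The patch replaces this coarse, channel-wide serialization with the
// finer-grained ring-buffer locks that each end already takes.
struct EventLock(Mutex<()>);

impl EventLock {
    fn lock(&self) -> MutexGuard<'_, ()> {
        self.0.lock().unwrap()
    }
}
```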