From 85d2c9634f5c4f7b3dac1f612655a5ce675a1422 Mon Sep 17 00:00:00 2001 From: Ruize Tang <1466040111@qq.com> Date: Thu, 7 Nov 2024 09:25:10 +0800 Subject: [PATCH] Fix non-atomic write for PIPEs with data up to PIPE_BUF --- kernel/src/fs/pipe.rs | 30 ++++++++++++++++++++++++------ kernel/src/fs/utils/channel.rs | 25 +++++++++++++++++++++---- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/kernel/src/fs/pipe.rs b/kernel/src/fs/pipe.rs index 67a2dcba7..c4ae5ce0b 100644 --- a/kernel/src/fs/pipe.rs +++ b/kernel/src/fs/pipe.rs @@ -217,7 +217,7 @@ mod test { W: Fn(Arc) + Sync + Send + 'static, R: Fn(Arc) + Sync + Send + 'static, { - let channel = Channel::with_capacity(1); + let channel = Channel::with_capacity(2); let (writer, readr) = channel.split(); let writer = PipeWriter::new(writer, StatusFlags::empty()).unwrap(); @@ -283,13 +283,13 @@ mod test { fn test_write_full() { test_blocking( |writer| { - assert_eq!(writer.write(&mut reader_from(&[1, 2])).unwrap(), 1); + assert_eq!(writer.write(&mut reader_from(&[1, 2, 3])).unwrap(), 2); assert_eq!(writer.write(&mut reader_from(&[2])).unwrap(), 1); }, |reader| { - let mut buf = [0; 2]; - assert_eq!(reader.read(&mut writer_from(&mut buf)).unwrap(), 1); - assert_eq!(&buf[..1], &[1]); + let mut buf = [0; 3]; + assert_eq!(reader.read(&mut writer_from(&mut buf)).unwrap(), 2); + assert_eq!(&buf[..2], &[1, 2]); assert_eq!(reader.read(&mut writer_from(&mut buf)).unwrap(), 1); assert_eq!(&buf[..1], &[2]); }, @@ -313,7 +313,7 @@ mod test { fn test_write_closed() { test_blocking( |writer| { - assert_eq!(writer.write(&mut reader_from(&[1, 2])).unwrap(), 1); + assert_eq!(writer.write(&mut reader_from(&[1, 2, 3])).unwrap(), 2); assert_eq!( writer.write(&mut reader_from(&[2])).unwrap_err().error(), Errno::EPIPE @@ -324,6 +324,24 @@ mod test { ); } + #[ktest] + fn test_write_atomicity() { + test_blocking( + |writer| { + assert_eq!(writer.write(&mut reader_from(&[1])).unwrap(), 1); + assert_eq!(writer.write(&mut 
reader_from(&[1, 2])).unwrap(), 2); + }, + |reader| { + let mut buf = [0; 3]; + assert_eq!(reader.read(&mut writer_from(&mut buf)).unwrap(), 1); + assert_eq!(&buf[..1], &[1]); + assert_eq!(reader.read(&mut writer_from(&mut buf)).unwrap(), 2); + assert_eq!(&buf[..2], &[1, 2]); + }, + Ordering::WriteThenRead, + ); + } + fn reader_from(buf: &[u8]) -> VmReader { VmReader::from(buf).to_fallible() } diff --git a/kernel/src/fs/utils/channel.rs b/kernel/src/fs/utils/channel.rs index f7bca65de..407efcb75 100644 --- a/kernel/src/fs/utils/channel.rs +++ b/kernel/src/fs/utils/channel.rs @@ -22,6 +22,19 @@ pub struct Channel { consumer: Consumer, } +/// Maximum number of bytes guaranteed to be written to a pipe atomically. +/// +/// If the number of bytes to be written does not exceed the threshold, the write must be atomic. +/// A non-blocking atomic write may fail with `EAGAIN`, even if there is room for a partial write. +/// In other words, a partial write is not allowed for an atomic write. +/// +/// For more details, see the description of `PIPE_BUF` in +/// <https://man7.org/linux/man-pages/man7/pipe.7.html>. +#[cfg(not(ktest))] +const PIPE_BUF: usize = 4096; +#[cfg(ktest)] +const PIPE_BUF: usize = 2; + impl Channel { /// Creates a new channel with the given capacity. /// @@ -114,7 +127,7 @@ impl Producer { } fn update_pollee(&self) { - // In theory, `rb.is_full()`/`rb.is_empty()`, where the `rb` is taken from either + // In theory, `rb.free_len()`/`rb.is_empty()`, where the `rb` is taken from either // `this_end` or `peer_end`, should reflect the same state. However, we need to take the // correct lock when updating the events to avoid races between the state check and the // event update. @@ -123,7 +136,7 @@ impl Producer { let rb = this_end.rb(); if self.is_shutdown() { // The POLLOUT event is always set in this case. Don't try to remove it. 
- } else if rb.is_full() { + } else if rb.free_len() < PIPE_BUF { this_end.pollee.del_events(IoEvents::OUT); } drop(rb); @@ -204,7 +217,7 @@ impl Consumer { } fn update_pollee(&self) { - // In theory, `rb.is_full()`/`rb.is_empty()`, where the `rb` is taken from either + // In theory, `rb.free_len()`/`rb.is_empty()`, where the `rb` is taken from either // `this_end` or `peer_end`, should reflect the same state. However, we need to take the // correct lock when updating the events to avoid races between the state check and the // event update. @@ -218,7 +231,7 @@ impl Consumer { let peer_end = self.peer_end(); let rb = peer_end.rb(); - if !rb.is_full() { + if rb.free_len() >= PIPE_BUF { peer_end.pollee.add_events(IoEvents::OUT); } drop(rb); @@ -307,6 +320,10 @@ impl Fifo { #[require(R > Write)] pub fn write(&self, reader: &mut dyn MultiRead) -> Result { let mut rb = self.common.producer.rb(); + if rb.free_len() < reader.sum_lens() && reader.sum_lens() <= PIPE_BUF { + // Insufficient space for an atomic write + return Ok(0); + } rb.write_fallible(reader) } }