Fix many error codes in pipes

This commit is contained in:
Ruihan Li
2024-07-01 02:34:55 +08:00
committed by Tate, Hongliang Tian
parent 8e72451448
commit 39cd4420f2
7 changed files with 241 additions and 49 deletions

View File

@ -18,11 +18,11 @@ use crate::{
/// The basic operations defined on a file /// The basic operations defined on a file
pub trait FileLike: Pollable + Send + Sync + Any { pub trait FileLike: Pollable + Send + Sync + Any {
fn read(&self, buf: &mut [u8]) -> Result<usize> { fn read(&self, buf: &mut [u8]) -> Result<usize> {
return_errno_with_message!(Errno::EINVAL, "read is not supported"); return_errno_with_message!(Errno::EBADF, "the file is not valid for reading");
} }
fn write(&self, buf: &[u8]) -> Result<usize> { fn write(&self, buf: &[u8]) -> Result<usize> {
return_errno_with_message!(Errno::EINVAL, "write is not supported"); return_errno_with_message!(Errno::EBADF, "the file is not valid for writing");
} }
/// Read at the given file offset. /// Read at the given file offset.

View File

@ -121,7 +121,9 @@ impl<T> Producer<T> {
let this_end = self.this_end(); let this_end = self.this_end();
let rb = this_end.rb(); let rb = this_end.rb();
if rb.is_full() { if self.is_shutdown() || self.is_peer_shutdown() {
// The POLLOUT event is always set in this case. Don't try to remove it.
} else if rb.is_full() {
this_end.pollee.del_events(IoEvents::OUT); this_end.pollee.del_events(IoEvents::OUT);
} }
drop(rb); drop(rb);
@ -148,27 +150,28 @@ impl<T: Copy> Producer<T> {
if self.is_nonblocking() { if self.is_nonblocking() {
self.try_write(buf) self.try_write(buf)
} else { } else {
// The POLLOUT event is set after shutdown, so waiting for the single event is enough.
self.wait_events(IoEvents::OUT, || self.try_write(buf)) self.wait_events(IoEvents::OUT, || self.try_write(buf))
} }
} }
fn try_write(&self, buf: &[T]) -> Result<usize> { fn try_write(&self, buf: &[T]) -> Result<usize> {
if self.is_shutdown() || self.is_peer_shutdown() {
return_errno!(Errno::EPIPE);
}
if buf.is_empty() { if buf.is_empty() {
// Even after shutdown, writing an empty buffer is still fine.
return Ok(0); return Ok(0);
} }
let written_len = self.0.write(buf); if self.is_shutdown() || self.is_peer_shutdown() {
return_errno_with_message!(Errno::EPIPE, "the channel is shut down");
}
let written_len = self.0.write(buf);
self.update_pollee(); self.update_pollee();
if written_len > 0 { if written_len > 0 {
Ok(written_len) Ok(written_len)
} else { } else {
return_errno_with_message!(Errno::EAGAIN, "try write later"); return_errno_with_message!(Errno::EAGAIN, "the channel is full");
} }
} }
} }
@ -185,6 +188,7 @@ impl<T> Producer<T> {
let mut stored_item = Some(item); let mut stored_item = Some(item);
// The POLLOUT event is set after shutdown, so waiting for the single event is enough.
let result = self.wait_events(IoEvents::OUT, || { let result = self.wait_events(IoEvents::OUT, || {
match self.try_push(stored_item.take().unwrap()) { match self.try_push(stored_item.take().unwrap()) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
@ -203,16 +207,16 @@ impl<T> Producer<T> {
fn try_push(&self, item: T) -> core::result::Result<(), (Error, T)> { fn try_push(&self, item: T) -> core::result::Result<(), (Error, T)> {
if self.is_shutdown() || self.is_peer_shutdown() { if self.is_shutdown() || self.is_peer_shutdown() {
let err = Error::with_message(Errno::EPIPE, "the pipe is shutdown"); let err = Error::with_message(Errno::EPIPE, "the channel is shut down");
return Err((err, item)); return Err((err, item));
} }
self.0.push(item).map_err(|item| { self.0.push(item).map_err(|item| {
let err = Error::with_message(Errno::EAGAIN, "try push again"); let err = Error::with_message(Errno::EAGAIN, "the channel is full");
(err, item) (err, item)
})?; })?;
self.update_pollee(); self.update_pollee();
Ok(()) Ok(())
} }
} }
@ -221,8 +225,9 @@ impl<T> Drop for Producer<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.shutdown(); self.shutdown();
// When reading from a channel such as a pipe or a stream socket, // The POLLHUP event indicates that the write end is shut down.
// POLLHUP merely indicates that the peer closed its end of the channel. //
// No need to take a lock. There is no race because no one is modifying this particular event.
self.peer_end().pollee.add_events(IoEvents::HUP); self.peer_end().pollee.add_events(IoEvents::HUP);
} }
} }
@ -271,62 +276,57 @@ impl<T: Copy> Consumer<T> {
if self.is_nonblocking() { if self.is_nonblocking() {
self.try_read(buf) self.try_read(buf)
} else { } else {
// The POLLHUP event is in `IoEvents::ALWAYS_POLL`, which is not specified again.
self.wait_events(IoEvents::IN, || self.try_read(buf)) self.wait_events(IoEvents::IN, || self.try_read(buf))
} }
} }
fn try_read(&self, buf: &mut [T]) -> Result<usize> { fn try_read(&self, buf: &mut [T]) -> Result<usize> {
if self.is_shutdown() {
return_errno!(Errno::EPIPE);
}
if buf.is_empty() { if buf.is_empty() {
return Ok(0); return Ok(0);
} }
// This must be recorded before the actual operation to avoid race conditions.
let is_shutdown = self.is_shutdown() || self.is_peer_shutdown();
let read_len = self.0.read(buf); let read_len = self.0.read(buf);
self.update_pollee(); self.update_pollee();
if self.is_peer_shutdown() {
return Ok(read_len);
}
if read_len > 0 { if read_len > 0 {
Ok(read_len) Ok(read_len)
} else if is_shutdown {
Ok(0)
} else { } else {
return_errno_with_message!(Errno::EAGAIN, "try read later"); return_errno_with_message!(Errno::EAGAIN, "the channel is empty");
} }
} }
} }
impl<T> Consumer<T> { impl<T> Consumer<T> {
/// Pops an item from the consumer /// Pops an item from the consumer.
pub fn pop(&self) -> Result<T> { pub fn pop(&self) -> Result<Option<T>> {
if self.is_nonblocking() { if self.is_nonblocking() {
self.try_pop() self.try_pop()
} else { } else {
// The POLLHUP event is in `IoEvents::ALWAYS_POLL`, which is not specified again.
self.wait_events(IoEvents::IN, || self.try_pop()) self.wait_events(IoEvents::IN, || self.try_pop())
} }
} }
fn try_pop(&self) -> Result<T> { fn try_pop(&self) -> Result<Option<T>> {
if self.is_shutdown() { // This must be recorded before the actual operation to avoid race conditions.
return_errno_with_message!(Errno::EPIPE, "this end is shut down"); let is_shutdown = self.is_shutdown() || self.is_peer_shutdown();
}
let item = self.0.pop(); let item = self.0.pop();
self.update_pollee(); self.update_pollee();
if let Some(item) = item { if let Some(item) = item {
return Ok(item); Ok(Some(item))
} else if is_shutdown {
Ok(None)
} else {
return_errno_with_message!(Errno::EAGAIN, "the channel is empty")
} }
if self.is_peer_shutdown() {
return_errno_with_message!(Errno::EPIPE, "remote end is shut down");
}
return_errno_with_message!(Errno::EAGAIN, "try pop again")
} }
} }
@ -334,9 +334,14 @@ impl<T> Drop for Consumer<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.shutdown(); self.shutdown();
// POLLERR is also set for a file descriptor referring to the write end of a pipe // The POLLERR event indicates that the read end is closed (so any subsequent writes will
// when the read end has been closed. // fail with an `EPIPE` error).
self.peer_end().pollee.add_events(IoEvents::ERR); //
// The lock is taken because we are also adding the POLLOUT event, which may have races
// with the event updates triggered by the writer.
let peer_end = self.peer_end();
let _rb = peer_end.rb();
peer_end.pollee.add_events(IoEvents::ERR | IoEvents::OUT);
} }
} }
@ -396,7 +401,7 @@ impl<T> Common<T> {
check_status_flags(flags)?; check_status_flags(flags)?;
if capacity == 0 { if capacity == 0 {
return_errno_with_message!(Errno::EINVAL, "capacity cannot be zero"); return_errno_with_message!(Errno::EINVAL, "the channel capacity cannot be zero");
} }
let rb: HeapRb<T> = HeapRb::new(capacity); let rb: HeapRb<T> = HeapRb::new(capacity);
@ -456,12 +461,17 @@ impl<T> EndPointInner<T> {
fn check_status_flags(flags: StatusFlags) -> Result<()> { fn check_status_flags(flags: StatusFlags) -> Result<()> {
let valid_flags: StatusFlags = StatusFlags::O_NONBLOCK | StatusFlags::O_DIRECT; let valid_flags: StatusFlags = StatusFlags::O_NONBLOCK | StatusFlags::O_DIRECT;
if !valid_flags.contains(flags) { if !valid_flags.contains(flags) {
return_errno_with_message!(Errno::EINVAL, "invalid flags"); // FIXME: Linux seems to silently ignore invalid flags. See
// <https://man7.org/linux/man-pages/man2/fcntl.2.html>.
return_errno_with_message!(Errno::EINVAL, "the flags are invalid");
} }
if flags.contains(StatusFlags::O_DIRECT) { if flags.contains(StatusFlags::O_DIRECT) {
return_errno_with_message!(Errno::EINVAL, "O_DIRECT is not supported"); return_errno_with_message!(Errno::EINVAL, "the `O_DIRECT` flag is not supported");
} }
Ok(()) Ok(())
} }
@ -489,7 +499,7 @@ mod test {
} }
for _ in 0..3 { for _ in 0..3 {
let data = consumer.pop().unwrap(); let data = consumer.pop().unwrap().unwrap();
assert_eq!(data, expected_data); assert_eq!(data, expected_data);
} }
} }

View File

@ -20,10 +20,6 @@ pub fn sys_write(fd: FileDesc, user_buf_ptr: Vaddr, user_buf_len: usize) -> Resu
file_table.get_file(fd)?.clone() file_table.get_file(fd)?.clone()
}; };
if user_buf_len == 0 {
return Ok(SyscallReturn::Return(0));
}
let mut buffer = vec![0u8; user_buf_len]; let mut buffer = vec![0u8; user_buf_len];
read_bytes_from_user(user_buf_ptr, &mut VmWriter::from(buffer.as_mut_slice()))?; read_bytes_from_user(user_buf_ptr, &mut VmWriter::from(buffer.as_mut_slice()))?;
debug!("write content = {:?}", buffer); debug!("write content = {:?}", buffer);

View File

@ -29,6 +29,7 @@ TEST_APPS := \
mmap \ mmap \
mongoose \ mongoose \
network \ network \
pipe \
pthread \ pthread \
pty \ pty \
signal_c \ signal_c \

5
test/apps/pipe/Makefile Normal file
View File

@ -0,0 +1,5 @@
# SPDX-License-Identifier: MPL-2.0
include ../test_common.mk
EXTRA_C_FLAGS :=

178
test/apps/pipe/pipe_err.c Normal file
View File

@ -0,0 +1,178 @@
// SPDX-License-Identifier: MPL-2.0
#define _GNU_SOURCE
#include "../network/test.h"
#include <signal.h>
#include <string.h>
#include <sys/poll.h>
#include <unistd.h>
FN_SETUP()
{
signal(SIGPIPE, SIG_IGN);
}
END_SETUP()
FN_TEST(close_without_data_then_read)
{
int fildes[2];
char buf[8] = { 0 };
CHECK(pipe(fildes));
TEST_SUCC(close(fildes[1]));
TEST_RES(read(fildes[0], buf, sizeof(buf)), _ret == 0);
TEST_ERRNO(write(fildes[0], buf, sizeof(buf)), EBADF);
TEST_RES(read(fildes[0], buf, 0), _ret == 0);
TEST_ERRNO(write(fildes[0], buf, 0), EBADF);
TEST_SUCC(close(fildes[0]));
}
END_TEST()
FN_TEST(close_without_data_then_write)
{
int fildes[2];
char buf[8] = { 0 };
CHECK(pipe(fildes));
TEST_SUCC(close(fildes[0]));
TEST_ERRNO(read(fildes[1], buf, sizeof(buf)), EBADF);
TEST_ERRNO(write(fildes[1], buf, sizeof(buf)), EPIPE);
TEST_ERRNO(read(fildes[1], buf, 0), EBADF);
TEST_RES(write(fildes[1], buf, 0), _ret == 0);
TEST_SUCC(close(fildes[1]));
}
END_TEST()
FN_TEST(close_with_data_then_read)
{
int fildes[2];
char buf[8] = { 0 };
CHECK(pipe(fildes));
TEST_RES(write(fildes[1], "hello", 5), _ret == 5);
TEST_SUCC(close(fildes[1]));
TEST_RES(read(fildes[0], buf, 2),
_ret == 2 && strncmp(buf, "he", 2) == 0);
TEST_RES(read(fildes[0], buf, sizeof(buf)),
_ret == 3 && strncmp(buf, "llo", 3) == 0);
TEST_RES(read(fildes[0], buf, sizeof(buf)), _ret == 0);
TEST_ERRNO(write(fildes[0], buf, sizeof(buf)), EBADF);
TEST_RES(read(fildes[0], buf, 0), _ret == 0);
TEST_ERRNO(write(fildes[0], buf, 0), EBADF);
TEST_SUCC(close(fildes[0]));
}
END_TEST()
FN_TEST(close_with_data_then_write)
{
int fildes[2];
char buf[8] = { 0 };
CHECK(pipe(fildes));
TEST_RES(write(fildes[1], "hello", 5), _ret == 5);
TEST_SUCC(close(fildes[0]));
TEST_ERRNO(read(fildes[1], buf, sizeof(buf)), EBADF);
TEST_ERRNO(write(fildes[1], buf, sizeof(buf)), EPIPE);
TEST_ERRNO(read(fildes[1], buf, 0), EBADF);
TEST_RES(write(fildes[1], buf, 0), _ret == 0);
TEST_SUCC(close(fildes[1]));
}
END_TEST()
#define POLL_MASK (POLLIN | POLLOUT | POLLHUP | POLLERR)
FN_TEST(poll_basic)
{
int fildes[2];
char buf[8];
struct pollfd pfd = { .events = POLL_MASK };
CHECK(pipe(fildes));
pfd.fd = fildes[0];
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & POLL_MASK) == 0);
pfd.fd = fildes[1];
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & POLL_MASK) == POLLOUT);
TEST_RES(write(fildes[1], "hello", 5), _ret == 5);
pfd.fd = fildes[0];
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & POLL_MASK) == POLLIN);
pfd.fd = fildes[1];
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & POLL_MASK) == POLLOUT);
TEST_RES(read(fildes[0], buf, sizeof(buf)), _ret == 5);
pfd.fd = fildes[0];
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & POLL_MASK) == 0);
pfd.fd = fildes[1];
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & POLL_MASK) == POLLOUT);
TEST_SUCC(close(fildes[0]));
TEST_SUCC(close(fildes[1]));
}
END_TEST()
FN_TEST(close_first_then_poll)
{
int fildes[2];
struct pollfd pfd = { .events = POLLIN | POLLOUT };
CHECK(pipe(fildes));
TEST_RES(write(fildes[1], "hello", 5), _ret == 5);
TEST_SUCC(close(fildes[0]));
pfd.fd = fildes[1];
TEST_RES(poll(&pfd, 1, 0),
(pfd.revents & POLL_MASK) == (POLLOUT | POLLERR));
TEST_SUCC(close(fildes[1]));
}
END_TEST()
FN_TEST(close_second_then_poll)
{
int fildes[2];
char buf[8];
struct pollfd pfd = { .events = POLLIN | POLLOUT };
CHECK(pipe(fildes));
TEST_RES(write(fildes[1], "hello", 5), _ret == 5);
TEST_SUCC(close(fildes[1]));
pfd.fd = fildes[0];
TEST_RES(poll(&pfd, 1, 0),
(pfd.revents & POLL_MASK) == (POLLIN | POLLHUP));
TEST_RES(read(fildes[0], buf, sizeof(buf)),
_ret == 5 && strncmp(buf, "hello", 5) == 0);
pfd.fd = fildes[0];
TEST_RES(poll(&pfd, 1, 0), (pfd.revents & POLL_MASK) == POLLHUP);
TEST_SUCC(close(fildes[0]));
}
END_TEST()

View File

@ -59,4 +59,6 @@ echo "All ext2 fs test passed."
echo "Start fdatasync test......" echo "Start fdatasync test......"
test_fdatasync test_fdatasync
echo "All fdatasync test passed." echo "All fdatasync test passed."
pipe/pipe_err