Use more wait_events

This commit is contained in:
Ruihan Li 2024-09-02 14:54:46 +08:00 committed by Tate, Hongliang Tian
parent bbfc2cd12d
commit f21394c679
3 changed files with 51 additions and 74 deletions

View File

@ -89,6 +89,19 @@ impl PtyMaster {
self.output.buffer_len()
}
fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
let mut input = self.input.disable_irq().lock();
if input.is_empty() {
return_errno_with_message!(Errno::EAGAIN, "the buffer is empty");
}
let read_len = input.read_fallible(writer)?;
self.update_state(&input);
Ok(read_len)
}
fn update_state(&self, buf: &RingBuffer<u8>) {
if buf.is_empty() {
self.pollee.del_events(IoEvents::IN)
@ -120,35 +133,12 @@ impl Pollable for PtyMaster {
impl FileIo for PtyMaster {
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
let read_len = writer.avail();
// TODO: deal with nonblocking read
if read_len == 0 {
if !writer.has_avail() {
return Ok(0);
}
let mut poller = Poller::new();
loop {
let mut input = self.input.disable_irq().lock();
if input.is_empty() {
let events = self.pollee.poll(IoEvents::IN, Some(&mut poller));
if events.contains(IoEvents::ERR) {
return_errno_with_message!(Errno::EACCES, "unexpected err");
}
if events.is_empty() {
drop(input);
// FIXME: deal with pty read timeout
poller.wait()?;
}
continue;
}
let read_len = input.read_fallible(writer)?;
self.update_state(&input);
return Ok(read_len);
}
// TODO: deal with nonblocking and timeout
self.wait_events(IoEvents::IN, || self.try_read(writer))
}
fn write(&self, reader: &mut VmReader) -> Result<usize> {

View File

@ -16,7 +16,7 @@ use crate::{
process::signal::{
constants::{SIGINT, SIGQUIT},
signals::kernel::KernelSignal,
Pollee, Poller,
Pollable, Pollee, Poller,
},
thread::work_queue::{submit_work_item, work_item::WorkItem, WorkPriority},
util::ring_buffer::RingBuffer,
@ -87,6 +87,12 @@ impl CurrentLine {
}
}
impl Pollable for LineDiscipline {
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
}
impl LineDiscipline {
/// Creates a new line discipline
pub fn new(send_signal: LdiscSignalSender) -> Arc<Self> {
@ -229,19 +235,7 @@ impl LineDiscipline {
}
pub fn read(&self, buf: &mut [u8]) -> Result<usize> {
loop {
let res = self.try_read(buf);
match res {
Ok(len) => return Ok(len),
Err(e) if e.error() != Errno::EAGAIN => return Err(e),
Err(_) => {
let mut poller = Poller::new();
if self.poll(IoEvents::IN, Some(&mut poller)).is_empty() {
poller.wait()?
}
}
}
}
self.wait_events(IoEvents::IN, || self.try_read(buf))
}
/// Reads all bytes buffered to `dst`.
@ -275,10 +269,6 @@ impl LineDiscipline {
Ok(read_len)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
self.pollee.poll(mask, poller)
}
/// Reads bytes from `self` to `dst`, returning the actual bytes read.
///
/// If no bytes are available, this method returns 0 immediately.

View File

@ -130,6 +130,28 @@ impl EventFile {
// TODO: deal with overflow logic
}
fn try_read(&self, writer: &mut VmWriter) -> Result<()> {
let mut counter = self.counter.lock();
// Wait until the counter becomes non-zero
if *counter == 0 {
return_errno_with_message!(Errno::EAGAIN, "the counter is zero");
}
// Copy value from counter, and set the new counter value
if self.flags.lock().contains(Flags::EFD_SEMAPHORE) {
writer.write_fallible(&mut 1u64.as_bytes().into())?;
*counter -= 1;
} else {
writer.write_fallible(&mut (*counter).as_bytes().into())?;
*counter = 0;
}
self.update_io_state(&counter);
Ok(())
}
/// Adds val to the counter.
///
/// If the new_value is overflowed or exceeds MAX_COUNTER_VALUE, the counter value
@ -160,40 +182,15 @@ impl Pollable for EventFile {
impl FileLike for EventFile {
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
let read_len = core::mem::size_of::<u64>();
if writer.avail() < read_len {
return_errno_with_message!(Errno::EINVAL, "buf len is less len u64 size");
}
loop {
let mut counter = self.counter.lock();
// Wait until the counter becomes non-zero
if *counter == 0 {
if self.is_nonblocking() {
return_errno_with_message!(Errno::EAGAIN, "try reading event file again");
}
self.update_io_state(&counter);
drop(counter);
let mut poller = Poller::new();
if self.pollee.poll(IoEvents::IN, Some(&mut poller)).is_empty() {
poller.wait()?;
}
continue;
}
// Copy value from counter, and set the new counter value
if self.flags.lock().contains(Flags::EFD_SEMAPHORE) {
writer.write_fallible(&mut 1u64.as_bytes().into())?;
*counter -= 1;
} else {
writer.write_fallible(&mut (*counter).as_bytes().into())?;
*counter = 0;
}
self.update_io_state(&counter);
break;
if self.is_nonblocking() {
self.try_read(writer)?;
} else {
self.wait_events(IoEvents::IN, || self.try_read(writer))?;
}
Ok(read_len)