Correct the blocking behavior

This commit is contained in:
Ruihan Li 2025-05-03 23:31:56 +08:00 committed by Tate, Hongliang Tian
parent 67065835ef
commit fe6b78058c
9 changed files with 338 additions and 82 deletions

View File

@ -3,14 +3,14 @@
use ostd::sync::SpinLock;
use crate::{
device::tty::{Tty, TtyDriver},
device::tty::{PushCharError, Tty, TtyDriver},
events::IoEvents,
prelude::{return_errno_with_message, Errno, Result},
process::signal::Pollee,
util::ring_buffer::RingBuffer,
};
const BUFFER_CAPACITY: usize = 4096;
const BUFFER_CAPACITY: usize = 8192;
/// A pseudoterminal driver.
///
@ -59,20 +59,28 @@ impl PtyDriver {
}
impl TtyDriver for PtyDriver {
fn push_output(&self, chs: &[u8]) {
fn push_output(&self, chs: &[u8]) -> core::result::Result<usize, PushCharError> {
let mut output = self.output.lock();
let mut len = 0;
for ch in chs {
// TODO: This is termios-specific behavior and should be part of the TTY implementation
// instead of the TTY driver implementation. See the ONLCR flag for more details.
if *ch == b'\n' {
output.push_overwrite(b'\r');
output.push_overwrite(b'\n');
continue;
if *ch == b'\n' && output.capacity() - output.len() >= 2 {
output.push(b'\r').unwrap();
output.push(b'\n').unwrap();
} else if *ch != b'\n' && !output.is_full() {
output.push(*ch).unwrap();
} else if len == 0 {
return Err(PushCharError);
} else {
break;
}
output.push_overwrite(*ch);
len += 1;
}
self.pollee.notify(IoEvents::IN);
Ok(len)
}
fn drain_output(&self) {
@ -86,7 +94,7 @@ impl TtyDriver for PtyDriver {
move |chs| {
for ch in chs {
output.push_overwrite(*ch);
let _ = output.push(*ch);
}
if !has_notified {
@ -95,4 +103,13 @@ impl TtyDriver for PtyDriver {
}
}
}
fn can_push(&self) -> bool {
let output = self.output.lock();
output.capacity() - output.len() >= 2
}
fn notify_input(&self) {
self.pollee.notify(IoEvents::OUT);
}
}

View File

@ -51,11 +51,17 @@ impl PtyMaster {
}
fn check_io_events(&self) -> IoEvents {
let mut events = IoEvents::empty();
if self.slave().driver().buffer_len() > 0 {
IoEvents::IN | IoEvents::OUT
} else {
IoEvents::OUT
events |= IoEvents::IN;
}
if self.slave().can_push() {
events |= IoEvents::OUT;
}
events
}
}
@ -76,6 +82,7 @@ impl FileIo for PtyMaster {
self.slave.driver().try_read(&mut buf)
})?;
self.slave.driver().pollee().invalidate();
self.slave.notify_output();
// TODO: Confirm what we should do if `write_fallible` fails in the middle.
writer.write_fallible(&mut buf[..read_len].into())?;
@ -86,8 +93,12 @@ impl FileIo for PtyMaster {
let mut buf = vec![0u8; reader.remain().min(IO_CAPACITY)];
let write_len = reader.read_fallible(&mut buf.as_mut_slice().into())?;
self.slave.push_input(&buf[..write_len]);
Ok(write_len)
// TODO: Add support for non-blocking mode and timeout
let len = self.wait_events(IoEvents::OUT, None, || {
Ok(self.slave.push_input(&buf[..write_len])?)
})?;
self.slave.driver().pollee().invalidate();
Ok(len)
}
fn ioctl(&self, cmd: IoctlCmd, arg: usize) -> Result<i32> {

View File

@ -1,5 +1,17 @@
// SPDX-License-Identifier: MPL-2.0
use crate::prelude::{Errno, Error};
/// An error indicating that no characters can be pushed because the buffer is full.
#[derive(Debug, Clone, Copy)]
pub struct PushCharError;
impl From<PushCharError> for Error {
fn from(_value: PushCharError) -> Self {
Error::with_message(Errno::EAGAIN, "the buffer is full")
}
}
/// A TTY driver.
///
/// A driver exposes some device-specific behavior to [`Tty`]. For example, a device provides
@ -10,7 +22,10 @@
/// [`Tty`]: super::Tty
pub trait TtyDriver: Send + Sync + 'static {
/// Pushes characters into the output buffer.
fn push_output(&self, chs: &[u8]);
///
/// This method returns the number of bytes pushed or fails with an error if no bytes can be
/// pushed because the buffer is full.
fn push_output(&self, chs: &[u8]) -> core::result::Result<usize, PushCharError>;
/// Drains the output buffer.
fn drain_output(&self);
@ -20,4 +35,17 @@ pub trait TtyDriver: Send + Sync + 'static {
/// Note that the implementation may choose to hold a lock during the life of the callback.
/// During this time, calls to other methods such as [`Self::push_output`] may cause deadlocks.
fn echo_callback(&self) -> impl FnMut(&[u8]) + '_;
/// Returns whether new characters can be pushed into the output buffer.
///
/// This method should return `false` if the output buffer is full.
fn can_push(&self) -> bool;
/// Notifies that the input buffer now has room for new characters.
///
/// This method should be called when the state of [`Tty::can_push`] changes from `false` to
/// `true`.
///
/// [`Tty::can_push`]: super::Tty::can_push
fn notify_input(&self);
}

View File

@ -1,6 +1,11 @@
// SPDX-License-Identifier: MPL-2.0
use super::termio::{KernelTermios, WinSize, CC_C_CHAR};
use ostd::const_assert;
use super::{
termio::{KernelTermios, WinSize, CC_C_CHAR},
PushCharError,
};
use crate::{
prelude::*,
process::signal::{
@ -10,54 +15,71 @@ use crate::{
util::ring_buffer::RingBuffer,
};
// This implementation refers the implementation of linux
// https://elixir.bootlin.com/linux/latest/source/include/linux/tty_ldisc.h
// This implementation references the implementation of Linux:
// <https://elixir.bootlin.com/linux/latest/source/include/linux/tty_ldisc.h>
const BUFFER_CAPACITY: usize = 4096;
const LINE_CAPACITY: usize = 4095;
const BUFFER_CAPACITY: usize = 8192;
// `LINE_CAPACITY` must be less than `BUFFER_CAPACITY`. Otherwise, `write()` can be blocked
// indefinitely if both the current line and the buffer are full, so even the line terminator won't
// be accepted.
const_assert!(LINE_CAPACITY < BUFFER_CAPACITY);
pub struct LineDiscipline {
/// Current line
current_line: CurrentLine,
/// The read buffer
/// Read buffer
read_buffer: RingBuffer<u8>,
/// Termios
termios: KernelTermios,
/// Windows size
/// Window size
winsize: WinSize,
}
pub struct CurrentLine {
buffer: RingBuffer<u8>,
struct CurrentLine {
buffer: Box<[u8]>,
len: usize,
}
impl Default for CurrentLine {
fn default() -> Self {
Self {
buffer: RingBuffer::new(BUFFER_CAPACITY),
buffer: vec![0; LINE_CAPACITY].into_boxed_slice(),
len: 0,
}
}
}
impl CurrentLine {
/// Reads all bytes inside current line and clear current line
pub fn drain(&mut self) -> Vec<u8> {
let mut ret = vec![0u8; self.buffer.len()];
self.buffer.pop_slice(ret.as_mut_slice()).unwrap();
ret
/// Pushes a character to the current line.
fn push_char(&mut self, ch: u8) {
// If the line is full, the character will be ignored, but other actions such as echoing
// and signaling will work as normal. This will never block the caller, even if the input
// comes from the pseduoterminal master.
if self.len == self.buffer.len() {
return;
}
self.buffer[self.len] = ch;
self.len += 1;
}
pub fn push_char(&mut self, char: u8) {
// What should we do if line is full?
debug_assert!(!self.is_full());
self.buffer.push_overwrite(char);
/// Clears the current line and returns the bytes in it.
fn drain(&mut self) -> &[u8] {
let chs = &self.buffer[..self.len];
self.len = 0;
chs
}
pub fn backspace(&mut self) {
let _ = self.buffer.pop();
/// Removes the last character, if it is present.
fn backspace(&mut self) {
self.len = self.len.saturating_sub(1);
}
pub fn is_full(&self) -> bool {
self.buffer.is_full()
/// Returns the number of characters in the current line.
fn len(&self) -> usize {
self.len
}
}
@ -78,7 +100,7 @@ impl LineDiscipline {
ch: u8,
mut signal_callback: F1,
echo_callback: F2,
) {
) -> core::result::Result<(), PushCharError> {
let ch = if self.termios.contains_icrnl() && ch == b'\r' {
b'\n'
} else {
@ -96,10 +118,18 @@ impl LineDiscipline {
self.output_char(ch, echo_callback);
}
if self.is_full() {
// If the buffer is full, we should not push the character into the buffer. The caller
// can silently ignore the error (if the input comes from the keyboard) or block the
// user space (if the input comes from the pseduoterminal master).
return Err(PushCharError);
}
// Raw mode
if !self.termios.is_canonical_mode() {
self.read_buffer.push_overwrite(ch);
return;
// Note that `unwrap()` below won't fail because we checked `is_full()` above.
self.read_buffer.push(ch).unwrap();
return Ok(());
}
// Canonical mode
@ -116,16 +146,19 @@ impl LineDiscipline {
if is_line_terminator(ch, &self.termios) {
// A new line is met. Move all bytes in `current_line` to `read_buffer`.
self.current_line.push_char(ch);
// Note that `unwrap()` below won't fail because we checked `is_full()` above.
for line_ch in self.current_line.drain() {
self.read_buffer.push_overwrite(line_ch);
self.read_buffer.push(*line_ch).unwrap();
}
self.read_buffer.push(ch).unwrap();
}
if is_printable_char(ch) {
// Printable character
self.current_line.push_char(ch);
}
Ok(())
}
// TODO: respect output flags
@ -190,14 +223,6 @@ impl LineDiscipline {
Ok(dst.len())
}
pub fn termios(&self) -> &KernelTermios {
&self.termios
}
pub fn set_termios(&mut self, termios: KernelTermios) {
self.termios = termios;
}
pub fn drain_input(&mut self) {
self.current_line.drain();
self.read_buffer.clear();
@ -207,6 +232,18 @@ impl LineDiscipline {
self.read_buffer.len()
}
pub fn is_full(&self) -> bool {
self.read_buffer.len() + self.current_line.len() >= self.read_buffer.capacity()
}
pub fn termios(&self) -> &KernelTermios {
&self.termios
}
pub fn set_termios(&mut self, termios: KernelTermios) {
self.termios = termios;
}
pub fn window_size(&self) -> WinSize {
self.winsize
}

View File

@ -26,7 +26,7 @@ mod n_tty;
mod termio;
pub use device::TtyDevice;
pub use driver::TtyDriver;
pub use driver::{PushCharError, TtyDriver};
pub(super) use n_tty::init;
pub use n_tty::{iter_n_tty, system_console};
@ -81,22 +81,34 @@ impl<D> Tty<D> {
&self.driver
}
fn check_io_events(&self) -> IoEvents {
if self.ldisc.lock().buffer_len() != 0 {
IoEvents::IN | IoEvents::OUT
} else {
IoEvents::OUT
}
/// Returns whether new characters can be pushed into the input buffer.
///
/// This method should return `false` if the input buffer is full.
pub fn can_push(&self) -> bool {
!self.ldisc.lock().is_full()
}
/// Notifies that the output buffer now has room for new characters.
///
/// This method should be called when the state of [`TtyDriver::can_push`] changes from `false`
/// to `true`.
pub fn notify_output(&self) {
self.pollee.notify(IoEvents::OUT);
}
}
impl<D: TtyDriver> Tty<D> {
pub fn push_input(&self, chs: &[u8]) {
/// Pushes characters into the output buffer.
///
/// This method returns the number of bytes pushed or fails with an error if no bytes can be
/// pushed because the buffer is full.
pub fn push_input(&self, chs: &[u8]) -> core::result::Result<usize, PushCharError> {
let mut ldisc = self.ldisc.lock();
let mut echo = self.driver.echo_callback();
let mut len = 0;
for ch in chs {
ldisc.push_char(
let res = ldisc.push_char(
*ch,
|signum| {
if let Some(foreground) = self.job_control.foreground() {
@ -105,12 +117,35 @@ impl<D: TtyDriver> Tty<D> {
},
&mut echo,
);
if res.is_err() && len == 0 {
return Err(PushCharError);
} else if res.is_err() {
break;
} else {
len += 1;
}
}
self.pollee.notify(IoEvents::IN);
Ok(len)
}
fn check_io_events(&self) -> IoEvents {
let mut events = IoEvents::empty();
if self.ldisc.lock().buffer_len() > 0 {
events |= IoEvents::IN;
}
if self.driver.can_push() {
events |= IoEvents::OUT;
}
events
}
}
impl<D> Pollable for Tty<D> {
impl<D: TtyDriver> Pollable for Tty<D> {
fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.pollee
.poll_with(mask, poller, || self.check_io_events())
@ -126,6 +161,7 @@ impl<D: TtyDriver> FileIo for Tty<D> {
let read_len =
self.wait_events(IoEvents::IN, None, || self.ldisc.lock().try_read(&mut buf))?;
self.pollee.invalidate();
self.driver.notify_input();
// TODO: Confirm what we should do if `write_fallible` fails in the middle.
writer.write_fallible(&mut buf[..read_len].into())?;
@ -136,8 +172,12 @@ impl<D: TtyDriver> FileIo for Tty<D> {
let mut buf = vec![0u8; reader.remain().min(IO_CAPACITY)];
let write_len = reader.read_fallible(&mut buf.as_mut_slice().into())?;
self.driver.push_output(&buf[..write_len]);
Ok(write_len)
// TODO: Add support for non-blocking mode and timeout
let len = self.wait_events(IoEvents::OUT, None, || {
Ok(self.driver.push_output(&buf[..write_len])?)
})?;
self.pollee.invalidate();
Ok(len)
}
fn ioctl(&self, cmd: IoctlCmd, arg: usize) -> Result<i32> {

View File

@ -6,15 +6,16 @@ use aster_console::AnyConsoleDevice;
use ostd::mm::{Infallible, VmReader, VmWriter};
use spin::Once;
use super::{Tty, TtyDriver};
use super::{PushCharError, Tty, TtyDriver};
pub struct ConsoleDriver {
console: Arc<dyn AnyConsoleDevice>,
}
impl TtyDriver for ConsoleDriver {
fn push_output(&self, chs: &[u8]) {
fn push_output(&self, chs: &[u8]) -> core::result::Result<usize, PushCharError> {
self.console.send(chs);
Ok(chs.len())
}
fn drain_output(&self) {}
@ -22,6 +23,12 @@ impl TtyDriver for ConsoleDriver {
fn echo_callback(&self) -> impl FnMut(&[u8]) + '_ {
|chs| self.console.send(chs)
}
fn can_push(&self) -> bool {
true
}
fn notify_input(&self) {}
}
static N_TTY: Once<Box<[Arc<Tty<ConsoleDriver>>]>> = Once::new();
@ -59,7 +66,7 @@ fn create_n_tty(index: u32, device: Arc<dyn AnyConsoleDevice>) -> Arc<Tty<Consol
move |mut reader: VmReader<Infallible>| {
let mut chs = vec![0u8; reader.remain()];
reader.read(&mut VmWriter::from(chs.as_mut_slice()));
tty.push_input(chs.as_slice());
let _ = tty.push_input(chs.as_slice());
},
)));

View File

@ -156,16 +156,6 @@ impl<T: Pod> RingBuffer<T> {
producer.push_slice(items)
}
/// Pushes an item to the ring buffer. The next item
/// will be overwritten if the buffer is full.
///
/// Returns the overwritten item if any.
pub fn push_overwrite(&mut self, item: T) -> Option<T> {
let ret = if self.is_full() { self.pop() } else { None };
self.push(item).unwrap();
ret
}
/// Pops an item from the `RingBuffer`.
///
/// Returns `Some` with the popped item on success.
@ -467,18 +457,14 @@ mod test {
rb.push_slice(&[i32::MAX, 1, -2, 100]).unwrap();
assert!(rb.is_full());
let popped = rb.push_overwrite(i32::MIN);
let popped = rb.pop();
assert_eq!(popped, Some(i32::MAX));
assert!(rb.is_full());
assert_eq!(rb.free_len(), 1);
let mut popped = [0i32; 3];
rb.pop_slice(&mut popped).unwrap();
assert_eq!(popped, [1i32, -2, 100]);
assert_eq!(rb.free_len(), 3);
let popped = rb.pop().unwrap();
assert_eq!(popped, i32::MIN);
assert!(rb.is_empty());
assert_eq!(rb.free_len(), 4);
}
#[ktest]

View File

@ -0,0 +1,129 @@
// SPDX-License-Identifier: MPL-2.0
#include "../network/test.h"
#include <pthread.h>
#include <pty.h>
#include <unistd.h>
#include <sys/ioctl.h>
static int master, slave;
FN_SETUP(openpty)
{
CHECK(openpty(&master, &slave, NULL, NULL, NULL));
}
END_SETUP()
static int write_repeat(int fd, char c, size_t n)
{
while (n--)
if (write(fd, &c, 1) < 0)
return -1;
return 0;
}
static int read_repeat(int fd, char c, size_t n)
{
char c2;
while (n--)
if (read(fd, &c2, 1) != 1 || c2 != c)
return -1;
return 0;
}
static int read_all(int fd, char c)
{
int n;
for (;;) {
if (ioctl(fd, FIONREAD, &n) < 0)
return -1;
if (n == 0)
return 0;
if (read_repeat(fd, c, n) < 0)
return -1;
}
}
#define PTR_VOID_FALSE ((void *)0)
#define PTR_VOID_TRUE ((void *)1)
// Tests that `block` will block the thread until `unblock` is called.
#define DECLARE_BLOCKING_TEST(name, block, unblock) \
void *blocking_##name(void *is_child) \
{ \
static _Atomic int started = 0; \
static _Atomic int ended = 0; \
\
if (!is_child) { \
started = 1; \
block; \
\
if (ended != 1) \
return PTR_VOID_FALSE; \
} else { \
usleep(100 * 1000); \
if (started != 1) \
return PTR_VOID_FALSE; \
\
ended = 1; \
unblock; \
} \
\
return PTR_VOID_TRUE; \
}
#define RUN_BLOCKING_TEST(name) \
{ \
pthread_t __thd; \
void *__res; \
TEST_SUCC(pthread_create(&__thd, NULL, blocking_##name, \
PTR_VOID_TRUE)); \
TEST_SUCC(blocking_##name(PTR_VOID_FALSE) == PTR_VOID_TRUE ? \
0 : \
-1); \
TEST_RES(pthread_join(__thd, &__res), __res == PTR_VOID_TRUE); \
}
DECLARE_BLOCKING_TEST(write_slave, write_repeat(slave, 'a', 1),
read_all(master, 'a'));
DECLARE_BLOCKING_TEST(read_master, read_repeat(master, 'a', 1),
write_repeat(slave, 'a', 1));
DECLARE_BLOCKING_TEST(read_slave, read_repeat(slave, 'a', 1),
write_repeat(master, '\n', 1));
FN_TEST(pty_blocking)
{
// Write many characters to overflow the line buffer. As documented in the man pages, the
// line buffer can hold a maximum of 4095 characters, not including the line terminator.
// Additional characters will not be queued, but signals and echoes will still work.
// Therefore, the echoed characters will cause the output buffer to overflow.
TEST_SUCC(write_repeat(master, 'a', 128 * 1024));
// Since the output buffer is overflowing, writing characters to it should block. In Linux,
// reading one character from the buffer does not unblock the writer, which is rather odd.
// So here we test that reading all characters from the buffer should unblock the printer.
RUN_BLOCKING_TEST(write_slave);
TEST_SUCC(read_all(master, 'a'));
// Now that the output buffer is empty, reading characters from it should block. Writing a
// character to the buffer should unblock the reader.
RUN_BLOCKING_TEST(read_master);
// The input buffer is empty because all characters are in the line buffer until a line
// terminator is seen. So reading characters from the input buffer should block. Writing a
// line character should move all characters in the line buffer into the input buffer and
// unblock the reader.
RUN_BLOCKING_TEST(read_slave);
TEST_SUCC(read_repeat(slave, 'a', 4094));
TEST_SUCC(read_repeat(slave, '\n', 1));
// TODO: This test does not cover cases in which the input buffer overflows. Reliably
// constructing that state is difficult without first knowing the size of the input buffer.
}
END_TEST()

View File

@ -35,6 +35,7 @@ process/group_session
process/job_control
pthread/pthread_test
pty/open_pty
pty/pty_blocking
sched/sched_attr
shm/posix_shm
signal_c/parent_death_signal