Update line discipline pollee immediately

This commit is contained in:
jiangjianfeng
2024-10-14 06:57:30 +00:00
committed by Tate, Hongliang Tian
parent e32687e6d5
commit 002a67451d

View File

@ -4,7 +4,10 @@
use alloc::format; use alloc::format;
use ostd::trap::{disable_local, in_interrupt_context}; use ostd::{
sync::LocalIrqDisabled,
trap::{disable_local, in_interrupt_context},
};
use super::termio::{KernelTermios, WinSize, CC_C_CHAR}; use super::termio::{KernelTermios, WinSize, CC_C_CHAR};
use crate::{ use crate::{
@ -27,22 +30,22 @@ const BUFFER_CAPACITY: usize = 4096;
pub type LdiscSignalSender = Arc<dyn Fn(KernelSignal) + Send + Sync + 'static>; pub type LdiscSignalSender = Arc<dyn Fn(KernelSignal) + Send + Sync + 'static>;
pub struct LineDiscipline { pub struct LineDiscipline {
/// current line /// Current line
current_line: SpinLock<CurrentLine>, current_line: SpinLock<CurrentLine, LocalIrqDisabled>,
/// The read buffer /// The read buffer
read_buffer: SpinLock<RingBuffer<u8>>, read_buffer: SpinLock<RingBuffer<u8>, LocalIrqDisabled>,
/// termios /// Termios
termios: SpinLock<KernelTermios>, termios: SpinLock<KernelTermios, LocalIrqDisabled>,
/// Windows size, /// Windows size
winsize: SpinLock<WinSize>, winsize: SpinLock<WinSize, LocalIrqDisabled>,
/// Pollee /// Pollee
pollee: Pollee, pollee: Pollee,
/// Used to send signal for foreground processes, when some char comes. /// Used to send signal for foreground processes, when some char comes.
send_signal: LdiscSignalSender, send_signal: LdiscSignalSender,
/// work item /// Work item
work_item: Arc<WorkItem>, work_item: Arc<WorkItem>,
/// Parameters used by a work item. /// Parameters used by a work item.
work_item_para: Arc<SpinLock<LineDisciplineWorkPara>>, work_item_para: Arc<SpinLock<LineDisciplineWorkPara, LocalIrqDisabled>>,
} }
pub struct CurrentLine { pub struct CurrentLine {
@ -58,7 +61,7 @@ impl Default for CurrentLine {
} }
impl CurrentLine { impl CurrentLine {
/// read all bytes inside current line and clear current line /// Reads all bytes inside current line and clear current line
pub fn drain(&mut self) -> Vec<u8> { pub fn drain(&mut self) -> Vec<u8> {
let mut ret = vec![0u8; self.buffer.len()]; let mut ret = vec![0u8; self.buffer.len()];
self.buffer.pop_slice(ret.as_mut_slice()).unwrap(); self.buffer.pop_slice(ret.as_mut_slice()).unwrap();
@ -85,13 +88,13 @@ impl CurrentLine {
} }
impl LineDiscipline { impl LineDiscipline {
/// Create a new line discipline /// Creates a new line discipline
pub fn new(send_signal: LdiscSignalSender) -> Arc<Self> { pub fn new(send_signal: LdiscSignalSender) -> Arc<Self> {
Arc::new_cyclic(move |line_ref: &Weak<LineDiscipline>| { Arc::new_cyclic(move |line_ref: &Weak<LineDiscipline>| {
let line_discipline = line_ref.clone(); let line_discipline = line_ref.clone();
let work_item = Arc::new(WorkItem::new(Box::new(move || { let work_item = Arc::new(WorkItem::new(Box::new(move || {
if let Some(line_discipline) = line_discipline.upgrade() { if let Some(line_discipline) = line_discipline.upgrade() {
line_discipline.update_readable_state_after(); line_discipline.send_signal_after();
} }
}))); })));
Self { Self {
@ -107,9 +110,9 @@ impl LineDiscipline {
}) })
} }
/// Push char to line discipline. /// Pushes a char to the line discipline
pub fn push_char<F2: FnMut(&str)>(&self, ch: u8, echo_callback: F2) { pub fn push_char<F2: FnMut(&str)>(&self, ch: u8, echo_callback: F2) {
let termios = self.termios.disable_irq().lock(); let termios = self.termios.lock();
let ch = if termios.contains_icrnl() && ch == b'\r' { let ch = if termios.contains_icrnl() && ch == b'\r' {
b'\n' b'\n'
@ -130,7 +133,7 @@ impl LineDiscipline {
// Raw mode // Raw mode
if !termios.is_canonical_mode() { if !termios.is_canonical_mode() {
self.read_buffer.disable_irq().lock().push_overwrite(ch); self.read_buffer.lock().push_overwrite(ch);
self.update_readable_state(); self.update_readable_state();
return; return;
} }
@ -139,12 +142,12 @@ impl LineDiscipline {
if ch == *termios.get_special_char(CC_C_CHAR::VKILL) { if ch == *termios.get_special_char(CC_C_CHAR::VKILL) {
// Erase current line // Erase current line
self.current_line.disable_irq().lock().drain(); self.current_line.lock().drain();
} }
if ch == *termios.get_special_char(CC_C_CHAR::VERASE) { if ch == *termios.get_special_char(CC_C_CHAR::VERASE) {
// Type backspace // Type backspace
let mut current_line = self.current_line.disable_irq().lock(); let mut current_line = self.current_line.lock();
if !current_line.is_empty() { if !current_line.is_empty() {
current_line.backspace(); current_line.backspace();
} }
@ -152,17 +155,17 @@ impl LineDiscipline {
if is_line_terminator(ch, &termios) { if is_line_terminator(ch, &termios) {
// If a new line is met, all bytes in current_line will be moved to read_buffer // If a new line is met, all bytes in current_line will be moved to read_buffer
let mut current_line = self.current_line.disable_irq().lock(); let mut current_line = self.current_line.lock();
current_line.push_char(ch); current_line.push_char(ch);
let current_line_chars = current_line.drain(); let current_line_chars = current_line.drain();
for char in current_line_chars { for char in current_line_chars {
self.read_buffer.disable_irq().lock().push_overwrite(char); self.read_buffer.lock().push_overwrite(char);
} }
} }
if is_printable_char(ch) { if is_printable_char(ch) {
// Printable character // Printable character
self.current_line.disable_irq().lock().push_char(ch); self.current_line.lock().push_char(ch);
} }
self.update_readable_state(); self.update_readable_state();
@ -181,7 +184,7 @@ impl LineDiscipline {
if in_interrupt_context() { if in_interrupt_context() {
// `kernel_signal()` may cause sleep, so only construct parameters here. // `kernel_signal()` may cause sleep, so only construct parameters here.
self.work_item_para.disable_irq().lock().kernel_signal = Some(signal); self.work_item_para.lock().kernel_signal = Some(signal);
} else { } else {
(self.send_signal)(signal); (self.send_signal)(signal);
} }
@ -190,18 +193,7 @@ impl LineDiscipline {
} }
pub fn update_readable_state(&self) { pub fn update_readable_state(&self) {
let buffer = self.read_buffer.disable_irq().lock(); let buffer = self.read_buffer.lock();
if in_interrupt_context() {
// Add/Del events may sleep, so only construct parameters here.
if !buffer.is_empty() {
self.work_item_para.disable_irq().lock().pollee_type = Some(PolleeType::Add);
} else {
self.work_item_para.disable_irq().lock().pollee_type = Some(PolleeType::Del);
}
submit_work_item(self.work_item.clone(), WorkPriority::High);
return;
}
if !buffer.is_empty() { if !buffer.is_empty() {
self.pollee.add_events(IoEvents::IN); self.pollee.add_events(IoEvents::IN);
@ -210,27 +202,11 @@ impl LineDiscipline {
} }
} }
/// include all operations that may cause sleep, and processes by a work queue. /// Sends a signal later. The signal will be handled by a work queue.
fn update_readable_state_after(&self) { fn send_signal_after(&self) {
if let Some(signal) = self if let Some(signal) = self.work_item_para.lock().kernel_signal.take() {
.work_item_para
.disable_irq()
.lock()
.kernel_signal
.take()
{
(self.send_signal)(signal); (self.send_signal)(signal);
}; };
if let Some(pollee_type) = self.work_item_para.disable_irq().lock().pollee_type.take() {
match pollee_type {
PolleeType::Add => {
self.pollee.add_events(IoEvents::IN);
}
PolleeType::Del => {
self.pollee.del_events(IoEvents::IN);
}
}
}
} }
// TODO: respect output flags // TODO: respect output flags
@ -268,16 +244,18 @@ impl LineDiscipline {
} }
} }
/// read all bytes buffered to dst, return the actual read length. /// Reads all bytes buffered to `dst`.
///
/// This method returns the actual read length.
fn try_read(&self, dst: &mut [u8]) -> Result<usize> { fn try_read(&self, dst: &mut [u8]) -> Result<usize> {
let (vmin, vtime) = { let (vmin, vtime) = {
let termios = self.termios.disable_irq().lock(); let termios = self.termios.lock();
let vmin = *termios.get_special_char(CC_C_CHAR::VMIN); let vmin = *termios.get_special_char(CC_C_CHAR::VMIN);
let vtime = *termios.get_special_char(CC_C_CHAR::VTIME); let vtime = *termios.get_special_char(CC_C_CHAR::VTIME);
(vmin, vtime) (vmin, vtime)
}; };
let read_len = { let read_len = {
let len = self.read_buffer.disable_irq().lock().len(); let len = self.read_buffer.lock().len();
let max_read_len = len.min(dst.len()); let max_read_len = len.min(dst.len());
if vmin == 0 && vtime == 0 { if vmin == 0 && vtime == 0 {
// poll read // poll read
@ -301,10 +279,11 @@ impl LineDiscipline {
self.pollee.poll(mask, poller) self.pollee.poll(mask, poller)
} }
/// returns immediately with the lesser of the number of bytes available or the number of bytes requested. /// Reads bytes from `self` to `dst`, returning the actual bytes read.
/// If no bytes are available, completes immediately, returning 0. ///
/// If no bytes are available, this method returns 0 immediately.
fn poll_read(&self, dst: &mut [u8]) -> usize { fn poll_read(&self, dst: &mut [u8]) -> usize {
let mut buffer = self.read_buffer.disable_irq().lock(); let mut buffer = self.read_buffer.lock();
let len = buffer.len(); let len = buffer.len();
let max_read_len = len.min(dst.len()); let max_read_len = len.min(dst.len());
if max_read_len == 0 { if max_read_len == 0 {
@ -313,7 +292,7 @@ impl LineDiscipline {
let mut read_len = 0; let mut read_len = 0;
for dst_i in dst.iter_mut().take(max_read_len) { for dst_i in dst.iter_mut().take(max_read_len) {
if let Some(next_char) = buffer.pop() { if let Some(next_char) = buffer.pop() {
let termios = self.termios.disable_irq().lock(); let termios = self.termios.lock();
if termios.is_canonical_mode() { if termios.is_canonical_mode() {
// canonical mode, read until meet new line // canonical mode, read until meet new line
if is_line_terminator(next_char, &termios) { if is_line_terminator(next_char, &termios) {
@ -341,8 +320,13 @@ impl LineDiscipline {
read_len read_len
} }
// The read() blocks until the number of bytes requested or /// Reads bytes from `self` into `dst`,
// at least vmin bytes are available, and returns the real read value. /// returning the actual number of bytes read.
///
/// # Errors
///
/// If the available bytes are fewer than `min(dst.len(), vmin)`,
/// this method returns [`Errno::EAGAIN`].
pub fn block_read(&self, dst: &mut [u8], vmin: u8) -> Result<usize> { pub fn block_read(&self, dst: &mut [u8], vmin: u8) -> Result<usize> {
let _guard = disable_local(); let _guard = disable_local();
let buffer_len = self.read_buffer.lock().len(); let buffer_len = self.read_buffer.lock().len();
@ -355,22 +339,17 @@ impl LineDiscipline {
Ok(self.poll_read(&mut dst[..buffer_len])) Ok(self.poll_read(&mut dst[..buffer_len]))
} }
/// write bytes to buffer, if flush to console, then write the content to console /// Returns whether there is buffered data
pub fn write(&self, src: &[u8], flush_to_console: bool) -> Result<usize> {
todo!()
}
/// whether there is buffered data
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.read_buffer.disable_irq().lock().len() == 0 self.read_buffer.lock().len() == 0
} }
pub fn termios(&self) -> KernelTermios { pub fn termios(&self) -> KernelTermios {
*self.termios.disable_irq().lock() *self.termios.lock()
} }
pub fn set_termios(&self, termios: KernelTermios) { pub fn set_termios(&self, termios: KernelTermios) {
*self.termios.disable_irq().lock() = termios; *self.termios.lock() = termios;
} }
pub fn drain_input(&self) { pub fn drain_input(&self) {
@ -433,16 +412,13 @@ enum PolleeType {
} }
struct LineDisciplineWorkPara { struct LineDisciplineWorkPara {
#[allow(clippy::type_complexity)]
kernel_signal: Option<KernelSignal>, kernel_signal: Option<KernelSignal>,
pollee_type: Option<PolleeType>,
} }
impl LineDisciplineWorkPara { impl LineDisciplineWorkPara {
fn new() -> Self { fn new() -> Self {
Self { Self {
kernel_signal: None, kernel_signal: None,
pollee_type: None,
} }
} }
} }