diff --git a/Cargo.lock b/Cargo.lock index 39457265..b4e50c71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,15 @@ dependencies = [ name = "cpio-decoder" version = "0.1.0" +[[package]] +name = "crossbeam-utils" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" +dependencies = [ + "cfg-if", +] + [[package]] name = "ctor" version = "0.1.25" @@ -359,11 +368,12 @@ dependencies = [ "jinux-rights-proc", "jinux-time", "jinux-util", + "keyable-arc", "lazy_static", "log", "lru", "pod", - "ringbuffer", + "ringbuf", "spin 0.9.4", "time", "typeflags", @@ -411,6 +421,10 @@ version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" +[[package]] +name = "keyable-arc" +version = "0.1.0" + [[package]] name = "lazy_static" version = "1.4.0" @@ -536,10 +550,13 @@ dependencies = [ ] [[package]] -name = "ringbuffer" -version = "0.10.0" +name = "ringbuf" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "310a2514cb46bb500a2f2ad70c79c51c5a475cc23faa245d2c675faabe889370" +checksum = "93ca10b9c9e53ac855a2d6953bce34cef6edbac32c4b13047a4d59d67299420a" +dependencies = [ + "crossbeam-utils", +] [[package]] name = "rsdp" diff --git a/services/libs/jinux-std/Cargo.toml b/services/libs/jinux-std/Cargo.toml index 06d3a2fa..56cb2f48 100644 --- a/services/libs/jinux-std/Cargo.toml +++ b/services/libs/jinux-std/Cargo.toml @@ -28,7 +28,8 @@ xmas-elf = "0.8.0" # goblin = {version= "0.5.3", default-features = false, features = ["elf64"]} # data-structures bitflags = "1.3" -ringbuffer = "0.10.0" +ringbuf = { version = "0.3.2", default-features = false, features = ["alloc"] } +keyable-arc = { path = "../keyable-arc" } spin = "0.9.4" vte = "0.10" diff --git a/services/libs/jinux-std/src/fs/file_handle/file.rs b/services/libs/jinux-std/src/fs/file_handle/file.rs index 283c5890..2b6f8779 100644 --- a/services/libs/jinux-std/src/fs/file_handle/file.rs +++ b/services/libs/jinux-std/src/fs/file_handle/file.rs @@ -1,4 +1,4 @@ -use crate::fs::utils::{IoEvents, IoctlCmd, Metadata, SeekFrom}; +use crate::fs::utils::{IoEvents, IoctlCmd, Metadata, Poller, SeekFrom}; use crate::prelude::*; use crate::tty::get_n_tty; @@ -25,7 +25,7 @@ pub trait File: Send + Sync + Any { } } - fn poll(&self) -> IoEvents { + fn poll(&self, _mask: IoEvents, _poller: Option<&Poller>) -> IoEvents { IoEvents::empty() } diff --git a/services/libs/jinux-std/src/fs/file_handle/mod.rs b/services/libs/jinux-std/src/fs/file_handle/mod.rs index 84202a8f..01cd09aa 100644 --- a/services/libs/jinux-std/src/fs/file_handle/mod.rs +++ b/services/libs/jinux-std/src/fs/file_handle/mod.rs @@ -3,7 +3,7 @@ mod file; mod inode_handle; -use crate::fs::utils::{Metadata, SeekFrom}; +use crate::fs::utils::{IoEvents, Metadata, Poller, SeekFrom}; use crate::prelude::*; use crate::rights::{ReadOp, WriteOp}; use alloc::sync::Arc; @@ -81,6 +81,13 @@ impl FileHandle { } } + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + match &self.inner { + Inner::File(file) => file.poll(mask, poller), + Inner::Inode(inode_handle) => inode_handle.dentry().vnode().poll(mask, poller), + } + } + pub fn clean_for_close(&self) -> Result<()> { match &self.inner { Inner::Inode(_) => { diff --git a/services/libs/jinux-std/src/fs/mod.rs b/services/libs/jinux-std/src/fs/mod.rs index 021b7195..a2508270 100644 --- a/services/libs/jinux-std/src/fs/mod.rs +++ b/services/libs/jinux-std/src/fs/mod.rs @@ -2,6 +2,7 @@ pub mod file_handle; pub mod file_table; pub mod fs_resolver; pub mod initramfs; +pub mod pipe; pub mod procfs; pub mod ramfs; pub mod stdio; diff --git a/services/libs/jinux-std/src/fs/pipe.rs b/services/libs/jinux-std/src/fs/pipe.rs new file mode 100644 index 00000000..fe7ae33e --- /dev/null +++ b/services/libs/jinux-std/src/fs/pipe.rs @@ -0,0 +1,95 @@ +use crate::prelude::*; + +use super::file_handle::File; +use super::utils::{Consumer, IoEvents, Poller, Producer}; + +pub struct PipeReader { + consumer: Consumer, +} + +impl PipeReader { + pub fn new(consumer: Consumer) -> Self { + Self { consumer } + } +} + +impl File for PipeReader { + fn read(&self, buf: &mut [u8]) -> Result { + let is_nonblocking = self.consumer.is_nonblocking(); + + // Fast path + let res = self.consumer.read(buf); + if should_io_return(&res, is_nonblocking) { + return res; + } + + // Slow path + let mask = IoEvents::IN; + let poller = Poller::new(); + loop { + let res = self.consumer.read(buf); + if should_io_return(&res, is_nonblocking) { + return res; + } + let events = self.poll(mask, Some(&poller)); + if events.is_empty() { + poller.wait(); + } + } + } + + fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.consumer.poll(mask, poller) + } +} + +pub struct PipeWriter { + producer: Producer, +} + +impl PipeWriter { + pub fn new(producer: Producer) -> Self { + Self { producer } + } +} + +impl File for PipeWriter { + fn write(&self, buf: &[u8]) -> Result { + let is_nonblocking = self.producer.is_nonblocking(); + + // Fast path + let res = self.producer.write(buf); + if should_io_return(&res, is_nonblocking) { + return res; + } + + // Slow path + let mask = IoEvents::OUT; + let poller = Poller::new(); + loop { + let res = self.producer.write(buf); + if should_io_return(&res, is_nonblocking) { + return res; + } + let events = self.poll(mask, Some(&poller)); + if events.is_empty() { + poller.wait(); + } + } + } + + fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.producer.poll(mask, poller) + } +} + +fn should_io_return(res: &Result, is_nonblocking: bool) -> bool { + if is_nonblocking { + return true; + } + match res { + Ok(_) => true, + Err(e) if e.error() == Errno::EAGAIN => false, + Err(_) => true, + } +} diff --git a/services/libs/jinux-std/src/fs/stdio.rs b/services/libs/jinux-std/src/fs/stdio.rs index 5ef7da83..fbc138a3 100644 --- a/services/libs/jinux-std/src/fs/stdio.rs +++ b/services/libs/jinux-std/src/fs/stdio.rs @@ -3,7 +3,7 @@ use crate::tty::{get_n_tty, Tty}; use super::file_handle::File; use super::file_table::FileDescripter; -use super::utils::{InodeMode, InodeType, IoEvents, Metadata, SeekFrom}; +use super::utils::{InodeMode, InodeType, IoEvents, Metadata, Poller, SeekFrom}; pub const FD_STDIN: FileDescripter = 0; pub const FD_STDOUT: FileDescripter = 1; @@ -22,9 +22,9 @@ pub struct Stderr { } impl File for Stdin { - fn poll(&self) -> IoEvents { + fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { if let Some(console) = self.console.as_ref() { - console.poll() + console.poll(mask, poller) } else { todo!() } diff --git a/services/libs/jinux-std/src/fs/utils/channel.rs b/services/libs/jinux-std/src/fs/utils/channel.rs new file mode 100644 index 00000000..04543449 --- /dev/null +++ b/services/libs/jinux-std/src/fs/utils/channel.rs @@ -0,0 +1,349 @@ +use core::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use jinux_rights_proc::require; +use ringbuf::{HeapConsumer as HeapRbConsumer, HeapProducer as HeapRbProducer, HeapRb}; + +use crate::prelude::*; +use crate::rights::*; + +use super::{IoEvents, Pollee, Poller, StatusFlags}; + +/// A unidirectional communication channel, intended to implement IPC, e.g., pipe, +/// unix domain sockets, etc. +pub struct Channel { + producer: Producer, + consumer: Consumer, +} + +impl Channel { + pub fn with_capacity(capacity: usize) -> Result { + Self::with_capacity_and_flags(capacity, StatusFlags::empty()) + } + + pub fn with_capacity_and_flags(capacity: usize, flags: StatusFlags) -> Result { + let common = Arc::new(Common::with_capacity_and_flags(capacity, flags)?); + let producer = Producer(EndPoint::new(common.clone(), WriteOp::new())); + let consumer = Consumer(EndPoint::new(common, ReadOp::new())); + Ok(Self { producer, consumer }) + } + + pub fn split(self) -> (Producer, Consumer) { + let Self { producer, consumer } = self; + (producer, consumer) + } + + pub fn producer(&self) -> &Producer { + &self.producer + } + + pub fn consumer(&self) -> &Consumer { + &self.consumer + } + + pub fn capacity(&self) -> usize { + self.producer.0.common.capacity() + } +} + +pub struct Producer(EndPoint); + +pub struct Consumer(EndPoint); + +impl Producer { + fn this_end(&self) -> &EndPointInner> { + &self.0.common.producer + } + + fn peer_end(&self) -> &EndPointInner> { + &self.0.common.consumer + } + + fn update_pollee(&self) { + let this_end = self.this_end(); + let peer_end = self.peer_end(); + + // Update the event of pollee in a critical region so that pollee + // always reflects the _true_ state of the underlying ring buffer + // regardless of any race conditions. + self.0.common.lock_event(); + + let rb = this_end.rb(); + if rb.is_full() { + this_end.pollee.del_events(IoEvents::OUT); + } + if !rb.is_empty() { + peer_end.pollee.add_events(IoEvents::IN); + } + } + + pub fn shutdown(&self) { + self.this_end().shutdown() + } + + pub fn is_shutdown(&self) -> bool { + self.this_end().is_shutdown() + } + + pub fn is_peer_shutdown(&self) -> bool { + self.peer_end().is_shutdown() + } + + pub fn status_flags(&self) -> StatusFlags { + self.this_end().status_flags() + } + + pub fn set_status_flags(&self, new_flags: StatusFlags) { + self.this_end().set_status_flags(new_flags) + } + + pub fn is_nonblocking(&self) -> bool { + self.this_end() + .status_flags() + .contains(StatusFlags::O_NONBLOCK) + } + + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.this_end().pollee.poll(mask, poller) + } +} + +impl Producer { + pub fn write(&self, buf: &[T]) -> Result { + if self.is_shutdown() || self.is_peer_shutdown() { + return_errno!(Errno::EPIPE); + } + + if buf.len() == 0 { + return Ok(0); + } + + let written_len = self.0.write(buf); + + self.update_pollee(); + + if written_len > 0 { + Ok(written_len) + } else { + return_errno_with_message!(Errno::EAGAIN, "try write later"); + } + } +} + +impl Drop for Producer { + fn drop(&mut self) { + self.shutdown(); + + self.0.common.lock_event(); + + // When reading from a channel such as a pipe or a stream socket, + // POLLHUP merely indicates that the peer closed its end of the channel. + self.peer_end().pollee.add_events(IoEvents::HUP); + } +} + +impl Consumer { + fn this_end(&self) -> &EndPointInner> { + &self.0.common.consumer + } + + fn peer_end(&self) -> &EndPointInner> { + &self.0.common.producer + } + + fn update_pollee(&self) { + let this_end = self.this_end(); + let peer_end = self.peer_end(); + + // Update the event of pollee in a critical region so that pollee + // always reflects the _true_ state of the underlying ring buffer + // regardless of any race conditions. + self.0.common.lock_event(); + + let rb = this_end.rb(); + if rb.is_empty() { + this_end.pollee.del_events(IoEvents::IN); + } + if !rb.is_full() { + peer_end.pollee.add_events(IoEvents::OUT); + } + } + + pub fn shutdown(&self) { + self.this_end().shutdown() + } + + pub fn is_shutdown(&self) -> bool { + self.this_end().is_shutdown() + } + + pub fn is_peer_shutdown(&self) -> bool { + self.peer_end().is_shutdown() + } + + pub fn status_flags(&self) -> StatusFlags { + self.this_end().status_flags() + } + + pub fn set_status_flags(&self, new_flags: StatusFlags) { + self.this_end().set_status_flags(new_flags) + } + + pub fn is_nonblocking(&self) -> bool { + self.this_end() + .status_flags() + .contains(StatusFlags::O_NONBLOCK) + } + + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.this_end().pollee.poll(mask, poller) + } +} + +impl Consumer { + pub fn read(&self, buf: &mut [T]) -> Result { + if self.is_shutdown() { + return_errno!(Errno::EPIPE); + } + + if buf.len() == 0 { + return Ok(0); + } + + let read_len = self.0.read(buf); + + self.update_pollee(); + + if self.is_peer_shutdown() { + return Ok(read_len); + } + + if read_len > 0 { + Ok(read_len) + } else { + return_errno_with_message!(Errno::EAGAIN, "try read later"); + } + } +} + +impl Drop for Consumer { + fn drop(&mut self) { + self.shutdown(); + + self.0.common.lock_event(); + + // POLLERR is also set for a file descriptor referring to the write end of a pipe + // when the read end has been closed. + self.peer_end().pollee.add_events(IoEvents::ERR); + } +} + +struct EndPoint { + common: Arc>, + rights: R, +} + +impl EndPoint { + pub fn new(common: Arc>, rights: R) -> Self { + Self { common, rights } + } +} + +impl EndPoint { + #[require(R > Read)] + pub fn read(&self, buf: &mut [T]) -> usize { + let mut rb = self.common.consumer.rb(); + rb.pop_slice(buf) + } + + #[require(R > Write)] + pub fn write(&self, buf: &[T]) -> usize { + let mut rb = self.common.producer.rb(); + rb.push_slice(buf) + } +} + +struct Common { + producer: EndPointInner>, + consumer: EndPointInner>, + event_lock: Mutex<()>, +} + +impl Common { + fn with_capacity_and_flags(capacity: usize, flags: StatusFlags) -> Result { + check_status_flags(flags)?; + + if capacity == 0 { + return_errno_with_message!(Errno::EINVAL, "capacity cannot be zero"); + } + + let rb: HeapRb = HeapRb::new(capacity); + let (rb_producer, rb_consumer) = rb.split(); + + let producer = EndPointInner::new(rb_producer, IoEvents::OUT, flags); + let consumer = EndPointInner::new(rb_consumer, IoEvents::empty(), flags); + let event_lock = Mutex::new(()); + + Ok(Self { + producer, + consumer, + event_lock, + }) + } + + pub fn lock_event(&self) -> MutexGuard<()> { + self.event_lock.lock() + } + + pub fn capacity(&self) -> usize { + self.producer.rb().capacity() + } +} + +struct EndPointInner { + rb: Mutex, + pollee: Pollee, + is_shutdown: AtomicBool, + status_flags: AtomicU32, +} + +impl EndPointInner { + pub fn new(rb: T, init_events: IoEvents, status_flags: StatusFlags) -> Self { + Self { + rb: Mutex::new(rb), + pollee: Pollee::new(init_events), + is_shutdown: AtomicBool::new(false), + status_flags: AtomicU32::new(status_flags.bits()), + } + } + + pub fn rb(&self) -> MutexGuard { + self.rb.lock() + } + + pub fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::Acquire) + } + + pub fn shutdown(&self) { + self.is_shutdown.store(true, Ordering::Release) + } + + pub fn status_flags(&self) -> StatusFlags { + let bits = self.status_flags.load(Ordering::Relaxed); + StatusFlags::from_bits(bits).unwrap() + } + + pub fn set_status_flags(&self, new_flags: StatusFlags) { + self.status_flags.store(new_flags.bits(), Ordering::Relaxed); + } +} + +fn check_status_flags(flags: StatusFlags) -> Result<()> { + let valid_flags: StatusFlags = StatusFlags::O_NONBLOCK | StatusFlags::O_DIRECT; + if !valid_flags.contains(flags) { + return_errno_with_message!(Errno::EINVAL, "invalid flags"); + } + if flags.contains(StatusFlags::O_DIRECT) { + return_errno_with_message!(Errno::EINVAL, "O_DIRECT is not supported"); + } + Ok(()) +} diff --git a/services/libs/jinux-std/src/fs/utils/inode.rs b/services/libs/jinux-std/src/fs/utils/inode.rs index 88a50905..fde7fdf4 100644 --- a/services/libs/jinux-std/src/fs/utils/inode.rs +++ b/services/libs/jinux-std/src/fs/utils/inode.rs @@ -5,7 +5,7 @@ use core::any::Any; use core::time::Duration; use jinux_frame::vm::VmFrame; -use super::{DirentVisitor, FileSystem, IoctlCmd, SuperBlock}; +use super::{DirentVisitor, FileSystem, IoEvents, IoctlCmd, Poller, SuperBlock}; use crate::prelude::*; #[repr(u32)] @@ -197,6 +197,11 @@ pub trait Inode: Any + Sync + Send { fn sync(&self) -> Result<()>; + fn poll(&self, mask: IoEvents, _poller: Option<&Poller>) -> IoEvents { + let events = IoEvents::IN | IoEvents::OUT; + events & mask + } + fn fs(&self) -> Arc; fn as_any_ref(&self) -> &dyn Any; diff --git a/services/libs/jinux-std/src/fs/utils/io_events.rs b/services/libs/jinux-std/src/fs/utils/io_events.rs index f8d1dbdb..4a3cbe41 100644 --- a/services/libs/jinux-std/src/fs/utils/io_events.rs +++ b/services/libs/jinux-std/src/fs/utils/io_events.rs @@ -1,11 +1,17 @@ +use crate::events::Events; + crate::bitflags! { pub struct IoEvents: u32 { - const POLLIN = 0x0001; - const POLLPRI = 0x0002; - const POLLOUT = 0x0004; - const POLLERR = 0x0008; - const POLLHUP = 0x0010; - const POLLNVAL = 0x0020; - const POLLRDHUP = 0x2000; + const IN = 0x0001; + const PRI = 0x0002; + const OUT = 0x0004; + const ERR = 0x0008; + const HUP = 0x0010; + const NVAL = 0x0020; + const RDHUP = 0x2000; + /// Events that are always polled even without specifying them. + const ALWAYS_POLL = Self::ERR.bits | Self::HUP.bits; } } + +impl Events for IoEvents {} diff --git a/services/libs/jinux-std/src/fs/utils/mod.rs b/services/libs/jinux-std/src/fs/utils/mod.rs index ee83cbae..b978ec32 100644 --- a/services/libs/jinux-std/src/fs/utils/mod.rs +++ b/services/libs/jinux-std/src/fs/utils/mod.rs @@ -1,6 +1,7 @@ //! VFS components pub use access_mode::AccessMode; +pub use channel::{Channel, Consumer, Producer}; pub use creation_flags::CreationFlags; pub use dentry_cache::Dentry; pub use dirent_visitor::DirentVisitor; @@ -12,11 +13,12 @@ pub use inode::{Inode, InodeMode, InodeType, Metadata}; pub use io_events::IoEvents; pub use ioctl::IoctlCmd; pub use page_cache::PageCache; -pub use poll::{c_nfds, c_pollfd, PollFd}; +pub use poll::{Pollee, Poller}; pub use status_flags::StatusFlags; pub use vnode::Vnode; mod access_mode; +mod channel; mod creation_flags; mod dentry_cache; mod dirent_visitor; diff --git a/services/libs/jinux-std/src/fs/utils/poll.rs b/services/libs/jinux-std/src/fs/utils/poll.rs index 2154d1fc..53d234f7 100644 --- a/services/libs/jinux-std/src/fs/utils/poll.rs +++ b/services/libs/jinux-std/src/fs/utils/poll.rs @@ -1,46 +1,211 @@ -#![allow(non_camel_case_types)] - use super::IoEvents; -use crate::fs::file_table::FileDescripter; +use crate::events::Observer; use crate::prelude::*; -pub type c_nfds = u64; -// https://github.com/torvalds/linux/blob/master/include/uapi/asm-generic/poll.h -#[derive(Debug, Clone, Copy, Pod)] -#[repr(C)] -pub struct c_pollfd { - fd: FileDescripter, - events: i16, - revents: i16, +use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +use jinux_frame::sync::WaitQueue; +use keyable_arc::KeyableArc; + +/// A pollee maintains a set of active events, which can be polled with +/// pollers or be monitored with observers. +pub struct Pollee { + inner: Arc, } -#[derive(Debug, Clone, Copy)] -pub struct PollFd { - pub fd: FileDescripter, - pub events: IoEvents, - pub revents: IoEvents, +struct PolleeInner { + // A table that maintains all interesting pollers + pollers: Mutex>, IoEvents>>, + // For efficient manipulation, we use AtomicU32 instead of RwLock + events: AtomicU32, + // To reduce lock contentions, we maintain a counter for the size of the table + num_pollers: AtomicUsize, } -impl From for PollFd { - fn from(raw: c_pollfd) -> Self { - let events = IoEvents::from_bits_truncate(raw.events as _); - let revents = IoEvents::from_bits_truncate(raw.revents as _); +impl Pollee { + /// Creates a new instance of pollee. + pub fn new(init_events: IoEvents) -> Self { + let inner = PolleeInner { + pollers: Mutex::new(BTreeMap::new()), + events: AtomicU32::new(init_events.bits()), + num_pollers: AtomicUsize::new(0), + }; Self { - fd: raw.fd, - events, - revents, + inner: Arc::new(inner), + } + } + + /// Returns the current events of the pollee given an event mask. + /// + /// If no interesting events are polled and a poller is provided, then + /// the poller will start monitoring the pollee and receive event + /// notification once the pollee gets any interesting events. + /// + /// This operation is _atomic_ in the sense that either some interesting + /// events are returned or the poller is registered (if a poller is provided). + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + let mask = mask | IoEvents::ALWAYS_POLL; + + // Fast path: return events immediately + let revents = self.events() & mask; + if !revents.is_empty() || poller.is_none() { + return revents; + } + + // Slow path: register the provided poller + self.register_poller(poller.unwrap(), mask); + + // It is important to check events again to handle race conditions + let revents = self.events() & mask; + revents + } + + fn register_poller(&self, poller: &Poller, mask: IoEvents) { + let mut pollers = self.inner.pollers.lock(); + let is_new = { + let observer = poller.observer(); + pollers.insert(observer, mask).is_none() + }; + if is_new { + let mut pollees = poller.inner.pollees.lock(); + pollees.push(Arc::downgrade(&self.inner)); + + self.inner.num_pollers.fetch_add(1, Ordering::Release); + } + } + + /// Add some events to the pollee's state. + /// + /// This method wakes up all registered pollers that are interested in + /// the added events. + pub fn add_events(&self, events: IoEvents) { + self.inner.events.fetch_or(events.bits(), Ordering::Release); + + // Fast path + if self.inner.num_pollers.load(Ordering::Relaxed) == 0 { + return; + } + + // Slow path: broadcast the new events to all pollers + let pollers = self.inner.pollers.lock(); + pollers + .iter() + .filter(|(_, mask)| mask.intersects(events)) + .for_each(|(poller, mask)| poller.on_events(&(events & *mask))); + } + + /// Remove some events from the pollee's state. + /// + /// This method will not wake up registered pollers even when + /// the pollee still has some interesting events to the pollers. + pub fn del_events(&self, events: IoEvents) { + self.inner + .events + .fetch_and(!events.bits(), Ordering::Release); + } + + /// Reset the pollee's state. + /// + /// Reset means removing all events on the pollee. + pub fn reset_events(&self) { + self.inner + .events + .fetch_and(!IoEvents::all().bits(), Ordering::Release); + } + + fn events(&self) -> IoEvents { + let event_bits = self.inner.events.load(Ordering::Acquire); + IoEvents::from_bits(event_bits).unwrap() + } +} + +/// A poller gets notified when its associated pollees have interesting events. +pub struct Poller { + inner: KeyableArc, +} + +struct PollerInner { + // Use event counter to wait or wake up a poller + event_counter: EventCounter, + // All pollees that are interesting to this poller + pollees: Mutex>>, +} + +impl Poller { + /// Constructs a new `Poller`. + pub fn new() -> Self { + let inner = PollerInner { + event_counter: EventCounter::new(), + pollees: Mutex::new(Vec::with_capacity(1)), + }; + Self { + inner: KeyableArc::new(inner), + } + } + + /// Wait until there are any interesting events happen since last `wait`. + pub fn wait(&self) { + self.inner.event_counter.read(); + } + + fn observer(&self) -> KeyableArc> { + self.inner.clone() as KeyableArc> + } +} + +impl Observer for PollerInner { + fn on_events(&self, _events: &IoEvents) { + self.event_counter.write(); + } +} + +impl Drop for Poller { + fn drop(&mut self) { + let mut pollees = self.inner.pollees.lock(); + if pollees.len() == 0 { + return; + } + + let self_observer = self.observer(); + for weak_pollee in pollees.drain(..) { + if let Some(pollee) = weak_pollee.upgrade() { + let mut pollers = pollee.pollers.lock(); + let res = pollers.remove(&self_observer); + assert!(res.is_some()); + drop(pollers); + + pollee.num_pollers.fetch_sub(1, Ordering::Relaxed); + } } } } -impl From for c_pollfd { - fn from(raw: PollFd) -> Self { - let events = raw.events.bits() as i16; - let revents = raw.revents.bits() as i16; +/// A counter for wait and wakeup. +struct EventCounter { + counter: AtomicUsize, + wait_queue: WaitQueue, +} + +impl EventCounter { + pub fn new() -> Self { Self { - fd: raw.fd, - events, - revents, + counter: AtomicUsize::new(0), + wait_queue: WaitQueue::new(), } } + + pub fn read(&self) -> usize { + self.wait_queue.wait_until(|| { + let val = self.counter.swap(0, Ordering::Relaxed); + if val > 0 { + Some(val) + } else { + None + } + }) + } + + pub fn write(&self) { + self.counter.fetch_add(1, Ordering::Relaxed); + self.wait_queue.wake_one(); + } } diff --git a/services/libs/jinux-std/src/fs/utils/vnode.rs b/services/libs/jinux-std/src/fs/utils/vnode.rs index d1017198..6168979d 100644 --- a/services/libs/jinux-std/src/fs/utils/vnode.rs +++ b/services/libs/jinux-std/src/fs/utils/vnode.rs @@ -1,4 +1,6 @@ -use super::{DirentVisitor, FsFlags, Inode, InodeMode, InodeType, Metadata, PageCache}; +use super::{ + DirentVisitor, FsFlags, Inode, InodeMode, InodeType, IoEvents, Metadata, PageCache, Poller, +}; use crate::prelude::*; use crate::rights::Full; use crate::vm::vmo::Vmo; @@ -184,6 +186,10 @@ impl Vnode { self.inner.read().inode.readdir_at(offset, visitor) } + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.inner.read().inode.poll(mask, poller) + } + pub fn metadata(&self) -> Metadata { self.inner.read().inode.metadata() } diff --git a/services/libs/jinux-std/src/process/mod.rs b/services/libs/jinux-std/src/process/mod.rs index 422ff4cc..1b9849d1 100644 --- a/services/libs/jinux-std/src/process/mod.rs +++ b/services/libs/jinux-std/src/process/mod.rs @@ -49,8 +49,6 @@ pub struct Process { root_vmar: Arc>, /// wait for child status changed waiting_children: WaitQueue, - /// wait for io events - poll_queue: WaitQueue, // Mutable Part /// The executable path. @@ -110,7 +108,6 @@ impl Process { ) -> Self { let children = BTreeMap::new(); let waiting_children = WaitQueue::new(); - let poll_queue = WaitQueue::new(); let resource_limits = ResourceLimits::default(); Self { pid, @@ -119,7 +116,6 @@ impl Process { user_vm, root_vmar, waiting_children, - poll_queue, exit_code: AtomicI32::new(0), status: Mutex::new(ProcessStatus::Runnable), parent: Mutex::new(parent), @@ -138,10 +134,6 @@ impl Process { &self.waiting_children } - pub fn poll_queue(&self) -> &WaitQueue { - &self.poll_queue - } - /// init a user process and run the process pub fn spawn_user_process( executable_path: &str, diff --git a/services/libs/jinux-std/src/process/process_group.rs b/services/libs/jinux-std/src/process/process_group.rs index f469e2e3..b203b38a 100644 --- a/services/libs/jinux-std/src/process/process_group.rs +++ b/services/libs/jinux-std/src/process/process_group.rs @@ -69,14 +69,6 @@ impl ProcessGroup { self.inner.lock().pgid } - /// Wake up all processes waiting on polling queue - pub fn wake_all_polling_procs(&self) { - let inner = self.inner.lock(); - for (_, process) in &inner.processes { - process.poll_queue().wake_all(); - } - } - /// send kernel signal to all processes in the group pub fn kernel_signal(&self, signal: KernelSignal) { for (_, process) in &self.inner.lock().processes { diff --git a/services/libs/jinux-std/src/syscall/mod.rs b/services/libs/jinux-std/src/syscall/mod.rs index e0be1822..f2390b95 100644 --- a/services/libs/jinux-std/src/syscall/mod.rs +++ b/services/libs/jinux-std/src/syscall/mod.rs @@ -36,6 +36,7 @@ use crate::syscall::mprotect::sys_mprotect; use crate::syscall::munmap::sys_munmap; use crate::syscall::open::{sys_open, sys_openat}; use crate::syscall::pause::sys_pause; +use crate::syscall::pipe::{sys_pipe, sys_pipe2}; use crate::syscall::poll::sys_poll; use crate::syscall::prctl::sys_prctl; use crate::syscall::prlimit64::sys_prlimit64; @@ -102,6 +103,7 @@ mod mprotect; mod munmap; mod open; mod pause; +mod pipe; mod poll; mod prctl; mod pread64; @@ -181,6 +183,7 @@ define_syscall_nums!( SYS_PREAD64 = 17, SYS_WRITEV = 20, SYS_ACCESS = 21, + SYS_PIPE = 22, SYS_SCHED_YIELD = 24, SYS_MADVISE = 28, SYS_DUP = 32, @@ -234,6 +237,7 @@ define_syscall_nums!( SYS_READLINKAT = 267, SYS_SET_ROBUST_LIST = 273, SYS_UTIMENSAT = 280, + SYS_PIPE2 = 293, SYS_PRLIMIT64 = 302 ); @@ -313,6 +317,7 @@ pub fn syscall_dispatch( SYS_PREAD64 => syscall_handler!(4, sys_pread64, args), SYS_WRITEV => syscall_handler!(3, sys_writev, args), SYS_ACCESS => syscall_handler!(2, sys_access, args), + SYS_PIPE => syscall_handler!(1, sys_pipe, args), SYS_SCHED_YIELD => syscall_handler!(0, sys_sched_yield), SYS_MADVISE => syscall_handler!(3, sys_madvise, args), SYS_DUP => syscall_handler!(1, sys_dup, args), @@ -366,6 +371,7 @@ pub fn syscall_dispatch( SYS_READLINKAT => syscall_handler!(4, sys_readlinkat, args), SYS_SET_ROBUST_LIST => syscall_handler!(2, sys_set_robust_list, args), SYS_UTIMENSAT => syscall_handler!(4, sys_utimensat, args), + SYS_PIPE2 => syscall_handler!(2, sys_pipe2, args), SYS_PRLIMIT64 => syscall_handler!(4, sys_prlimit64, args), _ => { error!("Unimplemented syscall number: {}", syscall_number); diff --git a/services/libs/jinux-std/src/syscall/pipe.rs b/services/libs/jinux-std/src/syscall/pipe.rs new file mode 100644 index 00000000..fe9715c8 --- /dev/null +++ b/services/libs/jinux-std/src/syscall/pipe.rs @@ -0,0 +1,48 @@ +use crate::fs::file_handle::FileHandle; +use crate::fs::file_table::FileDescripter; +use crate::fs::pipe::{PipeReader, PipeWriter}; +use crate::fs::utils::{Channel, StatusFlags}; +use crate::log_syscall_entry; +use crate::prelude::*; +use crate::util::{read_val_from_user, write_val_to_user}; + +use super::SyscallReturn; +use super::SYS_PIPE2; + +pub fn sys_pipe2(fds: Vaddr, flags: u32) -> Result { + log_syscall_entry!(SYS_PIPE2); + debug!("flags: {:?}", flags); + + let mut pipe_fds = read_val_from_user::(fds)?; + let (reader, writer) = { + let (producer, consumer) = Channel::with_capacity_and_flags( + PIPE_BUF_SIZE, + StatusFlags::from_bits_truncate(flags), + )? + .split(); + (PipeReader::new(consumer), PipeWriter::new(producer)) + }; + let pipe_reader = FileHandle::new_file(Arc::new(reader)); + let pipe_writer = FileHandle::new_file(Arc::new(writer)); + + let current = current!(); + let mut file_table = current.file_table().lock(); + pipe_fds.reader_fd = file_table.insert(pipe_reader); + pipe_fds.writer_fd = file_table.insert(pipe_writer); + write_val_to_user(fds, &pipe_fds)?; + + Ok(SyscallReturn::Return(0)) +} + +pub fn sys_pipe(fds: Vaddr) -> Result { + self::sys_pipe2(fds, 0) +} + +#[derive(Debug, Clone, Copy, Pod)] +#[repr(C)] +struct PipeFds { + reader_fd: FileDescripter, + writer_fd: FileDescripter, +} + +const PIPE_BUF_SIZE: usize = 1024 * 1024; diff --git a/services/libs/jinux-std/src/syscall/poll.rs b/services/libs/jinux-std/src/syscall/poll.rs index 15096c6c..13f564a7 100644 --- a/services/libs/jinux-std/src/syscall/poll.rs +++ b/services/libs/jinux-std/src/syscall/poll.rs @@ -1,65 +1,161 @@ +use core::cell::Cell; use core::time::Duration; -use crate::fs::utils::{c_pollfd, PollFd}; +use crate::fs::file_table::FileDescripter; +use crate::fs::utils::{IoEvents, Poller}; use crate::log_syscall_entry; +use crate::prelude::*; use crate::util::{read_val_from_user, write_val_to_user}; -use crate::{fs::utils::c_nfds, prelude::*}; use super::SyscallReturn; use super::SYS_POLL; -pub fn sys_poll(fds: Vaddr, nfds: c_nfds, timeout: i32) -> Result { +pub fn sys_poll(fds: Vaddr, nfds: u64, timeout: i32) -> Result { log_syscall_entry!(SYS_POLL); - let mut read_addr = fds; - let mut pollfds = Vec::with_capacity(nfds as _); - for _ in 0..nfds { - let c_poll_fd = read_val_from_user::(read_addr)?; - let poll_fd = PollFd::from(c_poll_fd); - pollfds.push(poll_fd); - // FIXME: do we need to respect align of c_pollfd here? - read_addr += core::mem::size_of::(); - } - let timeout = if timeout == 0 { - None - } else { + let poll_fds = { + let mut read_addr = fds; + let mut poll_fds = Vec::with_capacity(nfds as _); + for _ in 0..nfds { + let c_poll_fd = read_val_from_user::(read_addr)?; + let poll_fd = PollFd::from(c_poll_fd); + // Always clear the revents fields first + poll_fd.revents().set(IoEvents::empty()); + poll_fds.push(poll_fd); + // FIXME: do we need to respect align of c_pollfd here? + read_addr += core::mem::size_of::(); + } + poll_fds + }; + let timeout = if timeout >= 0 { Some(Duration::from_millis(timeout as _)) + } else { + None }; debug!( "poll_fds = {:?}, nfds = {}, timeout = {:?}", - pollfds, nfds, timeout + poll_fds, nfds, timeout ); - let current = current!(); - // FIXME: respect timeout parameter - let ready_files = current.poll_queue().wait_until(|| { - let mut ready_files = 0; - for pollfd in &mut pollfds { - let file_table = current.file_table().lock(); - let file = file_table.get_file(pollfd.fd); - match file { - Err(_) => return Some(Err(Error::new(Errno::EBADF))), - Ok(file) => { - let file_events = file.as_file().unwrap().poll(); - let polled_events = pollfd.events.intersection(file_events); - if !polled_events.is_empty() { - ready_files += 1; - pollfd.revents |= polled_events; - } - } - } - } - if ready_files > 0 { - return Some(Ok(ready_files)); - } else { - return None; - } - })?; + + let num_revents = do_poll(&poll_fds, timeout)?; + + // Write back let mut write_addr = fds; - for pollfd in pollfds { + for pollfd in poll_fds { let c_poll_fd = c_pollfd::from(pollfd); write_val_to_user(write_addr, &c_poll_fd)?; // FIXME: do we need to respect align of c_pollfd here? write_addr += core::mem::size_of::(); } - Ok(SyscallReturn::Return(ready_files)) + + Ok(SyscallReturn::Return(num_revents as _)) +} + +fn do_poll(poll_fds: &[PollFd], timeout: Option) -> Result { + // The main loop of polling + let poller = Poller::new(); + loop { + let mut num_revents = 0; + + for poll_fd in poll_fds { + // Skip poll_fd if it is not given a fd + let fd = match poll_fd.fd() { + Some(fd) => fd, + None => continue, + }; + + // Poll the file + let current = current!(); + let file = { + let file_table = current.file_table().lock(); + file_table.get_file(fd)?.clone() + }; + let need_poller = if num_revents == 0 { + Some(&poller) + } else { + None + }; + let revents = file.poll(poll_fd.events(), need_poller); + if !revents.is_empty() { + poll_fd.revents().set(revents); + num_revents += 1; + } + } + + if num_revents > 0 { + return Ok(num_revents); + } + + // Return immediately if specifying a timeout of zero + if timeout.is_some() && timeout.as_ref().unwrap().is_zero() { + return Ok(0); + } + + // FIXME: respect timeout parameter + poller.wait(); + } +} + +// https://github.com/torvalds/linux/blob/master/include/uapi/asm-generic/poll.h +#[derive(Debug, Clone, Copy, Pod)] +#[repr(C)] +pub struct c_pollfd { + fd: i32, + events: i16, + revents: i16, +} + +#[derive(Debug, Clone)] +pub struct PollFd { + fd: Option, + events: IoEvents, + revents: Cell, +} + +impl PollFd { + pub fn fd(&self) -> Option { + self.fd + } + + pub fn events(&self) -> IoEvents { + self.events + } + + pub fn revents(&self) -> &Cell { + &self.revents + } +} + +impl From for PollFd { + fn from(raw: c_pollfd) -> Self { + let fd = if raw.fd >= 0 { + Some(raw.fd as FileDescripter) + } else { + None + }; + let events = IoEvents::from_bits_truncate(raw.events as _); + let revents = Cell::new(IoEvents::from_bits_truncate(raw.revents as _)); + Self { + fd, + events, + revents, + } + } +} + +impl From for c_pollfd { + fn from(raw: PollFd) -> Self { + let fd = if let Some(fd) = raw.fd() { + fd as i32 + } else { + -1 + }; + let events = raw.events().bits() as i16; + let revents = raw.revents().get().bits() as i16; + Self { + fd, + events, + revents, + } + } } diff --git a/services/libs/jinux-std/src/tty/line_discipline.rs b/services/libs/jinux-std/src/tty/line_discipline.rs index e0769209..688e6ef4 100644 --- a/services/libs/jinux-std/src/tty/line_discipline.rs +++ b/services/libs/jinux-std/src/tty/line_discipline.rs @@ -1,11 +1,10 @@ -use crate::fs::utils::IoEvents; +use crate::fs::utils::{IoEvents, Pollee, Poller}; use crate::process::signal::constants::{SIGINT, SIGQUIT}; use crate::{ prelude::*, process::{process_table, signal::signals::kernel::KernelSignal, Pgid}, }; -use jinux_frame::sync::WaitQueue; -use ringbuffer::{ConstGenericRingBuffer, RingBuffer, RingBufferRead, RingBufferWrite}; +use ringbuf::{ring_buffer::RbBase, Rb, StaticRb}; use super::termio::{KernelTermios, CC_C_CHAR}; @@ -18,40 +17,39 @@ pub struct LineDiscipline { /// current line current_line: RwLock, /// The read buffer - read_buffer: Mutex>, + read_buffer: Mutex>, /// The foreground process group foreground: RwLock>, /// termios termios: RwLock, - /// wait until self is readable - read_wait_queue: WaitQueue, + /// Pollee + pollee: Pollee, } -#[derive(Debug)] pub struct CurrentLine { - buffer: ConstGenericRingBuffer, + buffer: StaticRb, } impl CurrentLine { pub fn new() -> Self { Self { - buffer: ConstGenericRingBuffer::new(), + buffer: StaticRb::default(), } } /// read all bytes inside current line and clear current line pub fn drain(&mut self) -> Vec { - self.buffer.drain().collect() + self.buffer.pop_iter().collect() } pub fn push_char(&mut self, char: u8) { // What should we do if line is full? debug_assert!(!self.is_full()); - self.buffer.push(char); + self.buffer.push_overwrite(char); } pub fn backspace(&mut self) { - self.buffer.dequeue(); + self.buffer.pop(); } pub fn is_full(&self) -> bool { @@ -68,10 +66,10 @@ impl LineDiscipline { pub fn new() -> Self { Self { current_line: RwLock::new(CurrentLine::new()), - read_buffer: Mutex::new(ConstGenericRingBuffer::new()), + read_buffer: Mutex::new(StaticRb::default()), foreground: RwLock::new(None), termios: RwLock::new(KernelTermios::default()), - read_wait_queue: WaitQueue::new(), + pollee: Pollee::new(IoEvents::empty()), } } @@ -118,7 +116,7 @@ impl LineDiscipline { current_line.push_char(item); let current_line_chars = current_line.drain(); for char in current_line_chars { - self.read_buffer.lock().push(char); + self.read_buffer.lock().push_overwrite(char); } } else if item >= 0x20 && item < 0x7f { // printable character @@ -126,7 +124,7 @@ impl LineDiscipline { } } else { // raw mode - self.read_buffer.lock().push(item); + self.read_buffer.lock().push_overwrite(item); // debug!("push char: {}", char::from(item)) } @@ -135,13 +133,13 @@ impl LineDiscipline { } if self.is_readable() { - self.read_wait_queue.wake_all(); + self.pollee.add_events(IoEvents::IN); } } /// whether self is readable fn is_readable(&self) -> bool { - self.read_buffer.lock().len() > 0 + !self.read_buffer.lock().is_empty() } // TODO: respect output flags @@ -170,58 +168,84 @@ impl LineDiscipline { /// read all bytes buffered to dst, return the actual read length. pub fn read(&self, dst: &mut [u8]) -> Result { - let termios = self.termios.read(); - let vmin = *termios.get_special_char(CC_C_CHAR::VMIN); - let vtime = *termios.get_special_char(CC_C_CHAR::VTIME); - drop(termios); - let read_len: usize = self.read_wait_queue.wait_until(|| { - // if current process does not belong to foreground process group, - // block until current process become foreground. - if !self.current_belongs_to_foreground() { - warn!("current process does not belong to foreground process group"); - return None; + let mut poller = None; + loop { + let res = self.try_read(dst); + match res { + Ok(read_len) => { + return Ok(read_len); + } + Err(e) => { + if e.error() != Errno::EAGAIN { + return Err(e); + } + } } + + // Wait for read event + let need_poller = if poller.is_none() { + poller = Some(Poller::new()); + poller.as_ref() + } else { + None + }; + let revents = self.pollee.poll(IoEvents::IN, need_poller); + if revents.is_empty() { + poller.as_ref().unwrap().wait(); + } + } + } + + pub fn try_read(&self, dst: &mut [u8]) -> Result { + if !self.current_belongs_to_foreground() { + return_errno!(Errno::EAGAIN); + } + + let (vmin, vtime) = { + let termios = self.termios.read(); + let vmin = *termios.get_special_char(CC_C_CHAR::VMIN); + let vtime = *termios.get_special_char(CC_C_CHAR::VTIME); + (vmin, vtime) + }; + let read_len = { let len = self.read_buffer.lock().len(); let max_read_len = len.min(dst.len()); if vmin == 0 && vtime == 0 { // poll read - return self.poll_read(dst); - } - if vmin > 0 && vtime == 0 { + self.poll_read(dst) + } else if vmin > 0 && vtime == 0 { // block read - return self.block_read(dst, vmin); - } - if vmin == 0 && vtime > 0 { + self.block_read(dst, vmin)? + } else if vmin == 0 && vtime > 0 { todo!() - } - if vmin > 0 && vtime > 0 { + } else if vmin > 0 && vtime > 0 { todo!() + } else { + unreachable!() } - unreachable!() - }); + }; + if !self.is_readable() { + self.pollee.del_events(IoEvents::IN); + } Ok(read_len) } - pub fn poll(&self) -> IoEvents { - if self.is_empty() { - IoEvents::empty() - } else { - IoEvents::POLLIN - } + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.pollee.poll(mask, poller) } /// returns immediately with the lesser of the number of bytes available or the number of bytes requested. /// If no bytes are available, completes immediately, returning 0. - fn poll_read(&self, dst: &mut [u8]) -> Option { + fn poll_read(&self, dst: &mut [u8]) -> usize { let mut buffer = self.read_buffer.lock(); let len = buffer.len(); let max_read_len = len.min(dst.len()); if max_read_len == 0 { - return Some(0); + return 0; } let mut read_len = 0; for i in 0..max_read_len { - if let Some(next_char) = buffer.dequeue() { + if let Some(next_char) = buffer.pop() { if self.termios.read().is_canonical_mode() { // canonical mode, read until meet new line if meet_new_line(next_char, &self.termios.read()) { @@ -245,18 +269,18 @@ impl LineDiscipline { } } - Some(read_len) + read_len } // The read() blocks until the lesser of the number of bytes requested or // MIN bytes are available, and returns the lesser of the two values. - pub fn block_read(&self, dst: &mut [u8], vmin: u8) -> Option { + pub fn block_read(&self, dst: &mut [u8], vmin: u8) -> Result { let min_read_len = (vmin as usize).min(dst.len()); let buffer_len = self.read_buffer.lock().len(); if buffer_len < min_read_len { - return None; + return_errno!(Errno::EAGAIN); } - return self.poll_read(&mut dst[..min_read_len]); + Ok(self.poll_read(&mut dst[..min_read_len])) } /// write bytes to buffer, if flush to console, then write the content to console @@ -283,7 +307,7 @@ impl LineDiscipline { *self.foreground.write() = Some(fg_pgid); // Some background processes may be waiting on the wait queue, when set_fg, the background processes may be able to read. if self.is_readable() { - self.read_wait_queue.wake_all(); + self.pollee.add_events(IoEvents::IN); } } diff --git a/services/libs/jinux-std/src/tty/mod.rs b/services/libs/jinux-std/src/tty/mod.rs index de8bd9e3..bd534be7 100644 --- a/services/libs/jinux-std/src/tty/mod.rs +++ b/services/libs/jinux-std/src/tty/mod.rs @@ -1,9 +1,12 @@ use self::line_discipline::LineDiscipline; use crate::driver::tty::TtyDriver; use crate::fs::utils::{InodeMode, InodeType, IoEvents, Metadata}; -use crate::fs::{file_handle::File, utils::IoctlCmd}; +use crate::fs::{ + file_handle::File, + utils::{IoctlCmd, Poller}, +}; use crate::prelude::*; -use crate::process::{process_table, Pgid}; +use crate::process::Pgid; use crate::util::{read_val_from_user, write_val_to_user}; pub mod line_discipline; @@ -43,19 +46,8 @@ impl Tty { *self.driver.lock() = driver; } - /// Wake up foreground process group that wait on IO events. - /// This function should be called when the interrupt handler of IO events is called. - fn wake_fg_proc_grp(&self) { - if let Some(fg_pgid) = self.ldisc.get_fg() { - if let Some(fg_proc_grp) = process_table::pgid_to_process_group(fg_pgid) { - fg_proc_grp.wake_all_polling_procs(); - } - } - } - pub fn receive_char(&self, item: u8) { self.ldisc.push_char(item); - self.wake_fg_proc_grp(); } } @@ -73,8 +65,8 @@ impl File for Tty { Ok(buf.len()) } - fn poll(&self) -> IoEvents { - self.ldisc.poll() + fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.ldisc.poll(mask, poller) } fn ioctl(&self, cmd: IoctlCmd, arg: usize) -> Result {