Add support for pipe and poll

This commit is contained in:
LI Qing
2023-03-30 23:43:05 -04:00
committed by Tate, Hongliang Tian
parent 453d2ad0f0
commit 0fc707d38c
20 changed files with 981 additions and 177 deletions

View File

@ -1,4 +1,4 @@
use crate::fs::utils::{IoEvents, IoctlCmd, Metadata, SeekFrom};
use crate::fs::utils::{IoEvents, IoctlCmd, Metadata, Poller, SeekFrom};
use crate::prelude::*;
use crate::tty::get_n_tty;
@ -25,7 +25,7 @@ pub trait File: Send + Sync + Any {
}
}
fn poll(&self) -> IoEvents {
fn poll(&self, _mask: IoEvents, _poller: Option<&Poller>) -> IoEvents {
IoEvents::empty()
}

View File

@ -3,7 +3,7 @@
mod file;
mod inode_handle;
use crate::fs::utils::{Metadata, SeekFrom};
use crate::fs::utils::{IoEvents, Metadata, Poller, SeekFrom};
use crate::prelude::*;
use crate::rights::{ReadOp, WriteOp};
use alloc::sync::Arc;
@ -81,6 +81,13 @@ impl FileHandle {
}
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
match &self.inner {
Inner::File(file) => file.poll(mask, poller),
Inner::Inode(inode_handle) => inode_handle.dentry().vnode().poll(mask, poller),
}
}
pub fn clean_for_close(&self) -> Result<()> {
match &self.inner {
Inner::Inode(_) => {

View File

@ -2,6 +2,7 @@ pub mod file_handle;
pub mod file_table;
pub mod fs_resolver;
pub mod initramfs;
pub mod pipe;
pub mod procfs;
pub mod ramfs;
pub mod stdio;

View File

@ -0,0 +1,95 @@
use crate::prelude::*;
use super::file_handle::File;
use super::utils::{Consumer, IoEvents, Poller, Producer};
pub struct PipeReader {
consumer: Consumer<u8>,
}
impl PipeReader {
pub fn new(consumer: Consumer<u8>) -> Self {
Self { consumer }
}
}
impl File for PipeReader {
fn read(&self, buf: &mut [u8]) -> Result<usize> {
let is_nonblocking = self.consumer.is_nonblocking();
// Fast path
let res = self.consumer.read(buf);
if should_io_return(&res, is_nonblocking) {
return res;
}
// Slow path
let mask = IoEvents::IN;
let poller = Poller::new();
loop {
let res = self.consumer.read(buf);
if should_io_return(&res, is_nonblocking) {
return res;
}
let events = self.poll(mask, Some(&poller));
if events.is_empty() {
poller.wait();
}
}
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.consumer.poll(mask, poller)
}
}
pub struct PipeWriter {
producer: Producer<u8>,
}
impl PipeWriter {
pub fn new(producer: Producer<u8>) -> Self {
Self { producer }
}
}
impl File for PipeWriter {
fn write(&self, buf: &[u8]) -> Result<usize> {
let is_nonblocking = self.producer.is_nonblocking();
// Fast path
let res = self.producer.write(buf);
if should_io_return(&res, is_nonblocking) {
return res;
}
// Slow path
let mask = IoEvents::OUT;
let poller = Poller::new();
loop {
let res = self.producer.write(buf);
if should_io_return(&res, is_nonblocking) {
return res;
}
let events = self.poll(mask, Some(&poller));
if events.is_empty() {
poller.wait();
}
}
}
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.producer.poll(mask, poller)
}
}
fn should_io_return(res: &Result<usize>, is_nonblocking: bool) -> bool {
if is_nonblocking {
return true;
}
match res {
Ok(_) => true,
Err(e) if e.error() == Errno::EAGAIN => false,
Err(_) => true,
}
}

View File

@ -3,7 +3,7 @@ use crate::tty::{get_n_tty, Tty};
use super::file_handle::File;
use super::file_table::FileDescripter;
use super::utils::{InodeMode, InodeType, IoEvents, Metadata, SeekFrom};
use super::utils::{InodeMode, InodeType, IoEvents, Metadata, Poller, SeekFrom};
pub const FD_STDIN: FileDescripter = 0;
pub const FD_STDOUT: FileDescripter = 1;
@ -22,9 +22,9 @@ pub struct Stderr {
}
impl File for Stdin {
fn poll(&self) -> IoEvents {
fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
if let Some(console) = self.console.as_ref() {
console.poll()
console.poll(mask, poller)
} else {
todo!()
}

View File

@ -0,0 +1,349 @@
use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use jinux_rights_proc::require;
use ringbuf::{HeapConsumer as HeapRbConsumer, HeapProducer as HeapRbProducer, HeapRb};
use crate::prelude::*;
use crate::rights::*;
use super::{IoEvents, Pollee, Poller, StatusFlags};
/// A unidirectional communication channel, intended to implement IPC, e.g., pipe,
/// unix domain sockets, etc.
pub struct Channel<T> {
producer: Producer<T>,
consumer: Consumer<T>,
}
impl<T> Channel<T> {
pub fn with_capacity(capacity: usize) -> Result<Self> {
Self::with_capacity_and_flags(capacity, StatusFlags::empty())
}
pub fn with_capacity_and_flags(capacity: usize, flags: StatusFlags) -> Result<Self> {
let common = Arc::new(Common::with_capacity_and_flags(capacity, flags)?);
let producer = Producer(EndPoint::new(common.clone(), WriteOp::new()));
let consumer = Consumer(EndPoint::new(common, ReadOp::new()));
Ok(Self { producer, consumer })
}
pub fn split(self) -> (Producer<T>, Consumer<T>) {
let Self { producer, consumer } = self;
(producer, consumer)
}
pub fn producer(&self) -> &Producer<T> {
&self.producer
}
pub fn consumer(&self) -> &Consumer<T> {
&self.consumer
}
pub fn capacity(&self) -> usize {
self.producer.0.common.capacity()
}
}
pub struct Producer<T>(EndPoint<T, WriteOp>);
pub struct Consumer<T>(EndPoint<T, ReadOp>);
impl<T> Producer<T> {
fn this_end(&self) -> &EndPointInner<HeapRbProducer<T>> {
&self.0.common.producer
}
fn peer_end(&self) -> &EndPointInner<HeapRbConsumer<T>> {
&self.0.common.consumer
}
fn update_pollee(&self) {
let this_end = self.this_end();
let peer_end = self.peer_end();
// Update the event of pollee in a critical region so that pollee
// always reflects the _true_ state of the underlying ring buffer
// regardless of any race conditions.
self.0.common.lock_event();
let rb = this_end.rb();
if rb.is_full() {
this_end.pollee.del_events(IoEvents::OUT);
}
if !rb.is_empty() {
peer_end.pollee.add_events(IoEvents::IN);
}
}
pub fn shutdown(&self) {
self.this_end().shutdown()
}
pub fn is_shutdown(&self) -> bool {
self.this_end().is_shutdown()
}
pub fn is_peer_shutdown(&self) -> bool {
self.peer_end().is_shutdown()
}
pub fn status_flags(&self) -> StatusFlags {
self.this_end().status_flags()
}
pub fn set_status_flags(&self, new_flags: StatusFlags) {
self.this_end().set_status_flags(new_flags)
}
pub fn is_nonblocking(&self) -> bool {
self.this_end()
.status_flags()
.contains(StatusFlags::O_NONBLOCK)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.this_end().pollee.poll(mask, poller)
}
}
impl<T: Copy> Producer<T> {
pub fn write(&self, buf: &[T]) -> Result<usize> {
if self.is_shutdown() || self.is_peer_shutdown() {
return_errno!(Errno::EPIPE);
}
if buf.len() == 0 {
return Ok(0);
}
let written_len = self.0.write(buf);
self.update_pollee();
if written_len > 0 {
Ok(written_len)
} else {
return_errno_with_message!(Errno::EAGAIN, "try write later");
}
}
}
impl<T> Drop for Producer<T> {
fn drop(&mut self) {
self.shutdown();
self.0.common.lock_event();
// When reading from a channel such as a pipe or a stream socket,
// POLLHUP merely indicates that the peer closed its end of the channel.
self.peer_end().pollee.add_events(IoEvents::HUP);
}
}
impl<T> Consumer<T> {
fn this_end(&self) -> &EndPointInner<HeapRbConsumer<T>> {
&self.0.common.consumer
}
fn peer_end(&self) -> &EndPointInner<HeapRbProducer<T>> {
&self.0.common.producer
}
fn update_pollee(&self) {
let this_end = self.this_end();
let peer_end = self.peer_end();
// Update the event of pollee in a critical region so that pollee
// always reflects the _true_ state of the underlying ring buffer
// regardless of any race conditions.
self.0.common.lock_event();
let rb = this_end.rb();
if rb.is_empty() {
this_end.pollee.del_events(IoEvents::IN);
}
if !rb.is_full() {
peer_end.pollee.add_events(IoEvents::OUT);
}
}
pub fn shutdown(&self) {
self.this_end().shutdown()
}
pub fn is_shutdown(&self) -> bool {
self.this_end().is_shutdown()
}
pub fn is_peer_shutdown(&self) -> bool {
self.peer_end().is_shutdown()
}
pub fn status_flags(&self) -> StatusFlags {
self.this_end().status_flags()
}
pub fn set_status_flags(&self, new_flags: StatusFlags) {
self.this_end().set_status_flags(new_flags)
}
pub fn is_nonblocking(&self) -> bool {
self.this_end()
.status_flags()
.contains(StatusFlags::O_NONBLOCK)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.this_end().pollee.poll(mask, poller)
}
}
impl<T: Copy> Consumer<T> {
pub fn read(&self, buf: &mut [T]) -> Result<usize> {
if self.is_shutdown() {
return_errno!(Errno::EPIPE);
}
if buf.len() == 0 {
return Ok(0);
}
let read_len = self.0.read(buf);
self.update_pollee();
if self.is_peer_shutdown() {
return Ok(read_len);
}
if read_len > 0 {
Ok(read_len)
} else {
return_errno_with_message!(Errno::EAGAIN, "try read later");
}
}
}
impl<T> Drop for Consumer<T> {
fn drop(&mut self) {
self.shutdown();
self.0.common.lock_event();
// POLLERR is also set for a file descriptor referring to the write end of a pipe
// when the read end has been closed.
self.peer_end().pollee.add_events(IoEvents::ERR);
}
}
struct EndPoint<T, R: TRights> {
common: Arc<Common<T>>,
rights: R,
}
impl<T, R: TRights> EndPoint<T, R> {
pub fn new(common: Arc<Common<T>>, rights: R) -> Self {
Self { common, rights }
}
}
impl<T: Copy, R: TRights> EndPoint<T, R> {
#[require(R > Read)]
pub fn read(&self, buf: &mut [T]) -> usize {
let mut rb = self.common.consumer.rb();
rb.pop_slice(buf)
}
#[require(R > Write)]
pub fn write(&self, buf: &[T]) -> usize {
let mut rb = self.common.producer.rb();
rb.push_slice(buf)
}
}
struct Common<T> {
producer: EndPointInner<HeapRbProducer<T>>,
consumer: EndPointInner<HeapRbConsumer<T>>,
event_lock: Mutex<()>,
}
impl<T> Common<T> {
fn with_capacity_and_flags(capacity: usize, flags: StatusFlags) -> Result<Self> {
check_status_flags(flags)?;
if capacity == 0 {
return_errno_with_message!(Errno::EINVAL, "capacity cannot be zero");
}
let rb: HeapRb<T> = HeapRb::new(capacity);
let (rb_producer, rb_consumer) = rb.split();
let producer = EndPointInner::new(rb_producer, IoEvents::OUT, flags);
let consumer = EndPointInner::new(rb_consumer, IoEvents::empty(), flags);
let event_lock = Mutex::new(());
Ok(Self {
producer,
consumer,
event_lock,
})
}
pub fn lock_event(&self) -> MutexGuard<()> {
self.event_lock.lock()
}
pub fn capacity(&self) -> usize {
self.producer.rb().capacity()
}
}
struct EndPointInner<T> {
rb: Mutex<T>,
pollee: Pollee,
is_shutdown: AtomicBool,
status_flags: AtomicU32,
}
impl<T> EndPointInner<T> {
pub fn new(rb: T, init_events: IoEvents, status_flags: StatusFlags) -> Self {
Self {
rb: Mutex::new(rb),
pollee: Pollee::new(init_events),
is_shutdown: AtomicBool::new(false),
status_flags: AtomicU32::new(status_flags.bits()),
}
}
pub fn rb(&self) -> MutexGuard<T> {
self.rb.lock()
}
pub fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::Acquire)
}
pub fn shutdown(&self) {
self.is_shutdown.store(true, Ordering::Release)
}
pub fn status_flags(&self) -> StatusFlags {
let bits = self.status_flags.load(Ordering::Relaxed);
StatusFlags::from_bits(bits).unwrap()
}
pub fn set_status_flags(&self, new_flags: StatusFlags) {
self.status_flags.store(new_flags.bits(), Ordering::Relaxed);
}
}
fn check_status_flags(flags: StatusFlags) -> Result<()> {
let valid_flags: StatusFlags = StatusFlags::O_NONBLOCK | StatusFlags::O_DIRECT;
if !valid_flags.contains(flags) {
return_errno_with_message!(Errno::EINVAL, "invalid flags");
}
if flags.contains(StatusFlags::O_DIRECT) {
return_errno_with_message!(Errno::EINVAL, "O_DIRECT is not supported");
}
Ok(())
}

View File

@ -5,7 +5,7 @@ use core::any::Any;
use core::time::Duration;
use jinux_frame::vm::VmFrame;
use super::{DirentVisitor, FileSystem, IoctlCmd, SuperBlock};
use super::{DirentVisitor, FileSystem, IoEvents, IoctlCmd, Poller, SuperBlock};
use crate::prelude::*;
#[repr(u32)]
@ -197,6 +197,11 @@ pub trait Inode: Any + Sync + Send {
fn sync(&self) -> Result<()>;
fn poll(&self, mask: IoEvents, _poller: Option<&Poller>) -> IoEvents {
let events = IoEvents::IN | IoEvents::OUT;
events & mask
}
fn fs(&self) -> Arc<dyn FileSystem>;
fn as_any_ref(&self) -> &dyn Any;

View File

@ -1,11 +1,17 @@
use crate::events::Events;
crate::bitflags! {
pub struct IoEvents: u32 {
const POLLIN = 0x0001;
const POLLPRI = 0x0002;
const POLLOUT = 0x0004;
const POLLERR = 0x0008;
const POLLHUP = 0x0010;
const POLLNVAL = 0x0020;
const POLLRDHUP = 0x2000;
const IN = 0x0001;
const PRI = 0x0002;
const OUT = 0x0004;
const ERR = 0x0008;
const HUP = 0x0010;
const NVAL = 0x0020;
const RDHUP = 0x2000;
/// Events that are always polled even without specifying them.
const ALWAYS_POLL = Self::ERR.bits | Self::HUP.bits;
}
}
impl Events for IoEvents {}

View File

@ -1,6 +1,7 @@
//! VFS components
pub use access_mode::AccessMode;
pub use channel::{Channel, Consumer, Producer};
pub use creation_flags::CreationFlags;
pub use dentry_cache::Dentry;
pub use dirent_visitor::DirentVisitor;
@ -12,11 +13,12 @@ pub use inode::{Inode, InodeMode, InodeType, Metadata};
pub use io_events::IoEvents;
pub use ioctl::IoctlCmd;
pub use page_cache::PageCache;
pub use poll::{c_nfds, c_pollfd, PollFd};
pub use poll::{Pollee, Poller};
pub use status_flags::StatusFlags;
pub use vnode::Vnode;
mod access_mode;
mod channel;
mod creation_flags;
mod dentry_cache;
mod dirent_visitor;

View File

@ -1,46 +1,211 @@
#![allow(non_camel_case_types)]
use super::IoEvents;
use crate::fs::file_table::FileDescripter;
use crate::events::Observer;
use crate::prelude::*;
pub type c_nfds = u64;
// https://github.com/torvalds/linux/blob/master/include/uapi/asm-generic/poll.h
#[derive(Debug, Clone, Copy, Pod)]
#[repr(C)]
pub struct c_pollfd {
fd: FileDescripter,
events: i16,
revents: i16,
use core::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use jinux_frame::sync::WaitQueue;
use keyable_arc::KeyableArc;
/// A pollee maintains a set of active events, which can be polled with
/// pollers or be monitored with observers.
pub struct Pollee {
inner: Arc<PolleeInner>,
}
#[derive(Debug, Clone, Copy)]
pub struct PollFd {
pub fd: FileDescripter,
pub events: IoEvents,
pub revents: IoEvents,
struct PolleeInner {
// A table that maintains all interesting pollers
pollers: Mutex<BTreeMap<KeyableArc<dyn Observer<IoEvents>>, IoEvents>>,
// For efficient manipulation, we use AtomicU32 instead of RwLock<IoEvents>
events: AtomicU32,
// To reduce lock contentions, we maintain a counter for the size of the table
num_pollers: AtomicUsize,
}
impl From<c_pollfd> for PollFd {
fn from(raw: c_pollfd) -> Self {
let events = IoEvents::from_bits_truncate(raw.events as _);
let revents = IoEvents::from_bits_truncate(raw.revents as _);
impl Pollee {
/// Creates a new instance of pollee.
pub fn new(init_events: IoEvents) -> Self {
let inner = PolleeInner {
pollers: Mutex::new(BTreeMap::new()),
events: AtomicU32::new(init_events.bits()),
num_pollers: AtomicUsize::new(0),
};
Self {
fd: raw.fd,
events,
revents,
inner: Arc::new(inner),
}
}
/// Returns the current events of the pollee given an event mask.
///
/// If no interesting events are polled and a poller is provided, then
/// the poller will start monitoring the pollee and receive event
/// notification once the pollee gets any interesting events.
///
/// This operation is _atomic_ in the sense that either some interesting
/// events are returned or the poller is registered (if a poller is provided).
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
let mask = mask | IoEvents::ALWAYS_POLL;
// Fast path: return events immediately
let revents = self.events() & mask;
if !revents.is_empty() || poller.is_none() {
return revents;
}
// Slow path: register the provided poller
self.register_poller(poller.unwrap(), mask);
// It is important to check events again to handle race conditions
let revents = self.events() & mask;
revents
}
fn register_poller(&self, poller: &Poller, mask: IoEvents) {
let mut pollers = self.inner.pollers.lock();
let is_new = {
let observer = poller.observer();
pollers.insert(observer, mask).is_none()
};
if is_new {
let mut pollees = poller.inner.pollees.lock();
pollees.push(Arc::downgrade(&self.inner));
self.inner.num_pollers.fetch_add(1, Ordering::Release);
}
}
/// Add some events to the pollee's state.
///
/// This method wakes up all registered pollers that are interested in
/// the added events.
pub fn add_events(&self, events: IoEvents) {
self.inner.events.fetch_or(events.bits(), Ordering::Release);
// Fast path
if self.inner.num_pollers.load(Ordering::Relaxed) == 0 {
return;
}
// Slow path: broadcast the new events to all pollers
let pollers = self.inner.pollers.lock();
pollers
.iter()
.filter(|(_, mask)| mask.intersects(events))
.for_each(|(poller, mask)| poller.on_events(&(events & *mask)));
}
/// Remove some events from the pollee's state.
///
/// This method will not wake up registered pollers even when
/// the pollee still has some interesting events to the pollers.
pub fn del_events(&self, events: IoEvents) {
self.inner
.events
.fetch_and(!events.bits(), Ordering::Release);
}
/// Reset the pollee's state.
///
/// Reset means removing all events on the pollee.
pub fn reset_events(&self) {
self.inner
.events
.fetch_and(!IoEvents::all().bits(), Ordering::Release);
}
fn events(&self) -> IoEvents {
let event_bits = self.inner.events.load(Ordering::Acquire);
IoEvents::from_bits(event_bits).unwrap()
}
}
/// A poller gets notified when its associated pollees have interesting events.
pub struct Poller {
inner: KeyableArc<PollerInner>,
}
struct PollerInner {
// Use event counter to wait or wake up a poller
event_counter: EventCounter,
// All pollees that are interesting to this poller
pollees: Mutex<Vec<Weak<PolleeInner>>>,
}
impl Poller {
/// Constructs a new `Poller`.
pub fn new() -> Self {
let inner = PollerInner {
event_counter: EventCounter::new(),
pollees: Mutex::new(Vec::with_capacity(1)),
};
Self {
inner: KeyableArc::new(inner),
}
}
/// Wait until there are any interesting events happen since last `wait`.
pub fn wait(&self) {
self.inner.event_counter.read();
}
fn observer(&self) -> KeyableArc<dyn Observer<IoEvents>> {
self.inner.clone() as KeyableArc<dyn Observer<IoEvents>>
}
}
impl Observer<IoEvents> for PollerInner {
fn on_events(&self, _events: &IoEvents) {
self.event_counter.write();
}
}
impl Drop for Poller {
fn drop(&mut self) {
let mut pollees = self.inner.pollees.lock();
if pollees.len() == 0 {
return;
}
let self_observer = self.observer();
for weak_pollee in pollees.drain(..) {
if let Some(pollee) = weak_pollee.upgrade() {
let mut pollers = pollee.pollers.lock();
let res = pollers.remove(&self_observer);
assert!(res.is_some());
drop(pollers);
pollee.num_pollers.fetch_sub(1, Ordering::Relaxed);
}
}
}
}
impl From<PollFd> for c_pollfd {
fn from(raw: PollFd) -> Self {
let events = raw.events.bits() as i16;
let revents = raw.revents.bits() as i16;
/// A counter for wait and wakeup.
struct EventCounter {
counter: AtomicUsize,
wait_queue: WaitQueue,
}
impl EventCounter {
pub fn new() -> Self {
Self {
fd: raw.fd,
events,
revents,
counter: AtomicUsize::new(0),
wait_queue: WaitQueue::new(),
}
}
pub fn read(&self) -> usize {
self.wait_queue.wait_until(|| {
let val = self.counter.swap(0, Ordering::Relaxed);
if val > 0 {
Some(val)
} else {
None
}
})
}
pub fn write(&self) {
self.counter.fetch_add(1, Ordering::Relaxed);
self.wait_queue.wake_one();
}
}

View File

@ -1,4 +1,6 @@
use super::{DirentVisitor, FsFlags, Inode, InodeMode, InodeType, Metadata, PageCache};
use super::{
DirentVisitor, FsFlags, Inode, InodeMode, InodeType, IoEvents, Metadata, PageCache, Poller,
};
use crate::prelude::*;
use crate::rights::Full;
use crate::vm::vmo::Vmo;
@ -184,6 +186,10 @@ impl Vnode {
self.inner.read().inode.readdir_at(offset, visitor)
}
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.inner.read().inode.poll(mask, poller)
}
pub fn metadata(&self) -> Metadata {
self.inner.read().inode.metadata()
}