diff --git a/kernel/src/filesystem/vfs/file.rs b/kernel/src/filesystem/vfs/file.rs index e168654a..51086af6 100644 --- a/kernel/src/filesystem/vfs/file.rs +++ b/kernel/src/filesystem/vfs/file.rs @@ -11,7 +11,7 @@ use crate::{ tty::TtyFilePrivateData, }, filesystem::procfs::ProcfsFilePrivateData, - ipc::pipe::PipeFsPrivateData, + ipc::pipe::{LockedPipeInode, PipeFsPrivateData}, kerror, libs::spinlock::SpinLock, net::{ @@ -47,6 +47,14 @@ impl Default for FilePrivateData { } } +impl FilePrivateData { + pub fn update_mode(&mut self, mode: FileMode) { + if let FilePrivateData::Pipefs(pdata) = self { + pdata.set_mode(mode); + } + } +} + bitflags! { /// @brief 文件打开模式 /// 其中,低2bit组合而成的数字的值,用于表示访问权限。其他的bit,才支持通过按位或的方式来表示参数 @@ -388,6 +396,7 @@ impl File { // 直接修改文件的打开模式 self.mode = mode; + self.private_data.update_mode(mode); return Ok(()); } @@ -418,6 +427,10 @@ impl File { return socket.add_epoll(epitem); } + FileType::Pipe => { + let inode = self.inode.downcast_ref::().unwrap(); + return inode.inner().lock().add_epoll(epitem); + } _ => return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP), } } @@ -436,7 +449,7 @@ impl File { } pub fn poll(&self) -> Result { - self.inode.poll() + self.inode.poll(&self.private_data) } } diff --git a/kernel/src/filesystem/vfs/mod.rs b/kernel/src/filesystem/vfs/mod.rs index 211eacb8..e82e12f7 100644 --- a/kernel/src/filesystem/vfs/mod.rs +++ b/kernel/src/filesystem/vfs/mod.rs @@ -166,7 +166,7 @@ pub trait IndexNode: Any + Sync + Send + Debug { /// @brief 获取当前inode的状态。 /// /// @return PollStatus结构体 - fn poll(&self) -> Result { + fn poll(&self, _private_data: &FilePrivateData) -> Result { // 若文件系统没有实现此方法,则返回“不支持” return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP); } diff --git a/kernel/src/ipc/pipe.rs b/kernel/src/ipc/pipe.rs index 808a2927..ef57d87c 100644 --- a/kernel/src/ipc/pipe.rs +++ b/kernel/src/ipc/pipe.rs @@ -6,11 +6,15 @@ use crate::{ FileType, IndexNode, Metadata, }, libs::{spinlock::SpinLock, wait_queue::WaitQueue}, + net::event_poll::{EPollEventType, EPollItem, EventPoll}, process::ProcessState, time::TimeSpec, }; -use alloc::sync::{Arc, Weak}; +use alloc::{ + collections::LinkedList, + sync::{Arc, Weak}, +}; use system_error::SystemError; /// 我们设定pipe_buff的总大小为1024字节 @@ -25,6 +29,10 @@ impl PipeFsPrivateData { pub fn new(mode: FileMode) -> Self { return PipeFsPrivateData { mode: mode }; } + + pub fn set_mode(&mut self, mode: FileMode) { + self.mode = mode; + } } /// @brief 管道文件i节点(锁) @@ -35,6 +43,7 @@ pub struct LockedPipeInode(SpinLock); #[derive(Debug)] pub struct InnerPipeInode { self_ref: Weak, + /// 管道内可读的数据数 valid_cnt: i32, read_pos: i32, write_pos: i32, @@ -45,6 +54,50 @@ pub struct InnerPipeInode { metadata: Metadata, reader: u32, writer: u32, + epitems: SpinLock>>, +} + +impl InnerPipeInode { + pub fn poll(&self, private_data: &FilePrivateData) -> Result { + let mut events = EPollEventType::empty(); + + let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data { + mode + } else { + return Err(SystemError::EBADFD); + }; + + if mode.contains(FileMode::O_RDONLY) { + if self.valid_cnt != 0 { + // 有数据可读 + events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM); + } + + // 没有写者 + if self.writer == 0 { + events.insert(EPollEventType::EPOLLHUP) + } + } + + if mode.contains(FileMode::O_WRONLY) { + // 管道内数据未满 + if self.valid_cnt as usize != PIPE_BUFF_SIZE { + events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM); + } + + // 没有读者 + if self.reader == 0 { + events.insert(EPollEventType::EPOLLERR); + } + } + + Ok(events.bits() as usize) + } + + pub fn add_epoll(&mut self, epitem: Arc) -> Result<(), SystemError> { + self.epitems.lock().push_back(epitem); + Ok(()) + } } impl LockedPipeInode { @@ -76,6 +129,7 @@ impl LockedPipeInode { }, reader: 0, writer: 0, + epitems: SpinLock::new(LinkedList::new()), }; let result = Arc::new(Self(SpinLock::new(inner))); let mut guard = result.0.lock(); @@ -84,6 +138,10 @@ impl LockedPipeInode { drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁 return result; } + + pub fn inner(&self) -> &SpinLock { + &self.0 + } } impl IndexNode for LockedPipeInode { @@ -162,10 +220,22 @@ impl IndexNode for LockedPipeInode { inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32; inode.valid_cnt -= num as i32; + // 读完以后如果未读完,则唤醒下一个读者 + if inode.valid_cnt > 0 { + inode + .read_wait_queue + .wakeup(Some(ProcessState::Blocked(true))); + } + //读完后解锁并唤醒等待在写等待队列中的进程 inode .write_wait_queue .wakeup(Some(ProcessState::Blocked(true))); + + let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); + // 唤醒epoll中等待的进程 + EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?; + //返回读取的字节数 return Ok(num); } @@ -302,10 +372,22 @@ impl IndexNode for LockedPipeInode { inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32; inode.valid_cnt += len as i32; + // 写完后还有位置,则唤醒下一个写者 + if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE { + inode + .write_wait_queue + .wakeup(Some(ProcessState::Blocked(true))); + } + // 读完后解锁并唤醒等待在读等待队列中的进程 inode .read_wait_queue .wakeup(Some(ProcessState::Blocked(true))); + + let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); + // 唤醒epoll中等待的进程 + EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?; + // 返回写入的字节数 return Ok(len); } @@ -331,4 +413,8 @@ impl IndexNode for LockedPipeInode { fn list(&self) -> Result, SystemError> { return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP); } + + fn poll(&self, private_data: &FilePrivateData) -> Result { + return self.0.lock().poll(private_data); + } } diff --git a/kernel/src/ipc/syscall.rs b/kernel/src/ipc/syscall.rs index 1b2f8ee5..9d712f5a 100644 --- a/kernel/src/ipc/syscall.rs +++ b/kernel/src/ipc/syscall.rs @@ -33,37 +33,46 @@ impl Syscall { /// - `fd`: 用于返回文件描述符的数组 /// - `flags`:设置管道的参数 pub fn pipe2(fd: *mut i32, flags: FileMode) -> Result { - if flags.contains(FileMode::O_NONBLOCK) - || flags.contains(FileMode::O_CLOEXEC) - || flags.contains(FileMode::O_RDONLY) + if !flags + .difference(FileMode::O_CLOEXEC | FileMode::O_NONBLOCK | FileMode::O_DIRECT) + .is_empty() { - let mut user_buffer = - UserBufferWriter::new(fd, core::mem::size_of::<[c_int; 2]>(), true)?; - let fd = user_buffer.buffer::(0)?; - let pipe_ptr = LockedPipeInode::new(); - let mut read_file = File::new(pipe_ptr.clone(), FileMode::O_RDONLY)?; - read_file.private_data = - FilePrivateData::Pipefs(PipeFsPrivateData::new(FileMode::O_RDONLY)); - let mut write_file = File::new(pipe_ptr.clone(), FileMode::O_WRONLY)?; - write_file.private_data = - FilePrivateData::Pipefs(PipeFsPrivateData::new(FileMode::O_WRONLY)); - if flags.contains(FileMode::O_CLOEXEC) { - read_file.set_close_on_exec(true); - write_file.set_close_on_exec(true); - } - let fd_table_ptr = ProcessManager::current_pcb().fd_table(); - let mut fd_table_guard = fd_table_ptr.write(); - let read_fd = fd_table_guard.alloc_fd(read_file, None)?; - let write_fd = fd_table_guard.alloc_fd(write_file, None)?; - - drop(fd_table_guard); - - fd[0] = read_fd; - fd[1] = write_fd; - Ok(0) - } else { - Err(SystemError::EINVAL) + return Err(SystemError::EINVAL); } + + let mut user_buffer = UserBufferWriter::new(fd, core::mem::size_of::<[c_int; 2]>(), true)?; + let fd = user_buffer.buffer::(0)?; + let pipe_ptr = LockedPipeInode::new(); + + let mut read_file = File::new( + pipe_ptr.clone(), + FileMode::O_RDONLY | (flags & FileMode::O_NONBLOCK), + )?; + read_file.private_data = + FilePrivateData::Pipefs(PipeFsPrivateData::new(FileMode::O_RDONLY)); + + let mut write_file = File::new( + pipe_ptr.clone(), + FileMode::O_WRONLY | (flags & (FileMode::O_NONBLOCK | FileMode::O_DIRECT)), + )?; + write_file.private_data = FilePrivateData::Pipefs(PipeFsPrivateData::new( + FileMode::O_WRONLY | (flags & (FileMode::O_NONBLOCK | FileMode::O_DIRECT)), + )); + + if flags.contains(FileMode::O_CLOEXEC) { + read_file.set_close_on_exec(true); + write_file.set_close_on_exec(true); + } + let fd_table_ptr = ProcessManager::current_pcb().fd_table(); + let mut fd_table_guard = fd_table_ptr.write(); + let read_fd = fd_table_guard.alloc_fd(read_file, None)?; + let write_fd = fd_table_guard.alloc_fd(write_file, None)?; + + drop(fd_table_guard); + + fd[0] = read_fd; + fd[1] = write_fd; + Ok(0) } pub fn kill(pid: Pid, sig: c_int) -> Result { diff --git a/kernel/src/net/event_poll/mod.rs b/kernel/src/net/event_poll/mod.rs index b5662ea6..14874ad7 100644 --- a/kernel/src/net/event_poll/mod.rs +++ b/kernel/src/net/event_poll/mod.rs @@ -161,7 +161,7 @@ impl IndexNode for EPollInode { Err(SystemError::ENOSYS) } - fn poll(&self) -> Result { + fn poll(&self, _private_data: &FilePrivateData) -> Result { // 需要实现epoll嵌套epoll时,需要实现这里 todo!() } @@ -704,6 +704,48 @@ impl EventPoll { pub fn ep_wake_one(&self) { self.epoll_wq.wakeup(None); } + + /// ### epoll的回调,支持epoll的文件有事件到来时直接调用该方法即可 + pub fn wakeup_epoll( + epitems: &mut SpinLock>>, + pollflags: EPollEventType, + ) -> Result<(), SystemError> { + let mut epitems_guard = epitems.try_lock_irqsave()?; + // 一次只取一个,因为一次也只有一个进程能拿到对应文件的🔓 + if let Some(epitem) = epitems_guard.pop_front() { + let epoll = epitem.epoll().upgrade().unwrap(); + let mut epoll_guard = epoll.try_lock()?; + let binding = epitem.clone(); + let event_guard = binding.event().read(); + let ep_events = EPollEventType::from_bits_truncate(event_guard.events()); + + // 检查事件合理性以及是否有感兴趣的事件 + if !(ep_events + .difference(EPollEventType::EP_PRIVATE_BITS) + .is_empty() + || pollflags.difference(ep_events).is_empty()) + { + // TODO: 未处理pm相关 + + // 首先将就绪的epitem加入等待队列 + epoll_guard.ep_add_ready(epitem.clone()); + + if epoll_guard.ep_has_waiter() { + if ep_events.contains(EPollEventType::EPOLLEXCLUSIVE) + && !pollflags.contains(EPollEventType::POLLFREE) + { + // 避免惊群 + epoll_guard.ep_wake_one(); + } else { + epoll_guard.ep_wake_all(); + } + } + } + + epitems_guard.push_back(epitem); + } + Ok(()) + } } /// 与C兼容的Epoll事件结构体 diff --git a/kernel/src/net/mod.rs b/kernel/src/net/mod.rs index 91cc576c..4c5769ab 100644 --- a/kernel/src/net/mod.rs +++ b/kernel/src/net/mod.rs @@ -231,11 +231,13 @@ pub trait Socket: Sync + Send + Debug { for epitem in handle_item.epitems.lock_irqsave().iter() { let epoll = epitem.epoll(); - let _ = EventPoll::ep_remove( - &mut epoll.upgrade().unwrap().lock_irqsave(), - epitem.fd(), - None, - ); + if epoll.upgrade().is_some() { + let _ = EventPoll::ep_remove( + &mut epoll.upgrade().unwrap().lock_irqsave(), + epitem.fd(), + None, + ); + } } Ok(()) diff --git a/kernel/src/net/net_core.rs b/kernel/src/net/net_core.rs index 65881133..1c9d5ad1 100644 --- a/kernel/src/net/net_core.rs +++ b/kernel/src/net/net_core.rs @@ -1,5 +1,5 @@ use alloc::{boxed::Box, collections::BTreeMap, sync::Arc}; -use smoltcp::{iface::SocketHandle, socket::dhcpv4, wire}; +use smoltcp::{socket::dhcpv4, wire}; use system_error::SystemError; use crate::{ @@ -11,7 +11,7 @@ use crate::{ }; use super::{ - event_poll::EPollEventType, + event_poll::{EPollEventType, EventPoll}, socket::{TcpSocket, HANDLE_MAP, SOCKET_SET}, }; @@ -228,7 +228,12 @@ fn send_event(sockets: &smoltcp::iface::SocketSet) -> Result<(), SystemError> { smoltcp::socket::Socket::Dns(_) => unimplemented!("Dns socket hasn't unimplemented"), } drop(handle_guard); - wakeup_epoll(handle, events as u32)?; + let mut handle_guard = HANDLE_MAP.write_irqsave(); + let handle_item = handle_guard.get_mut(&handle).unwrap(); + EventPoll::wakeup_epoll( + &mut handle_item.epitems, + EPollEventType::from_bits_truncate(events as u32), + )?; // crate::kdebug!( // "{} send_event {:?}", // handle, @@ -237,48 +242,3 @@ fn send_event(sockets: &smoltcp::iface::SocketSet) -> Result<(), SystemError> { } Ok(()) } - -/// ### 处理epoll -fn wakeup_epoll(handle: SocketHandle, events: u32) -> Result<(), SystemError> { - let mut handle_guard = HANDLE_MAP.write_irqsave(); - let handle_item = handle_guard.get_mut(&handle).unwrap(); - let mut epitems_guard = handle_item.epitems.try_lock_irqsave()?; - - // 从events拿到epoll相关事件 - let pollflags = EPollEventType::from_bits_truncate(events); - - // 一次只取一个,因为一次也只有一个进程能拿到对应文件的🔓 - if let Some(epitem) = epitems_guard.pop_front() { - let epoll = epitem.epoll().upgrade().unwrap(); - let mut epoll_guard = epoll.try_lock_irqsave()?; - let binding = epitem.clone(); - let event_guard = binding.event().read_irqsave(); - let ep_events = EPollEventType::from_bits_truncate(event_guard.events()); - - // 检查事件合理性以及是否有感兴趣的事件 - if !(ep_events - .difference(EPollEventType::EP_PRIVATE_BITS) - .is_empty() - || pollflags.difference(ep_events).is_empty()) - { - // TODO: 未处理pm相关 - - // 首先将就绪的epitem加入等待队列 - epoll_guard.ep_add_ready(epitem.clone()); - - if epoll_guard.ep_has_waiter() { - if ep_events.contains(EPollEventType::EPOLLEXCLUSIVE) - && !pollflags.contains(EPollEventType::POLLFREE) - { - // 避免惊群 - epoll_guard.ep_wake_one(); - } else { - epoll_guard.ep_wake_all(); - } - } - } - - epitems_guard.push_back(epitem); - } - Ok(()) -} diff --git a/kernel/src/net/socket.rs b/kernel/src/net/socket.rs index 2011e667..11902675 100644 --- a/kernel/src/net/socket.rs +++ b/kernel/src/net/socket.rs @@ -20,7 +20,7 @@ use system_error::SystemError; use crate::{ arch::{rand::rand, sched::sched}, driver::net::NetDriver, - filesystem::vfs::{syscall::ModeType, FileType, IndexNode, Metadata}, + filesystem::vfs::{syscall::ModeType, FilePrivateData, FileType, IndexNode, Metadata}, kerror, kwarn, libs::{ rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}, @@ -1351,7 +1351,7 @@ impl IndexNode for SocketInode { return self.0.lock_no_preempt().write(&buf[0..len], None); } - fn poll(&self) -> Result { + fn poll(&self, _private_data: &FilePrivateData) -> Result { let events = self.0.lock_irqsave().poll(); return Ok(events.bits() as usize); }