feat: 实现poll系统调用实现并修复相关bug (#1098)

feat: 实现poll系统调用实现并修复相关bug

- 实现poll系统调用,增加对EPollEvent的处理逻辑
- 修复LockedPipeInode中epitems的锁管理问题
- 添加RestartBlock支持,处理系统调用重启逻辑
- 修复EventPoll中epoll_wait的超时处理逻辑
- 新增test_poll测试程序,验证poll功能

Signed-off-by: longjin <longjin@DragonOS.org>
This commit is contained in:
LoGin 2025-03-14 10:48:06 +08:00 committed by GitHub
parent 65f6119c9f
commit 488718dc2e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 743 additions and 158 deletions

View File

@ -84,7 +84,8 @@ pub trait TtyPort: Sync + Send + Debug {
let ld = tty.ldisc();
let ret = ld.receive_buf2(tty.clone(), buf, None, count);
if ret.is_err() && ret.clone().unwrap_err() == SystemError::ENOSYS {
if let Err(SystemError::ENOSYS) = ret {
return ld.receive_buf(tty, buf, None, count);
}

View File

@ -6,6 +6,7 @@ pub mod kernfs;
pub mod mbr;
pub mod overlayfs;
pub mod page_cache;
pub mod poll;
pub mod procfs;
pub mod ramfs;
pub mod sysfs;

View File

@ -0,0 +1,209 @@
use core::ffi::c_int;
use crate::{
ipc::signal::{RestartBlock, RestartBlockData, RestartFn},
mm::VirtAddr,
net::event_poll::{EPollCtlOption, EPollEvent, EPollEventType, EventPoll},
process::ProcessManager,
syscall::{user_access::UserBufferWriter, Syscall},
time::{Duration, Instant},
};
use super::vfs::file::{File, FileMode};
use alloc::sync::Arc;
use system_error::SystemError;
#[repr(C)]
#[derive(Debug)]
pub struct PollFd {
pub fd: c_int,
pub events: u16,
pub revents: u16,
}
struct PollAdapter<'a> {
ep_file: Arc<File>,
poll_fds: &'a mut [PollFd],
}
impl<'a> PollAdapter<'a> {
pub fn new(ep_file: Arc<File>, poll_fds: &'a mut [PollFd]) -> Self {
Self { ep_file, poll_fds }
}
fn add_pollfds(&self) -> Result<(), SystemError> {
for pollfd in self.poll_fds.iter() {
let mut epoll_event = EPollEvent::default();
let poll_flags = PollFlags::from_bits_truncate(pollfd.events);
let ep_events: EPollEventType = poll_flags.into();
epoll_event.set_events(ep_events.bits());
EventPoll::epoll_ctl_with_epfile(
self.ep_file.clone(),
EPollCtlOption::Add,
pollfd.fd,
epoll_event,
false,
)
.map(|_| ())?;
}
Ok(())
}
fn poll_all_fds(&mut self, timeout: Option<Instant>) -> Result<usize, SystemError> {
let mut epoll_events = vec![EPollEvent::default(); self.poll_fds.len()];
let len = epoll_events.len() as i32;
let remain_timeout = timeout
.and_then(|t| t.duration_since(Instant::now()))
.map(|t| t.into());
let events = EventPoll::epoll_wait_with_file(
self.ep_file.clone(),
&mut epoll_events,
len,
remain_timeout,
)?;
for (i, event) in epoll_events.iter().enumerate() {
self.poll_fds[i].revents = (event.events() & 0xffff) as u16;
}
Ok(events)
}
}
impl Syscall {
/// https://code.dragonos.org.cn/xref/linux-6.6.21/fs/select.c#1068
pub fn poll(pollfd_ptr: usize, nfds: u32, timeout_ms: i32) -> Result<usize, SystemError> {
let pollfd_ptr = VirtAddr::new(pollfd_ptr);
let len = nfds as usize * core::mem::size_of::<PollFd>();
let mut timeout: Option<Instant> = None;
if timeout_ms >= 0 {
timeout = poll_select_set_timeout(timeout_ms);
}
let mut poll_fds_writer = UserBufferWriter::new(pollfd_ptr.as_ptr::<PollFd>(), len, true)?;
let mut r = do_sys_poll(poll_fds_writer.buffer(0)?, timeout);
if let Err(SystemError::ERESTARTNOHAND) = r {
let restart_block_data = RestartBlockData::new_poll(pollfd_ptr, nfds, timeout);
let restart_block = RestartBlock::new(&RestartFnPoll, restart_block_data);
r = ProcessManager::current_pcb().set_restart_fn(Some(restart_block));
}
return r;
}
}
/// 计算超时的时刻
fn poll_select_set_timeout(timeout_ms: i32) -> Option<Instant> {
if timeout_ms == 0 {
return None;
}
Some(Instant::now() + Duration::from_millis(timeout_ms as u64))
}
fn do_sys_poll(poll_fds: &mut [PollFd], timeout: Option<Instant>) -> Result<usize, SystemError> {
let ep_file = EventPoll::create_epoll_file(FileMode::empty())?;
let ep_file = Arc::new(ep_file);
let mut adapter = PollAdapter::new(ep_file, poll_fds);
adapter.add_pollfds()?;
let nevents = adapter.poll_all_fds(timeout)?;
Ok(nevents)
}
bitflags! {
pub struct PollFlags: u16 {
const POLLIN = 0x0001;
const POLLPRI = 0x0002;
const POLLOUT = 0x0004;
const POLLERR = 0x0008;
const POLLHUP = 0x0010;
const POLLNVAL = 0x0020;
const POLLRDNORM = 0x0040;
const POLLRDBAND = 0x0080;
const POLLWRNORM = 0x0100;
const POLLWRBAND = 0x0200;
const POLLMSG = 0x0400;
const POLLREMOVE = 0x1000;
const POLLRDHUP = 0x2000;
const POLLFREE = 0x4000;
const POLL_BUSY_LOOP = 0x8000;
}
}
impl From<PollFlags> for EPollEventType {
fn from(val: PollFlags) -> Self {
let mut epoll_flags = EPollEventType::empty();
if val.contains(PollFlags::POLLIN) {
epoll_flags |= EPollEventType::EPOLLIN;
}
if val.contains(PollFlags::POLLPRI) {
epoll_flags |= EPollEventType::EPOLLPRI;
}
if val.contains(PollFlags::POLLOUT) {
epoll_flags |= EPollEventType::EPOLLOUT;
}
if val.contains(PollFlags::POLLERR) {
epoll_flags |= EPollEventType::EPOLLERR;
}
if val.contains(PollFlags::POLLHUP) {
epoll_flags |= EPollEventType::EPOLLHUP;
}
if val.contains(PollFlags::POLLNVAL) {
epoll_flags |= EPollEventType::EPOLLNVAL;
}
if val.contains(PollFlags::POLLRDNORM) {
epoll_flags |= EPollEventType::EPOLLRDNORM;
}
if val.contains(PollFlags::POLLRDBAND) {
epoll_flags |= EPollEventType::EPOLLRDBAND;
}
if val.contains(PollFlags::POLLWRNORM) {
epoll_flags |= EPollEventType::EPOLLWRNORM;
}
if val.contains(PollFlags::POLLWRBAND) {
epoll_flags |= EPollEventType::EPOLLWRBAND;
}
if val.contains(PollFlags::POLLMSG) {
epoll_flags |= EPollEventType::EPOLLMSG;
}
if val.contains(PollFlags::POLLRDHUP) {
epoll_flags |= EPollEventType::EPOLLRDHUP;
}
if val.contains(PollFlags::POLLFREE) {
epoll_flags |= EPollEventType::POLLFREE;
}
epoll_flags
}
}
/// sys_poll的restart fn
#[derive(Debug)]
struct RestartFnPoll;
impl RestartFn for RestartFnPoll {
// 参考 https://code.dragonos.org.cn/xref/linux-6.6.21/fs/select.c#1047
fn call(&self, data: &mut RestartBlockData) -> Result<usize, SystemError> {
if let RestartBlockData::Poll(d) = data {
let len = d.nfds as usize * core::mem::size_of::<PollFd>();
let mut poll_fds_writer =
UserBufferWriter::new(d.pollfd_ptr.as_ptr::<PollFd>(), len, true)?;
let mut r = do_sys_poll(poll_fds_writer.buffer(0)?, d.timeout_instant);
if let Err(SystemError::ERESTARTNOHAND) = r {
let restart_block = RestartBlock::new(&RestartFnPoll, data.clone());
r = ProcessManager::current_pcb().set_restart_fn(Some(restart_block));
}
return r;
} else {
panic!("RestartFnPoll called with wrong data type: {:?}", data);
}
}
}

View File

@ -505,7 +505,7 @@ impl File {
}
FileType::Pipe => {
let inode = self.inode.downcast_ref::<LockedPipeInode>().unwrap();
return inode.inner().lock().add_epoll(epitem);
return inode.add_epoll(epitem);
}
_ => {
let r = self.inode.kernel_ioctl(epitem, &self.private_data.lock());
@ -529,13 +529,14 @@ impl File {
}
FileType::Pipe => {
let inode = self.inode.downcast_ref::<LockedPipeInode>().unwrap();
inode.inner().lock().remove_epoll(epoll)
inode.remove_epoll(epoll)
}
_ => {
let inode = self.inode.downcast_ref::<EventFdInode>();
if let Some(inode) = inode {
return inode.remove_epoll(epoll);
}
let inode = self
.inode
.downcast_ref::<PerfEventInode>()

View File

@ -49,6 +49,7 @@ pub struct LockedPipeInode {
inner: SpinLock<InnerPipeInode>,
read_wait_queue: WaitQueue,
write_wait_queue: WaitQueue,
epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
}
/// @brief 管道文件i节点(无锁)
@ -65,7 +66,6 @@ pub struct InnerPipeInode {
reader: u32,
writer: u32,
had_reader: bool,
epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
}
impl InnerPipeInode {
@ -81,7 +81,7 @@ impl InnerPipeInode {
if mode.contains(FileMode::O_RDONLY) {
if self.valid_cnt != 0 {
// 有数据可读
events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM);
events.insert(EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM);
}
// 没有写者
@ -93,7 +93,7 @@ impl InnerPipeInode {
if mode.contains(FileMode::O_WRONLY) {
// 管道内数据未满
if self.valid_cnt as usize != PIPE_BUFF_SIZE {
events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM);
events.insert(EPollEventType::EPOLLOUT | EPollEventType::EPOLLWRNORM);
}
// 没有读者
@ -105,29 +105,9 @@ impl InnerPipeInode {
Ok(events.bits() as usize)
}
pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
self.epitems.lock().push_back(epitem);
Ok(())
}
fn buf_full(&self) -> bool {
return self.valid_cnt as usize == PIPE_BUFF_SIZE;
}
pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
let is_remove = !self
.epitems
.lock_irqsave()
.extract_if(|x| x.epoll().ptr_eq(epoll))
.collect::<Vec<_>>()
.is_empty();
if is_remove {
return Ok(());
}
Err(SystemError::ENOENT)
}
}
impl LockedPipeInode {
@ -158,12 +138,12 @@ impl LockedPipeInode {
},
reader: 0,
writer: 0,
epitems: SpinLock::new(LinkedList::new()),
};
let result = Arc::new(Self {
inner: SpinLock::new(inner),
read_wait_queue: WaitQueue::default(),
write_wait_queue: WaitQueue::default(),
epitems: SpinLock::new(LinkedList::new()),
});
let mut guard = result.inner.lock();
guard.self_ref = Arc::downgrade(&result);
@ -185,6 +165,26 @@ impl LockedPipeInode {
let inode = self.inner.lock();
return !inode.buf_full() || inode.reader == 0;
}
pub fn add_epoll(&self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
self.epitems.lock().push_back(epitem);
Ok(())
}
pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
let is_remove = !self
.epitems
.lock_irqsave()
.extract_if(|x| x.epoll().ptr_eq(epoll))
.collect::<Vec<_>>()
.is_empty();
if is_remove {
return Ok(());
}
Err(SystemError::ENOENT)
}
}
impl IndexNode for LockedPipeInode {
@ -210,12 +210,12 @@ impl IndexNode for LockedPipeInode {
}
// log::debug!("pipe mode: {:?}", mode);
// 加锁
let mut inode = self.inner.lock();
let mut inner_guard = self.inner.lock();
// 如果管道里面没有数据,则唤醒写端,
while inode.valid_cnt == 0 {
while inner_guard.valid_cnt == 0 {
// 如果当前管道写者数为0则返回EOF
if inode.writer == 0 {
if inner_guard.writer == 0 {
return Ok(0);
}
@ -224,12 +224,12 @@ impl IndexNode for LockedPipeInode {
// 如果为非阻塞管道,直接返回错误
if mode.contains(FileMode::O_NONBLOCK) {
drop(inode);
drop(inner_guard);
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
// 否则在读等待队列中睡眠,并释放锁
drop(inode);
drop(inner_guard);
let r = wq_wait_event_interruptible!(self.read_wait_queue, self.readable(), {});
if r.is_err() {
ProcessManager::current_pcb()
@ -238,35 +238,37 @@ impl IndexNode for LockedPipeInode {
return Err(SystemError::ERESTARTSYS);
}
inode = self.inner.lock();
inner_guard = self.inner.lock();
}
let mut num = inode.valid_cnt as usize;
let mut num = inner_guard.valid_cnt as usize;
//决定要输出的字节
let start = inode.read_pos as usize;
let start = inner_guard.read_pos as usize;
//如果读端希望读取的字节数大于有效字节数,则输出有效字节
let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE;
let mut end =
(inner_guard.valid_cnt as usize + inner_guard.read_pos as usize) % PIPE_BUFF_SIZE;
//如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
if len < inode.valid_cnt as usize {
end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE;
if len < inner_guard.valid_cnt as usize {
end = (len + inner_guard.read_pos as usize) % PIPE_BUFF_SIZE;
num = len;
}
// 从管道拷贝数据到用户的缓冲区
if end < start {
buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]);
buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]);
buf[0..(PIPE_BUFF_SIZE - start)]
.copy_from_slice(&inner_guard.data[start..PIPE_BUFF_SIZE]);
buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inner_guard.data[0..end]);
} else {
buf[0..num].copy_from_slice(&inode.data[start..end]);
buf[0..num].copy_from_slice(&inner_guard.data[start..end]);
}
//更新读位置以及valid_cnt
inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
inode.valid_cnt -= num as i32;
inner_guard.read_pos = (inner_guard.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
inner_guard.valid_cnt -= num as i32;
// 读完以后如果未读完,则唤醒下一个读者
if inode.valid_cnt > 0 {
if inner_guard.valid_cnt > 0 {
self.read_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
}
@ -274,10 +276,10 @@ impl IndexNode for LockedPipeInode {
//读完后解锁并唤醒等待在写等待队列中的进程
self.write_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&data)? as u32);
drop(inner_guard);
// 唤醒epoll中等待的进程
EventPoll::wakeup_epoll(&inode.epitems, Some(pollflag))?;
EventPoll::wakeup_epoll(&self.epitems, Some(pollflag))?;
//返回读取的字节数
return Ok(num);
@ -380,11 +382,10 @@ impl IndexNode for LockedPipeInode {
return Err(SystemError::EINVAL);
}
// 加锁
let mut inner_guard = self.inner.lock();
let mut inode = self.inner.lock();
if inode.reader == 0 {
if !inode.had_reader {
if inner_guard.reader == 0 {
if !inner_guard.had_reader {
// 如果从未有读端,直接返回 ENXIO无论是否阻塞模式
return Err(SystemError::ENXIO);
} else {
@ -417,43 +418,44 @@ impl IndexNode for LockedPipeInode {
// 如果管道空间不够
while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE {
while len + inner_guard.valid_cnt as usize > PIPE_BUFF_SIZE {
// 唤醒读端
self.read_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
// 如果为非阻塞管道,直接返回错误
if mode.contains(FileMode::O_NONBLOCK) {
drop(inode);
drop(inner_guard);
return Err(SystemError::ENOMEM);
}
// 解锁并睡眠
drop(inode);
drop(inner_guard);
let r = wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {});
if r.is_err() {
return Err(SystemError::ERESTARTSYS);
}
inode = self.inner.lock();
inner_guard = self.inner.lock();
}
// 决定要输入的字节
let start = inode.write_pos as usize;
let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE;
let start = inner_guard.write_pos as usize;
let end = (inner_guard.write_pos as usize + len) % PIPE_BUFF_SIZE;
// 从用户的缓冲区拷贝数据到管道
if end < start {
inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
inner_guard.data[start..PIPE_BUFF_SIZE]
.copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
inner_guard.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
} else {
inode.data[start..end].copy_from_slice(&buf[0..len]);
inner_guard.data[start..end].copy_from_slice(&buf[0..len]);
}
// 更新写位置以及valid_cnt
inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
inode.valid_cnt += len as i32;
inner_guard.write_pos = (inner_guard.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
inner_guard.valid_cnt += len as i32;
// 写完后还有位置,则唤醒下一个写者
if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE {
if (inner_guard.valid_cnt as usize) < PIPE_BUFF_SIZE {
self.write_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
}
@ -462,9 +464,11 @@ impl IndexNode for LockedPipeInode {
self.read_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&data)? as u32);
drop(inner_guard);
// 唤醒epoll中等待的进程
EventPoll::wakeup_epoll(&inode.epitems, Some(pollflag))?;
EventPoll::wakeup_epoll(&self.epitems, Some(pollflag))?;
// 返回写入的字节数
return Ok(len);

View File

@ -1,4 +1,4 @@
use core::sync::atomic::compiler_fence;
use core::{fmt::Debug, sync::atomic::compiler_fence};
use alloc::sync::Arc;
use log::warn;
@ -8,9 +8,11 @@ use crate::{
arch::ipc::signal::{SigCode, SigFlags, SigSet, Signal},
ipc::signal_types::SigactionType,
libs::spinlock::SpinLockGuard,
mm::VirtAddr,
process::{
pid::PidType, Pid, ProcessControlBlock, ProcessFlags, ProcessManager, ProcessSignalInfo,
},
time::Instant,
};
use super::signal_types::{
@ -664,3 +666,45 @@ pub fn set_sigprocmask(how: SigHow, set: SigSet) -> Result<SigSet, SystemError>
__set_current_blocked(&res_set);
Ok(oset)
}
#[derive(Debug)]
pub struct RestartBlock {
pub data: RestartBlockData,
pub restart_fn: &'static dyn RestartFn,
}
impl RestartBlock {
pub fn new(restart_fn: &'static dyn RestartFn, data: RestartBlockData) -> Self {
Self { data, restart_fn }
}
}
pub trait RestartFn: Debug + Sync + Send + 'static {
fn call(&self, data: &mut RestartBlockData) -> Result<usize, SystemError>;
}
#[derive(Debug, Clone)]
pub enum RestartBlockData {
Poll(PollRestartBlockData),
// todo: nanosleep
Nanosleep(),
// todo: futex_wait
FutexWait(),
}
impl RestartBlockData {
pub fn new_poll(pollfd_ptr: VirtAddr, nfds: u32, timeout_instant: Option<Instant>) -> Self {
Self::Poll(PollRestartBlockData {
pollfd_ptr,
nfds,
timeout_instant,
})
}
}
#[derive(Debug, Clone)]
pub struct PollRestartBlockData {
pub pollfd_ptr: VirtAddr,
pub nfds: u32,
pub timeout_instant: Option<Instant>,
}

View File

@ -548,8 +548,18 @@ impl Syscall {
}
pub fn restart_syscall() -> Result<usize, SystemError> {
// todo: https://code.dragonos.org.cn/xref/linux-6.1.9/kernel/signal.c#2998
unimplemented!("restart_syscall with restart block");
// Err(SystemError::ENOSYS)
let restart_block = ProcessManager::current_pcb().restart_block().take();
if let Some(mut restart_block) = restart_block {
return restart_block.restart_fn.call(&mut restart_block.data);
} else {
// 不应该走到这里因此kill掉当前进程及同组的进程
let pid = Pid::new(0);
let sig = Signal::SIGKILL;
let mut info = SigInfo::new(sig, 0, SigCode::Kernel, SigType::Kill(pid));
sig.send_signal_info(Some(&mut info), pid)
.expect("Failed to kill ");
return Ok(0);
}
}
}

View File

@ -51,27 +51,6 @@ pub struct EventPoll {
self_ref: Option<Weak<SpinLock<EventPoll>>>,
}
impl EventPoll {
pub const EP_MAX_EVENTS: u32 = u32::MAX / (core::mem::size_of::<EPollEvent>() as u32);
/// 用于获取inode中的epitem队列
pub const ADD_EPOLLITEM: u32 = 0x7965;
pub fn new() -> Self {
Self {
epoll_wq: WaitQueue::default(),
ep_items: RBTree::new(),
ready_list: LinkedList::new(),
shutdown: AtomicBool::new(false),
self_ref: None,
}
}
}
impl Default for EventPoll {
fn default() -> Self {
Self::new()
}
}
/// EpollItem表示的是Epoll所真正管理的对象
/// 每当用户向Epoll添加描述符时都会注册一个新的EpollItemEpollItem携带了一些被监听的描述符的必要信息
#[derive(Debug)]
@ -199,25 +178,7 @@ impl IndexNode for EPollInode {
// 释放资源
let mut epoll = self.epoll.0.lock_irqsave();
// 唤醒epoll上面等待的所有进程
epoll.shutdown.store(true, Ordering::SeqCst);
epoll.ep_wake_all();
let fds = epoll.ep_items.keys().cloned().collect::<Vec<_>>();
// 清理红黑树里面的epitems
for fd in fds {
let file = ProcessManager::current_pcb()
.fd_table()
.read()
.get_file_by_fd(fd);
if file.is_some() {
file.unwrap().remove_epoll(&Arc::downgrade(&self.epoll.0))?;
}
epoll.ep_items.remove(&fd);
}
epoll.close()?;
Ok(())
}
@ -232,21 +193,72 @@ impl IndexNode for EPollInode {
}
impl EventPoll {
/// ## 创建epoll对象
pub const EP_MAX_EVENTS: u32 = u32::MAX / (core::mem::size_of::<EPollEvent>() as u32);
/// 用于获取inode中的epitem队列
pub const ADD_EPOLLITEM: u32 = 0x7965;
fn new() -> Self {
Self {
epoll_wq: WaitQueue::default(),
ep_items: RBTree::new(),
ready_list: LinkedList::new(),
shutdown: AtomicBool::new(false),
self_ref: None,
}
}
/// 关闭epoll时执行的逻辑
fn close(&mut self) -> Result<(), SystemError> {
// 唤醒epoll上面等待的所有进程
self.shutdown.store(true, Ordering::SeqCst);
self.ep_wake_all();
let fds: Vec<i32> = self.ep_items.keys().cloned().collect::<Vec<_>>();
// 清理红黑树里面的epitems
for fd in fds {
let file = ProcessManager::current_pcb()
.fd_table()
.read()
.get_file_by_fd(fd);
if let Some(file) = file {
if let Some(self_ref) = self.self_ref.as_ref() {
file.remove_epoll(self_ref)?;
}
}
self.ep_items.remove(&fd);
}
Ok(())
}
/// ## 创建epoll对象, 并将其加入到当前进程的fd_table中
///
/// ### 参数
/// - flags: 创建的epoll文件的FileMode
///
/// ### 返回值
/// - 成功则返回Ok(fd)否则返回Err
pub fn do_create_epoll(flags: FileMode) -> Result<usize, SystemError> {
pub fn create_epoll(flags: FileMode) -> Result<usize, SystemError> {
let ep_file = Self::create_epoll_file(flags)?;
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let mut fd_table_guard = fd_table.write();
let fd = fd_table_guard.alloc_fd(ep_file, None)?;
Ok(fd as usize)
}
/// ## 创建epoll文件
pub fn create_epoll_file(flags: FileMode) -> Result<File, SystemError> {
if !flags.difference(FileMode::O_CLOEXEC).is_empty() {
return Err(SystemError::EINVAL);
}
// 创建epoll
let epoll = LockedEventPoll(Arc::new(SpinLock::new(EventPoll::new())));
epoll.0.lock_irqsave().self_ref = Some(Arc::downgrade(&epoll.0));
let epoll = Self::do_create_epoll();
// 创建epoll的inode对象
let epoll_inode = EPollInode::new(epoll.clone());
@ -258,14 +270,13 @@ impl EventPoll {
// 设置ep_file的FilePrivateData
ep_file.private_data = SpinLock::new(FilePrivateData::EPoll(EPollPrivateData { epoll }));
Ok(ep_file)
}
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let mut fd_table_guard = fd_table.write();
let fd = fd_table_guard.alloc_fd(ep_file, None)?;
Ok(fd as usize)
fn do_create_epoll() -> LockedEventPoll {
let epoll = LockedEventPoll(Arc::new(SpinLock::new(EventPoll::new())));
epoll.0.lock().self_ref = Some(Arc::downgrade(&epoll.0));
epoll
}
/// ## epoll_ctl的具体实现
@ -273,30 +284,20 @@ impl EventPoll {
/// 根据不同的op对epoll文件进行增删改
///
/// ### 参数
/// - epfd: 操作的epoll文件描述符
/// - ep_file: epoll文件
/// - op: 对应的操作
/// - fd: 操作对应的文件描述符
/// - dstfd: 操作对应的文件描述符
/// - dst_file: 操作对应的文件(与dstfd对应)
/// - epds: 从用户态传入的event若op为EpollCtlAdd则对应注册的监听事件若op为EPollCtlMod则对应更新的事件删除操作不涉及此字段
/// - nonblock: 定义这次操作是否为非阻塞有可能其他地方占有EPoll的锁
pub fn do_epoll_ctl(
epfd: i32,
fn do_epoll_ctl(
ep_file: Arc<File>,
op: EPollCtlOption,
fd: i32,
epds: &mut EPollEvent,
dstfd: i32,
dst_file: Arc<File>,
mut epds: EPollEvent,
nonblock: bool,
) -> Result<usize, SystemError> {
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let fd_table_guard = fd_table.read();
// 获取epoll和对应fd指向的文件
let ep_file = fd_table_guard
.get_file_by_fd(epfd)
.ok_or(SystemError::EBADF)?;
let dst_file = fd_table_guard
.get_file_by_fd(fd)
.ok_or(SystemError::EBADF)?;
// 检查是否允许 EPOLLWAKEUP
if op != EPollCtlOption::Del {
epds.events &= !EPollEventType::EPOLLWAKEUP.bits();
@ -351,7 +352,7 @@ impl EventPoll {
}
}
let ep_item = epoll_guard.ep_items.get(&fd);
let ep_item = epoll_guard.ep_items.get(&dstfd);
match op {
EPollCtlOption::Add => {
// 如果已经存在,则返回错误
@ -361,8 +362,8 @@ impl EventPoll {
// 设置epoll
let epitem = Arc::new(EPollItem::new(
Arc::downgrade(&epoll_data.epoll.0),
*epds,
fd,
epds,
dstfd,
Arc::downgrade(&dst_file),
));
Self::ep_insert(&mut epoll_guard, dst_file, epitem)?;
@ -373,7 +374,7 @@ impl EventPoll {
return Err(SystemError::ENOENT);
}
// 删除
Self::ep_remove(&mut epoll_guard, fd, Some(dst_file))?;
Self::ep_remove(&mut epoll_guard, dstfd, Some(dst_file))?;
}
EPollCtlOption::Mod => {
// 不存在则返回错误
@ -385,7 +386,7 @@ impl EventPoll {
epds.events |=
EPollEventType::EPOLLERR.bits() | EPollEventType::EPOLLHUP.bits();
Self::ep_modify(&mut epoll_guard, ep_item, epds)?;
Self::ep_modify(&mut epoll_guard, ep_item, &epds)?;
}
}
}
@ -394,8 +395,50 @@ impl EventPoll {
Ok(0)
}
/// ## epoll_wait的具体实现
pub fn do_epoll_wait(
pub fn epoll_ctl_with_epfd(
epfd: i32,
op: EPollCtlOption,
dstfd: i32,
epds: EPollEvent,
nonblock: bool,
) -> Result<usize, SystemError> {
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let fd_table_guard = fd_table.read();
// 获取epoll和对应fd指向的文件
let ep_file = fd_table_guard
.get_file_by_fd(epfd)
.ok_or(SystemError::EBADF)?;
let dst_file = fd_table_guard
.get_file_by_fd(dstfd)
.ok_or(SystemError::EBADF)?;
drop(fd_table_guard);
Self::do_epoll_ctl(ep_file, op, dstfd, dst_file, epds, nonblock)
}
pub fn epoll_ctl_with_epfile(
ep_file: Arc<File>,
op: EPollCtlOption,
dstfd: i32,
epds: EPollEvent,
nonblock: bool,
) -> Result<usize, SystemError> {
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let fd_table_guard = fd_table.read();
let dst_file = fd_table_guard
.get_file_by_fd(dstfd)
.ok_or(SystemError::EBADF)?;
drop(fd_table_guard);
Self::do_epoll_ctl(ep_file, op, dstfd, dst_file, epds, nonblock)
}
pub fn epoll_wait(
epfd: i32,
epoll_event: &mut [EPollEvent],
max_events: i32,
@ -411,6 +454,16 @@ impl EventPoll {
.ok_or(SystemError::EBADF)?;
drop(fd_table_guard);
Self::epoll_wait_with_file(ep_file, epoll_event, max_events, timespec)
}
/// ## epoll_wait的具体实现
pub fn epoll_wait_with_file(
ep_file: Arc<File>,
epoll_event: &mut [EPollEvent],
max_events: i32,
timespec: Option<PosixTimeSpec>,
) -> Result<usize, SystemError> {
let current_pcb = ProcessManager::current_pcb();
// 确保是epoll file
if !Self::is_epoll_file(&ep_file) {
@ -432,6 +485,9 @@ impl EventPoll {
// 非阻塞情况
timeout = true;
}
} else if timespec.is_none() {
// 非阻塞情况
timeout = true;
}
// 判断epoll上有没有就绪事件
let mut available = epoll_guard.ep_events_available();
@ -502,6 +558,7 @@ impl EventPoll {
})?;
drop(guard);
schedule(SchedMode::SM_NONE);
// 被唤醒后,检查是否有事件可读
available = epoll.0.lock_irqsave().ep_events_available();
if let Some(timer) = timer {
@ -530,6 +587,9 @@ impl EventPoll {
user_event: &mut [EPollEvent],
max_events: i32,
) -> Result<usize, SystemError> {
if user_event.len() < max_events as usize {
return Err(SystemError::EINVAL);
}
let mut ep_guard = epoll.0.lock_irqsave();
let mut res: usize = 0;
@ -651,11 +711,9 @@ impl EventPoll {
dst_file.remove_epoll(epoll.self_ref.as_ref().unwrap())?;
}
let epitem = epoll.ep_items.remove(&fd).unwrap();
let _ = epoll
.ready_list
.extract_if(|item| Arc::ptr_eq(item, &epitem));
if let Some(epitem) = epoll.ep_items.remove(&fd) {
epoll.ready_list.retain(|item| !Arc::ptr_eq(item, &epitem));
}
Ok(())
}
@ -740,7 +798,6 @@ impl EventPoll {
let binding = epitem.clone();
let event_guard = binding.event().read();
let ep_events = EPollEventType::from_bits_truncate(event_guard.events());
// 检查事件合理性以及是否有感兴趣的事件
if !(ep_events
.difference(EPollEventType::EP_PRIVATE_BITS)

View File

@ -20,13 +20,13 @@ impl Syscall {
return Err(SystemError::EINVAL);
}
return EventPoll::do_create_epoll(FileMode::empty());
return EventPoll::create_epoll(FileMode::empty());
}
pub fn epoll_create1(flag: usize) -> Result<usize, SystemError> {
let flags = FileMode::from_bits_truncate(flag as u32);
let ret = EventPoll::do_create_epoll(flags);
let ret = EventPoll::create_epoll(flags);
ret
}
@ -60,7 +60,7 @@ impl Syscall {
)?;
let epoll_events = epds_writer.buffer::<EPollEvent>(0)?;
return EventPoll::do_epoll_wait(epfd, epoll_events, max_events, timespec);
return EventPoll::epoll_wait(epfd, epoll_events, max_events, timespec);
}
pub fn epoll_ctl(epfd: i32, op: usize, fd: i32, event: VirtAddr) -> Result<usize, SystemError> {
@ -84,7 +84,7 @@ impl Syscall {
epds_reader.copy_one_from_user(&mut epds, 0)?;
}
return EventPoll::do_epoll_ctl(epfd, op, fd, &mut epds, false);
return EventPoll::epoll_ctl_with_epfd(epfd, op, fd, epds, false);
}
/// ## 在epoll_wait时屏蔽某些信号

View File

@ -31,7 +31,10 @@ use crate::{
procfs::procfs_unregister_pid,
vfs::{file::FileDescriptorVec, FileType},
},
ipc::signal_types::{SigInfo, SigPending, SignalStruct},
ipc::{
signal::RestartBlock,
signal_types::{SigInfo, SigPending, SignalStruct},
},
libs::{
align::AlignedBox,
casting::DowncastArc,
@ -712,6 +715,8 @@ pub struct ProcessControlBlock {
/// 进程作为主体的凭证集
cred: SpinLock<Cred>,
self_ref: Weak<ProcessControlBlock>,
restart_block: SpinLock<Option<RestartBlock>>,
}
impl ProcessControlBlock {
@ -799,6 +804,7 @@ impl ProcessControlBlock {
nsproxy: Arc::new(RwLock::new(NsProxy::new())),
cred: SpinLock::new(cred),
self_ref: Weak::new(),
restart_block: SpinLock::new(None),
};
pcb.sig_info.write().set_tty(tty);
@ -1117,6 +1123,18 @@ impl ProcessControlBlock {
pub fn threads_read_irqsave(&self) -> RwLockReadGuard<ThreadInfo> {
self.thread.read_irqsave()
}
pub fn restart_block(&self) -> SpinLockGuard<Option<RestartBlock>> {
self.restart_block.lock()
}
pub fn set_restart_fn(
&self,
restart_block: Option<RestartBlock>,
) -> Result<usize, SystemError> {
*self.restart_block.lock() = restart_block;
return Err(SystemError::ERESTART_RESTARTBLOCK);
}
}
impl Drop for ProcessControlBlock {

View File

@ -879,8 +879,10 @@ impl Syscall {
#[cfg(target_arch = "x86_64")]
SYS_POLL => {
warn!("SYS_POLL has not yet been implemented");
Ok(0)
let fds = args[0];
let nfds = args[1] as u32;
let timeout = args[2] as i32;
Self::poll(fds, nfds, timeout)
}
SYS_SETPGID => {

View File

@ -270,6 +270,24 @@ impl Instant {
pub const fn total_micros(&self) -> i64 {
self.micros
}
/// Returns the duration between this instant and another one.
///
/// # Arguments
///
/// * `earlier` - The earlier instant to calculate the duration since.
///
/// # Returns
///
/// An `Option<Duration>` representing the duration between this instant and the earlier one.
/// If the earlier instant is later than this one, it returns `None`.
pub fn duration_since(&self, earlier: Instant) -> Option<Duration> {
if earlier.micros > self.micros {
return None;
}
let micros_diff = self.micros - earlier.micros;
Some(Duration::from_micros(micros_diff as u64))
}
}
impl fmt::Display for Instant {

View File

@ -79,6 +79,7 @@ impl Syscall {
if sleep_time.is_null() {
return Err(SystemError::EFAULT);
}
let slt_spec = PosixTimeSpec {
tv_sec: unsafe { *sleep_time }.tv_sec,
tv_nsec: unsafe { *sleep_time }.tv_nsec,

1
user/apps/test_poll/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
test_poll

View File

@ -0,0 +1,21 @@
ifeq ($(ARCH), x86_64)
CROSS_COMPILE=x86_64-linux-musl-
else ifeq ($(ARCH), riscv64)
CROSS_COMPILE=riscv64-linux-musl-
endif
BIN_NAME=test_poll
CC=$(CROSS_COMPILE)gcc
.PHONY: all
all: main.c
$(CC) -static -o $(BIN_NAME) main.c
.PHONY: install clean
install: all
mv $(BIN_NAME) $(DADK_CURRENT_BUILD_DIR)/$(BIN_NAME)
clean:
rm $(BIN_NAME) *.o
fmt:

151
user/apps/test_poll/main.c Normal file
View File

@ -0,0 +1,151 @@
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
int pipe_fd[2]; // 管道文件描述符数组
int child_can_exit = 0; // 子进程是否可以退出的标志
int signal_pid = 0;
int poll_errno; // poll错误码
#define WRITE_WAIT_SEC 3
#define POLL_TIMEOUT_SEC 5
#define EXPECTED_MESSAGE "Data is ready!\n"
#define POLL_DELTA_MS 1000
#define min(a, b) ((a) < (b) ? (a) : (b))
// 信号处理函数
void signal_handler(int signo) {
printf("[PID: %d, TID: %lu] Signal %d received.\n", getpid(), pthread_self(),
signo);
}
// 线程函数用于在n秒后向管道写入数据
void *writer_thread(void *arg) {
int seconds = WRITE_WAIT_SEC;
for (int i = 0; i < seconds; i++) {
printf("[PID: %d, TID: %lu] Waiting for %d seconds...\n", getpid(),
pthread_self(), seconds - i);
sleep(1);
kill(signal_pid, SIGUSR1); // 发送信号
}
const char *message = EXPECTED_MESSAGE;
write(pipe_fd[1], message, strlen(message)); // 写入管道
printf("[PID: %d, TID: %lu] Data written to pipe.\n", getpid(),
pthread_self());
close(pipe_fd[1]); // 关闭写端
printf("[PID: %d, TID: %lu] Pipe write end closed.\n", getpid(),
pthread_self());
while (child_can_exit == 0) {
printf("[PID: %d, TID: %lu] Waiting for main to finish...\n", getpid(),
pthread_self());
sleep(1);
}
return NULL;
}
int main() {
pthread_t tid;
struct pollfd fds[1];
int ret;
int test_passed = 1; // 假设测试通过
// 创建管道
if (pipe(pipe_fd) == -1) {
perror("pipe");
exit(EXIT_FAILURE);
}
// 设置信号处理函数
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if (sigaction(SIGUSR1, &sa, NULL) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
}
signal_pid = getpid(); // 设置信号接收进程ID
// 创建写线程
if (pthread_create(&tid, NULL, writer_thread, NULL) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
// 设置poll监视的文件描述符
fds[0].fd = pipe_fd[0]; // 监视管道的读端
fds[0].events = POLLIN; // 监视是否有数据可读
printf("[PID: %d, TID: %lu] Waiting for data...\n", getpid(), pthread_self());
// 在 poll 调用前后添加时间统计
struct timeval start_time, end_time;
gettimeofday(&start_time, NULL); // 记录 poll 开始时间
ret = poll(fds, 1, POLL_TIMEOUT_SEC * 1000); // 调用 poll
poll_errno = errno;
gettimeofday(&end_time, NULL); // 记录 poll 结束时间
// 计算 poll 的总耗时(单位:毫秒)
long poll_duration_ms = (end_time.tv_sec - start_time.tv_sec) * 1000 +
(end_time.tv_usec - start_time.tv_usec) / 1000;
if (abs((int)poll_duration_ms -
min(POLL_TIMEOUT_SEC, WRITE_WAIT_SEC) * 1000) >= POLL_DELTA_MS) {
printf("Poll duration: %ld ms, expected: %d ms, errno: %s\n",
poll_duration_ms, POLL_TIMEOUT_SEC * 1000, strerror(poll_errno));
test_passed = 0; // 测试失败(如果 poll 耗时与预期相差较大,认为测试未通过)
}
if (test_passed == 0) {
} else if (ret == -1) {
printf("poll errno: %s\n", strerror(poll_errno));
test_passed = 0; // 测试失败
} else if (ret == 0) {
printf("Timeout! No data available.\n");
test_passed = 0; // 测试失败
} else {
if (fds[0].revents & POLLIN) {
char buffer[1024];
ssize_t count = read(pipe_fd[0], buffer, sizeof(buffer)); // 读取数据
if (count > 0) {
printf("Data received: %s", buffer);
// 检查读取的数据是否与预期一致
if (strcmp(buffer, EXPECTED_MESSAGE) != 0) {
printf("Unexpected data received.\n");
test_passed = 0; // 测试失败
}
} else {
printf("No data read from pipe.\n");
test_passed = 0; // 测试失败
}
} else {
printf("Unexpected event on pipe.\n");
test_passed = 0; // 测试失败
}
}
child_can_exit = 1; // 允许子进程退出
// 等待写线程结束
pthread_join(tid, NULL);
close(pipe_fd[0]); // 关闭读端
if (test_passed) {
printf("Test passed!\n");
} else {
printf("Test failed!\n");
}
printf("Program finished.\n");
return test_passed ? 0 : 1; // 返回0表示测试通过返回1表示测试失败
}

View File

@ -0,0 +1,46 @@
# 用户程序名称
name = "test_poll"
# 版本号
version = "0.1.0"
# 用户程序描述信息
description = "test_poll"
# (可选)默认: false 是否只构建一次如果为trueDADK会在构建成功后将构建结果缓存起来下次构建时直接使用缓存的构建结果
build-once = false
# (可选) 默认: false 是否只安装一次如果为trueDADK会在安装成功后不再重复安装
install-once = false
# 目标架构
# 可选值:"x86_64", "aarch64", "riscv64"
target-arch = ["x86_64"]
# 任务源
[task-source]
# 构建类型
# 可选值:"build-from_source", "install-from-prebuilt"
type = "build-from-source"
# 构建来源
# "build_from_source" 可选值:"git", "local", "archive"
# "install_from_prebuilt" 可选值:"local", "archive"
source = "local"
# 路径或URL
source-path = "user/apps/test_poll"
# 构建相关信息
[build]
# (可选)构建命令
build-command = "make install"
# 安装相关信息
[install]
# 可选安装到DragonOS的路径
in-dragonos-path = "/bin"
# 清除相关信息
[clean]
# (可选)清除命令
clean-command = "make clean"
# (可选)依赖项
# 注意:如果没有依赖项,忽略此项,不允许只留一个[[depends]]
# [[depends]]
# name = "depend1"
# version = "0.1.1"
# (可选)环境变量
# 注意:如果没有环境变量,忽略此项,不允许只留一个[[envs]]
# [[envs]]
# key = "PATH"
# value = "/usr/bin"