Add the epoll mechanism (#455)

* ## Add the epoll mechanism
- Add the epoll mechanism
- Add an event wait queue to improve socket performance
- Clean up poll: remove the poll method from filesystems that cannot be polled

* Add detailed comments

* Fix a bug where epoll still held a weak reference to a descriptor's file after the file was closed

* Make the EPollEvent layout follow the POSIX standard

* Fix a miscalculation in the seconds-to-microseconds conversion
GnoCiYeH 2023-12-25 18:08:12 +08:00 committed by GitHub
parent 070e991008
commit 406099704e
27 changed files with 1667 additions and 228 deletions
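
For context, the syscalls this commit adds follow the standard POSIX epoll workflow. Below is a minimal userspace sketch of that workflow, written against Linux's epoll through the `libc` crate as a behavioral reference (the pipe fd is an illustrative stand-in for the sockets DragonOS supports here): `epoll_create1` corresponds to `Syscall::epoll_create1`, `epoll_ctl` to `EventPoll::do_epoll_ctl`, and `epoll_wait` to `EventPoll::do_epoll_wait` in the diff below.

```rust
use libc::{epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLIN, EPOLL_CTL_ADD};

fn main() {
    unsafe {
        // Any pollable fd works; a pipe keeps the sketch self-contained.
        let mut fds = [0i32; 2];
        assert_eq!(libc::pipe(fds.as_mut_ptr()), 0);

        let epfd = epoll_create1(0);
        let mut ev = epoll_event {
            events: EPOLLIN as u32,
            u64: 0xdead_beef, // opaque user data, returned verbatim by the kernel
        };
        assert_eq!(epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &mut ev), 0);

        // Make the watched fd readable, then wait (timeout in milliseconds).
        libc::write(fds[1], b"x".as_ptr().cast(), 1);
        let mut ready = [epoll_event { events: 0, u64: 0 }; 8];
        let n = epoll_wait(epfd, ready.as_mut_ptr(), 8, 1000);
        assert_eq!(n, 1);
        let data = ready[0].u64; // copy out first: the struct is packed
        assert_eq!(data, 0xdead_beef);
    }
}
```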


@ -4,7 +4,7 @@ use crate::filesystem::vfs::file::FileMode;
use crate::filesystem::vfs::syscall::ModeType;
use crate::filesystem::vfs::{
core::generate_inode_id, make_rawdev, FilePrivateData, FileSystem, FileType, IndexNode,
Metadata, PollStatus,
Metadata,
};
use crate::syscall::SystemError;
use crate::{libs::spinlock::SpinLock, time::TimeSpec};
@ -108,10 +108,6 @@ impl IndexNode for LockedAhciInode {
return Ok(());
}
fn poll(&self) -> Result<PollStatus, SystemError> {
return Ok(PollStatus::READ | PollStatus::WRITE);
}
/// Read from the device - reads and writes should go through the device's own functions rather than through the filesystem
fn read_at(
&self,


@ -8,7 +8,7 @@ use crate::{
devfs::{devfs_register, DevFS, DeviceINode},
vfs::{
core::generate_inode_id, file::FileMode, syscall::ModeType, FileType, IndexNode,
Metadata, PollStatus,
Metadata,
},
},
include::bindings::bindings::vfs_file_operations_t,
@ -152,10 +152,6 @@ impl IndexNode for LockedPS2KeyBoardInode {
return Ok(());
}
fn poll(&self) -> Result<PollStatus, SystemError> {
return Ok(PollStatus::READ);
}
fn metadata(&self) -> Result<Metadata, SystemError> {
return Ok(self.0.read().metadata.clone());
}


@ -165,7 +165,7 @@ impl<T: Transport> phy::Device for VirtioNICDriver<T> {
fn transmit(&mut self, _timestamp: smoltcp::time::Instant) -> Option<Self::TxToken<'_>> {
// kdebug!("VirtioNet: transmit");
if self.inner.lock().can_send() {
if self.inner.lock_irqsave().can_send() {
// kdebug!("VirtioNet: can send");
return Some(VirtioNetToken::new(self.clone(), None));
} else {


@ -214,10 +214,6 @@ impl IndexNode for TtyDevice {
return Err(SystemError::EIO);
}
fn poll(&self) -> Result<crate::filesystem::vfs::PollStatus, SystemError> {
return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
}
fn fs(&self) -> Arc<dyn crate::filesystem::vfs::FileSystem> {
return self.fs.read().upgrade().unwrap();
}


@ -6,7 +6,7 @@ use super::vfs::{
core::{generate_inode_id, ROOT_INODE},
file::FileMode,
syscall::ModeType,
FileSystem, FileType, FsInfo, IndexNode, Metadata, PollStatus,
FileSystem, FileType, FsInfo, IndexNode, Metadata,
};
use crate::{
kerror, kinfo,
@ -489,18 +489,6 @@ impl IndexNode for LockedDevFSInode {
return Ok(());
}
fn poll(&self) -> Result<super::vfs::PollStatus, SystemError> {
// Acquire the lock
let inode: SpinLockGuard<DevFSInode> = self.0.lock();
// If the current inode is a directory, return an error
if inode.metadata.file_type == FileType::Dir {
return Err(SystemError::EISDIR);
}
return Ok(PollStatus::READ | PollStatus::WRITE);
}
/// Read from the device - reads and writes should go through the device's own functions rather than through the filesystem
fn read_at(
&self,


@ -2,7 +2,7 @@ use crate::filesystem::vfs::file::FileMode;
use crate::filesystem::vfs::make_rawdev;
use crate::filesystem::vfs::syscall::ModeType;
use crate::filesystem::vfs::{
core::generate_inode_id, FilePrivateData, FileSystem, FileType, IndexNode, Metadata, PollStatus,
core::generate_inode_id, FilePrivateData, FileSystem, FileType, IndexNode, Metadata,
};
use crate::{libs::spinlock::SpinLock, syscall::SystemError, time::TimeSpec};
use alloc::{
@ -102,10 +102,6 @@ impl IndexNode for LockedNullInode {
return Ok(());
}
fn poll(&self) -> Result<PollStatus, SystemError> {
return Ok(PollStatus::READ | PollStatus::WRITE);
}
/// Read from the device - reads and writes should go through the device's own functions rather than through the filesystem
fn read_at(
&self,


@ -2,7 +2,7 @@ use crate::filesystem::vfs::file::FileMode;
use crate::filesystem::vfs::make_rawdev;
use crate::filesystem::vfs::syscall::ModeType;
use crate::filesystem::vfs::{
core::generate_inode_id, FilePrivateData, FileSystem, FileType, IndexNode, Metadata, PollStatus,
core::generate_inode_id, FilePrivateData, FileSystem, FileType, IndexNode, Metadata,
};
use crate::{libs::spinlock::SpinLock, syscall::SystemError, time::TimeSpec};
use alloc::{
@ -102,10 +102,6 @@ impl IndexNode for LockedZeroInode {
return Ok(());
}
fn poll(&self) -> Result<PollStatus, SystemError> {
return Ok(PollStatus::READ | PollStatus::WRITE);
}
/// Read from the device - reads and writes should go through the device's own functions rather than through the filesystem
fn read_at(
&self,


@ -17,7 +17,7 @@ use crate::{
core::generate_inode_id,
file::{FileMode, FilePrivateData},
syscall::ModeType,
FileSystem, FileType, IndexNode, InodeId, Metadata, PollStatus,
FileSystem, FileType, IndexNode, InodeId, Metadata,
},
kerror,
libs::{
@ -1415,18 +1415,6 @@ impl IndexNode for LockedFATInode {
}
}
fn poll(&self) -> Result<PollStatus, SystemError> {
// Acquire the lock
let inode: SpinLockGuard<FATInode> = self.0.lock();
// If the current inode is a directory, return an error
if inode.metadata.file_type == FileType::Dir {
return Err(SystemError::EISDIR);
}
return Ok(PollStatus::READ | PollStatus::WRITE);
}
fn create(
&self,
name: &str,


@ -21,7 +21,7 @@ use self::callback::{KernCallbackData, KernFSCallback, KernInodePrivateData};
use super::vfs::{
core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem,
FileType, FsInfo, IndexNode, InodeId, Metadata, PollStatus,
FileType, FsInfo, IndexNode, InodeId, Metadata,
};
pub mod callback;
@ -293,11 +293,6 @@ impl IndexNode for KernFSInode {
return Ok(keys);
}
fn poll(&self) -> Result<PollStatus, SystemError> {
// TODO: return a PollStatus based on the inode's specific attributes
return Ok(PollStatus::READ | PollStatus::WRITE);
}
fn read_at(
&self,
offset: usize,


@ -29,7 +29,7 @@ use crate::{
use super::vfs::{
file::{FileMode, FilePrivateData},
syscall::ModeType,
FileSystem, FsInfo, IndexNode, InodeId, Metadata, PollStatus,
FileSystem, FsInfo, IndexNode, InodeId, Metadata,
};
/// @brief Process file type
@ -485,18 +485,6 @@ impl IndexNode for LockedProcFSInode {
return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
}
fn poll(&self) -> Result<PollStatus, SystemError> {
// Acquire the lock
let inode: SpinLockGuard<ProcFSInode> = self.0.lock();
// If the current inode is a directory, return an error
if inode.metadata.file_type == FileType::Dir {
return Err(SystemError::EISDIR);
}
return Ok(PollStatus::READ);
}
fn fs(&self) -> Arc<dyn FileSystem> {
return self.0.lock().fs.upgrade().unwrap();
}


@ -18,7 +18,7 @@ use crate::{
use super::vfs::{
file::FilePrivateData, syscall::ModeType, FileSystem, FsInfo, IndexNode, InodeId, Metadata,
PollStatus, SpecialNodeData,
SpecialNodeData,
};
/// Maximum length of a RamFS inode name
@ -210,18 +210,6 @@ impl IndexNode for LockedRamFSInode {
return Ok(len);
}
fn poll(&self) -> Result<PollStatus, SystemError> {
// Acquire the lock
let inode: SpinLockGuard<RamFSInode> = self.0.lock();
// If the current inode is a directory, return an error
if inode.metadata.file_type == FileType::Dir {
return Err(SystemError::EISDIR);
}
return Ok(PollStatus::READ | PollStatus::WRITE);
}
fn fs(&self) -> Arc<dyn FileSystem> {
return self.0.lock().fs.upgrade().unwrap();
}


@ -1,4 +1,8 @@
use alloc::{string::String, sync::Arc, vec::Vec};
use alloc::{
string::String,
sync::{Arc, Weak},
vec::Vec,
};
use crate::{
driver::{
@ -9,6 +13,10 @@ use crate::{
ipc::pipe::PipeFsPrivateData,
kerror,
libs::spinlock::SpinLock,
net::{
event_poll::{EPollItem, EPollPrivateData, EventPoll},
socket::SocketInode,
},
process::ProcessManager,
syscall::SystemError,
};
@ -27,6 +35,8 @@ pub enum FilePrivateData {
DevFS(DevicePrivateData),
/// Private data of a tty device file
Tty(TtyFilePrivateData),
/// epoll private data
EPoll(EPollPrivateData),
/// No file private data is needed
Unused,
}
@ -396,6 +406,38 @@ impl File {
self.inode.resize(len)?;
return Ok(());
}
/// ## Attach an EPollItem object to this file
///
/// epoll must be notified when the file's state changes
pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
match self.file_type {
FileType::Socket => {
let inode = self.inode.downcast_ref::<SocketInode>().unwrap();
let mut socket = inode.inner();
return socket.add_epoll(epitem);
}
_ => return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP),
}
}
/// ## Remove a bound epoll
pub fn remove_epoll(&mut self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
match self.file_type {
FileType::Socket => {
let inode = self.inode.downcast_ref::<SocketInode>().unwrap();
let mut socket = inode.inner();
return socket.remove_epoll(epoll);
}
_ => return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP),
}
}
pub fn poll(&self) -> Result<usize, SystemError> {
self.inode.poll()
}
}
impl Drop for File {


@ -166,7 +166,10 @@ pub trait IndexNode: Any + Sync + Send + Debug {
/// @brief Get the current inode's status.
///
/// @return a PollStatus structure
fn poll(&self) -> Result<PollStatus, SystemError>;
fn poll(&self) -> Result<usize, SystemError> {
// If the filesystem does not implement this method, return "not supported"
return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
}
/// @brief Get the inode's metadata
///


@ -180,11 +180,6 @@ impl IndexNode for MountFSInode {
return self.inner_inode.write_at(offset, len, buf, data);
}
#[inline]
fn poll(&self) -> Result<super::PollStatus, SystemError> {
return self.inner_inode.poll();
}
#[inline]
fn fs(&self) -> Arc<dyn FileSystem> {
return self.mount_fs.clone();


@ -3,7 +3,7 @@ use crate::{
exception::InterruptArch,
filesystem::vfs::{
core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem,
FileType, IndexNode, Metadata, PollStatus,
FileType, IndexNode, Metadata,
},
libs::{spinlock::SpinLock, wait_queue::WaitQueue},
process::ProcessState,
@ -310,10 +310,6 @@ impl IndexNode for LockedPipeInode {
return Ok(len);
}
fn poll(&self) -> Result<PollStatus, crate::syscall::SystemError> {
return Ok(PollStatus::READ | PollStatus::WRITE);
}
fn as_any_ref(&self) -> &dyn core::any::Any {
self
}


@ -343,6 +343,9 @@ pub struct RBTree<K: Ord, V> {
len: usize,
}
unsafe impl<K: Ord, V> Send for RBTree<K, V> {}
unsafe impl<K: Ord, V> Sync for RBTree<K, V> {}
// Drop all owned pointers if the tree is dropped
impl<K: Ord, V> Drop for RBTree<K, V> {
#[inline]


@ -28,20 +28,9 @@ pub struct WaitQueue(SpinLock<InnerWaitQueue>);
impl WaitQueue {
pub const INIT: WaitQueue = WaitQueue(SpinLock::new(InnerWaitQueue::INIT));
fn before_sleep_check(&self, max_preempt: usize) {
let pcb = ProcessManager::current_pcb();
if unlikely(pcb.preempt_count() > max_preempt) {
kwarn!(
"Process {:?}: Try to sleep when preempt count is {}",
pcb.pid(),
pcb.preempt_count()
);
}
}
/// @brief Make the current process wait on the wait queue, allowing interruption by a signal
pub fn sleep(&self) {
self.before_sleep_check(0);
before_sleep_check(0);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock_irqsave();
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
panic!("sleep error: {:?}", e);
@ -56,7 +45,7 @@ impl WaitQueue {
where
F: FnOnce(),
{
self.before_sleep_check(0);
before_sleep_check(0);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
@ -85,7 +74,7 @@ impl WaitQueue {
/// Since sleep_without_schedule does not invoke the scheduler, if the developer forgets to call the scheduler manually after running this function,
/// undefined behavior may result when a timer interrupt arrives or another cpu kicks the current cpu.
pub unsafe fn sleep_without_schedule(&self) {
self.before_sleep_check(0);
before_sleep_check(1);
// Safety check: make sure interrupts are currently disabled
assert!(CurrentIrqArch::is_irq_enabled() == false);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
@ -97,7 +86,7 @@ impl WaitQueue {
}
pub unsafe fn sleep_without_schedule_uninterruptible(&self) {
self.before_sleep_check(0);
before_sleep_check(0);
// Safety check: make sure interrupts are currently disabled
assert!(CurrentIrqArch::is_irq_enabled() == false);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
@ -109,7 +98,7 @@ impl WaitQueue {
}
/// @brief Make the current process wait on the wait queue, without allowing interruption by a signal
pub fn sleep_uninterruptible(&self) {
self.before_sleep_check(0);
before_sleep_check(0);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
@ -124,7 +113,7 @@ impl WaitQueue {
/// @brief Make the current process wait on the wait queue, allowing interruption by a signal.
/// After the current process's pcb has joined the queue, the given spinlock is unlocked.
pub fn sleep_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>) {
self.before_sleep_check(1);
before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
@ -140,7 +129,7 @@ impl WaitQueue {
/// @brief Make the current process wait on the wait queue, allowing interruption by a signal.
/// After the current process's pcb has joined the queue, the given Mutex is unlocked.
pub fn sleep_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>) {
self.before_sleep_check(1);
before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
@ -156,7 +145,7 @@ impl WaitQueue {
/// @brief Make the current process wait on the wait queue, without allowing interruption by a signal.
/// After the current process's pcb has joined the queue, the given spinlock is unlocked.
pub fn sleep_uninterruptible_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>) {
self.before_sleep_check(1);
before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
@ -172,7 +161,7 @@ impl WaitQueue {
/// @brief Make the current process wait on the wait queue, without allowing interruption by a signal.
/// After the current process's pcb has joined the queue, the given Mutex is unlocked.
pub fn sleep_uninterruptible_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>) {
self.before_sleep_check(1);
before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
@ -259,3 +248,121 @@ impl InnerWaitQueue {
wait_list: LinkedList::new(),
};
}
fn before_sleep_check(max_preempt: usize) {
let pcb = ProcessManager::current_pcb();
if unlikely(pcb.preempt_count() > max_preempt) {
kwarn!(
"Process {:?}: Try to sleep when preempt count is {}",
pcb.pid(),
pcb.preempt_count()
);
}
}
/// Event wait queue
#[derive(Debug)]
pub struct EventWaitQueue {
wait_list: SpinLock<Vec<(u64, Arc<ProcessControlBlock>)>>,
}
impl EventWaitQueue {
pub fn new() -> Self {
Self {
wait_list: SpinLock::new(Vec::new()),
}
}
/// ## Make the current process wait on this queue for the events it is interested in
///
/// ### Parameters
/// - events: the events the process is interested in; `events` is best expressed as a bitmask, one bit per event
///
/// Note: before use, be aware that conflicting events may have been defined elsewhere, which could lead to undefined behavior
pub fn sleep(&self, events: u64) {
before_sleep_check(0);
let mut guard = self.wait_list.lock_irqsave();
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
panic!("sleep error: {:?}", e);
});
guard.push((events, ProcessManager::current_pcb()));
drop(guard);
sched();
}
pub unsafe fn sleep_without_schedule(&self, events: u64) {
before_sleep_check(1);
let mut guard = self.wait_list.lock_irqsave();
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
panic!("sleep error: {:?}", e);
});
guard.push((events, ProcessManager::current_pcb()));
drop(guard);
}
pub fn sleep_unlock_spinlock<T>(&self, events: u64, to_unlock: SpinLockGuard<T>) {
before_sleep_check(1);
let mut guard = self.wait_list.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
panic!("sleep error: {:?}", e);
});
drop(irq_guard);
guard.push((events, ProcessManager::current_pcb()));
drop(to_unlock);
drop(guard);
sched();
}
/// ### Wake up the processes on this queue that are waiting for `events`
///
/// ### Parameters
/// - events: the events that occurred
///
/// Note that a process is woken as soon as any one of the events in `events` is triggered
pub fn wakeup_any(&self, events: u64) -> usize {
let mut ret = 0;
let _ = self.wait_list.lock().extract_if(|(es, pcb)| {
if *es & events > 0 {
// An event of interest occurred
if ProcessManager::wakeup(pcb).is_ok() {
ret += 1;
return true;
} else {
return false;
}
} else {
return false;
}
});
ret
}
/// ### Wake up the processes on this queue that are waiting for `events`
///
/// ### Parameters
/// - events: the events that occurred
///
/// Note that a process is woken only when all of its awaited events are satisfied (an exact match)
pub fn wakeup(&self, events: u64) -> usize {
let mut ret = 0;
let _ = self.wait_list.lock().extract_if(|(es, pcb)| {
if *es == events {
// An event of interest occurred
if ProcessManager::wakeup(pcb).is_ok() {
ret += 1;
return true;
} else {
return false;
}
} else {
return false;
}
});
ret
}
pub fn wakeup_all(&self) {
self.wakeup_any(u64::MAX);
}
}
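
The difference between `wakeup_any` and `wakeup` above is just the mask predicate. A small runnable model of the two (the `READABLE`/`WRITABLE` bits are hypothetical; in the diff below, bits such as `TcpSocket::CAN_CONNECT` play this role):

```rust
// Models the two wakeup predicates of EventWaitQueue: `wakeup_any` fires on
// any overlapping bit, while `wakeup` requires the registered mask to match
// the occurred events exactly.
const READABLE: u64 = 1 << 0; // hypothetical event bits, one bit per event
const WRITABLE: u64 = 1 << 1;

fn wakes_any(registered: u64, occurred: u64) -> bool {
    registered & occurred > 0
}

fn wakes_exact(registered: u64, occurred: u64) -> bool {
    registered == occurred
}

fn main() {
    let registered = READABLE | WRITABLE;
    assert!(wakes_any(registered, READABLE)); // any single event suffices
    assert!(!wakes_exact(registered, READABLE)); // exact match required
    assert!(wakes_exact(registered, READABLE | WRITABLE));
}
```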


@ -0,0 +1,824 @@
use core::{
fmt::Debug,
sync::atomic::{AtomicBool, Ordering},
};
use alloc::{
collections::LinkedList,
sync::{Arc, Weak},
vec::Vec,
};
use crate::{
arch::sched::sched,
filesystem::vfs::{
file::{File, FileMode},
FilePrivateData, IndexNode, Metadata,
},
include::bindings::bindings::INT32_MAX,
libs::{
rbtree::RBTree,
rwlock::RwLock,
spinlock::{SpinLock, SpinLockGuard},
wait_queue::WaitQueue,
},
process::ProcessManager,
syscall::SystemError,
time::{
timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper},
TimeSpec,
},
};
pub mod syscall;
#[derive(Debug, Clone)]
pub struct LockedEventPoll(Arc<SpinLock<EventPoll>>);
/// The kernel epoll object; whenever a user creates an epoll, the kernel creates one of these objects
/// It corresponds to an epfd
#[derive(Debug)]
pub struct EventPoll {
/// The wait queue used by epoll_wait
epoll_wq: WaitQueue,
/// Red-black tree maintaining all of the added sockets
ep_items: RBTree<i32, Arc<EPollItem>>,
/// List of descriptors that have become ready
ready_list: LinkedList<Arc<EPollItem>>,
/// Whether the epoll has been closed
shutdown: AtomicBool,
self_ref: Option<Weak<SpinLock<EventPoll>>>,
}
impl EventPoll {
pub const EP_MAX_EVENTS: u32 = INT32_MAX / (core::mem::size_of::<EPollEvent>() as u32);
pub fn new() -> Self {
Self {
epoll_wq: WaitQueue::INIT,
ep_items: RBTree::new(),
ready_list: LinkedList::new(),
shutdown: AtomicBool::new(false),
self_ref: None,
}
}
}
/// An EPollItem represents an object that the epoll actually manages
/// Whenever the user adds a descriptor to an epoll, a new EPollItem is registered; it carries the necessary information about the watched descriptor
#[derive(Debug)]
pub struct EPollItem {
/// The epoll this item belongs to
epoll: Weak<SpinLock<EventPoll>>,
/// The events registered by the user
event: RwLock<EPollEvent>,
/// The descriptor being watched
fd: i32,
/// The corresponding file
file: Weak<SpinLock<File>>,
}
impl EPollItem {
pub fn new(
epoll: Weak<SpinLock<EventPoll>>,
events: EPollEvent,
fd: i32,
file: Weak<SpinLock<File>>,
) -> Self {
Self {
epoll,
event: RwLock::new(events),
fd,
file,
}
}
pub fn epoll(&self) -> Weak<SpinLock<EventPoll>> {
self.epoll.clone()
}
pub fn event(&self) -> &RwLock<EPollEvent> {
&self.event
}
pub fn file(&self) -> Weak<SpinLock<File>> {
self.file.clone()
}
pub fn fd(&self) -> i32 {
self.fd
}
/// ## Invoke the bound file's poll method through this epoll item and extract the events of interest
fn ep_item_poll(&self) -> EPollEventType {
let file = self.file.upgrade();
if file.is_none() {
return EPollEventType::empty();
}
if let Ok(events) = file.unwrap().lock_irqsave().poll() {
let events = events as u32 & self.event.read().events;
return EPollEventType::from_bits_truncate(events);
}
return EPollEventType::empty();
}
}
/// ### Private data of an epoll file
#[derive(Debug, Clone)]
pub struct EPollPrivateData {
epoll: LockedEventPoll,
}
/// ### This struct hooks the epoll into the filesystem
#[derive(Debug)]
pub struct EPollInode {
epoll: LockedEventPoll,
}
impl EPollInode {
pub fn new(epoll: LockedEventPoll) -> Arc<Self> {
Arc::new(Self { epoll })
}
}
impl IndexNode for EPollInode {
fn read_at(
&self,
_offset: usize,
_len: usize,
_buf: &mut [u8],
_data: &mut crate::filesystem::vfs::FilePrivateData,
) -> Result<usize, crate::syscall::SystemError> {
Err(SystemError::ENOSYS)
}
fn write_at(
&self,
_offset: usize,
_len: usize,
_buf: &[u8],
_data: &mut crate::filesystem::vfs::FilePrivateData,
) -> Result<usize, crate::syscall::SystemError> {
Err(SystemError::ENOSYS)
}
fn poll(&self) -> Result<usize, crate::syscall::SystemError> {
// This must be implemented to support nesting an epoll inside another epoll
todo!()
}
fn fs(&self) -> Arc<dyn crate::filesystem::vfs::FileSystem> {
todo!()
}
fn as_any_ref(&self) -> &dyn core::any::Any {
self
}
fn list(&self) -> Result<Vec<alloc::string::String>, crate::syscall::SystemError> {
Err(SystemError::ENOSYS)
}
fn metadata(&self) -> Result<Metadata, SystemError> {
Ok(Metadata::default())
}
fn close(&self, _data: &mut FilePrivateData) -> Result<(), SystemError> {
// Release resources
let mut epoll = self.epoll.0.lock_irqsave();
// Wake every process waiting on this epoll
epoll.shutdown.store(true, Ordering::SeqCst);
epoll.ep_wake_all();
let fds = epoll.ep_items.keys().cloned().collect::<Vec<_>>();
// Clean up the epitems in the red-black tree
for fd in fds {
let file = ProcessManager::current_pcb()
.fd_table()
.read()
.get_file_by_fd(fd);
if file.is_some() {
file.unwrap()
.lock_irqsave()
.remove_epoll(&Arc::downgrade(&self.epoll.0))?;
}
epoll.ep_items.remove(&fd);
}
Ok(())
}
fn open(&self, _data: &mut FilePrivateData, _mode: &FileMode) -> Result<(), SystemError> {
Ok(())
}
}
impl EventPoll {
/// ## Create an epoll object
///
/// ### Parameters
/// - flags: the FileMode of the created epoll file
///
/// ### Return value
/// - On success returns Ok(fd), otherwise returns Err
pub fn do_create_epoll(flags: FileMode) -> Result<usize, SystemError> {
if !flags.difference(FileMode::O_CLOEXEC).is_empty() {
return Err(SystemError::EINVAL);
}
// Create the epoll
let epoll = LockedEventPoll(Arc::new(SpinLock::new(EventPoll::new())));
epoll.0.lock_irqsave().self_ref = Some(Arc::downgrade(&epoll.0));
// Create the epoll's inode object
let epoll_inode = EPollInode::new(epoll.clone());
let mut ep_file = File::new(
epoll_inode,
FileMode::O_RDWR | (flags & FileMode::O_CLOEXEC),
)?;
// Set the ep_file's FilePrivateData
ep_file.private_data = FilePrivateData::EPoll(EPollPrivateData { epoll });
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let mut fd_table_guard = fd_table.write();
let fd = fd_table_guard.alloc_fd(ep_file, None)?;
Ok(fd as usize)
}
/// ## Implementation of epoll_ctl
///
/// Adds, removes, or modifies entries of the epoll file depending on op
///
/// ### Parameters
/// - epfd: the epoll file descriptor being operated on
/// - op: the operation to perform
/// - fd: the file descriptor the operation applies to
/// - epds: the event passed in from user space; for EpollCtlAdd it is the event set to register, for EPollCtlMod it is the updated event set; the delete operation does not use this field
/// - nonblock: whether this operation is non-blocking (the EPoll lock may be held elsewhere)
pub fn do_epoll_ctl(
epfd: i32,
op: EPollCtlOption,
fd: i32,
epds: &mut EPollEvent,
nonblock: bool,
) -> Result<usize, SystemError> {
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let fd_table_guard = fd_table.read();
// Get the epoll file and the file that fd points to
let ep_file = fd_table_guard
.get_file_by_fd(epfd)
.ok_or(SystemError::EBADF)?;
let dst_file = fd_table_guard
.get_file_by_fd(fd)
.ok_or(SystemError::EBADF)?;
// Check whether EPOLLWAKEUP is allowed
if op != EPollCtlOption::EpollCtlDel {
epds.events &= !EPollEventType::EPOLLWAKEUP.bits();
}
let events = EPollEventType::from_bits_truncate(epds.events);
// Validate the two files we obtained
// First, an epoll must not nest itself
// Second, ep_file must be an epoll file
if Arc::ptr_eq(&ep_file, &dst_file) || !Self::is_epoll_file(&ep_file) {
return Err(SystemError::EINVAL);
}
if op != EPollCtlOption::EpollCtlDel && events.contains(EPollEventType::EPOLLEXCLUSIVE) {
// EpollCtlMod is not allowed in epoll exclusive mode
if op == EPollCtlOption::EpollCtlMod {
return Err(SystemError::EINVAL);
}
// Nested exclusive wakeups are not supported
if op == EPollCtlOption::EpollCtlAdd && Self::is_epoll_file(&dst_file)
|| !events
.difference(EPollEventType::EPOLLEXCLUSIVE_OK_BITS)
.is_empty()
{
return Err(SystemError::EINVAL);
}
}
// Get the epoll from the FilePrivateData
if let FilePrivateData::EPoll(epoll_data) = &ep_file.lock_irqsave().private_data {
let mut epoll_guard = {
if nonblock {
// In non-blocking mode, try to take the lock once
if let Ok(guard) = epoll_data.epoll.0.try_lock_irqsave() {
guard
} else {
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
} else {
epoll_data.epoll.0.lock_irqsave()
}
};
if op == EPollCtlOption::EpollCtlAdd {
// TODO: check in a loop whether this is an epoll nested inside another epoll, and if so check the nesting depth
// A detection algorithm is needed here, but epoll-in-epoll nesting is not yet considered, so this is unimplemented for now
// Linux's algorithm: https://code.dragonos.org.cn/xref/linux-6.1.9/fs/eventpoll.c?r=&mo=56953&fi=2057#2133
if Self::is_epoll_file(&dst_file) {
todo!();
}
}
let ep_item = epoll_guard.ep_items.get(&fd);
match op {
EPollCtlOption::EpollCtlAdd => {
// If it already exists, return an error
if ep_item.is_some() {
return Err(SystemError::EEXIST);
}
// Set up the epoll item
let epitem = Arc::new(EPollItem::new(
Arc::downgrade(&epoll_data.epoll.0),
*epds,
fd,
Arc::downgrade(&dst_file),
));
Self::ep_insert(&mut epoll_guard, dst_file, epitem)?;
}
EPollCtlOption::EpollCtlDel => {
// If it does not exist, return an error
if ep_item.is_none() {
return Err(SystemError::ENOENT);
}
// Remove it
Self::ep_remove(&mut epoll_guard, fd, Some(dst_file))?;
}
EPollCtlOption::EpollCtlMod => {
// If it does not exist, return an error
if ep_item.is_none() {
return Err(SystemError::ENOENT);
}
let ep_item = ep_item.unwrap().clone();
if ep_item.event.read().events & EPollEventType::EPOLLEXCLUSIVE.bits() != 0 {
epds.events |=
EPollEventType::EPOLLERR.bits() | EPollEventType::EPOLLHUP.bits();
Self::ep_modify(&mut epoll_guard, ep_item, &epds)?;
}
}
}
}
Ok(0)
}
/// ## Implementation of epoll_wait
pub fn do_epoll_wait(
epfd: i32,
epoll_event: &mut [EPollEvent],
max_events: i32,
timespec: Option<TimeSpec>,
) -> Result<usize, SystemError> {
let current_pcb = ProcessManager::current_pcb();
let fd_table = current_pcb.fd_table();
let fd_table_guard = fd_table.read();
// Get the epoll file
let ep_file = fd_table_guard
.get_file_by_fd(epfd)
.ok_or(SystemError::EBADF)?;
drop(fd_table_guard);
// Make sure it is an epoll file
if !Self::is_epoll_file(&ep_file) {
return Err(SystemError::EINVAL);
}
// Get the epoll from the epoll file
let mut epolldata = None;
if let FilePrivateData::EPoll(epoll_data) = &ep_file.lock_irqsave().private_data {
epolldata = Some(epoll_data.clone())
}
if epolldata.is_some() {
let epoll_data = epolldata.unwrap();
let epoll = epoll_data.epoll.clone();
let epoll_guard = epoll.0.lock_irqsave();
let mut timeout = false;
if timespec.is_some() {
let timespec = timespec.unwrap();
if !(timespec.tv_sec > 0 || timespec.tv_nsec > 0) {
// Non-blocking case
timeout = true;
}
}
// Check whether the epoll has ready events
let mut available = epoll_guard.ep_events_available();
drop(epoll_guard);
loop {
if available {
// If there are ready events, return them immediately
return Self::ep_send_events(epoll.clone(), epoll_event, max_events);
}
if epoll.0.lock_irqsave().shutdown.load(Ordering::SeqCst) {
// If the epoll has been closed
return Err(SystemError::EBADF);
}
// If we have timed out
if timeout {
return Ok(0);
}
// Spin for a while, waiting for events
available = {
let mut ret = false;
for _ in 0..50 {
if let Ok(guard) = epoll.0.try_lock_irqsave() {
if guard.ep_events_available() {
ret = true;
break;
}
}
}
// Finally, try once more without try_lock
if !ret {
ret = epoll.0.lock_irqsave().ep_events_available();
}
ret
};
if available {
continue;
}
// If there is an unhandled signal, return an error
if current_pcb.sig_info().sig_pending().signal().bits() != 0 {
return Err(SystemError::EINTR);
}
// No event has arrived yet, so go to sleep
// Register a timer
let mut timer = None;
if timespec.is_some() {
let timespec = timespec.unwrap();
let handle = WakeUpHelper::new(current_pcb.clone());
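// Convert the timeout to microseconds: tv_sec * 1_000_000 + tv_nsec / 1_000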
let jiffies = next_n_us_timer_jiffies(
(timespec.tv_sec * 1000000 + timespec.tv_nsec / 1000) as u64,
);
let inner = Timer::new(handle, jiffies);
inner.activate();
timer = Some(inner);
}
let guard = epoll.0.lock_irqsave();
unsafe { guard.epoll_wq.sleep_without_schedule() };
drop(guard);
sched();
// After being woken, check whether events are available
available = epoll.0.lock_irqsave().ep_events_available();
if timer.is_some() {
if timer.as_ref().unwrap().timeout() {
// Timed out
timeout = true;
} else {
// Not timed out; cancel the timer
timer.unwrap().cancel();
}
}
}
} else {
panic!("An epoll file does not have the corresponding private information");
}
}
/// ## Copy the events that are already ready to user space
///
/// ### Parameters
/// - epoll: the epoll in question
/// - user_event: the epoll_event address passed in from user space; because of memory-alignment issues the address has to be operated on directly
/// - max_events: the maximum number of events to handle
fn ep_send_events(
epoll: LockedEventPoll,
user_event: &mut [EPollEvent],
max_events: i32,
) -> Result<usize, SystemError> {
let mut ep_guard = epoll.0.lock_irqsave();
let mut res: usize = 0;
// In level-triggered mode the epitem must be put back on the queue so the next loop iteration can check again whether events remain
// (edge triggering is therefore more efficient than level triggering, but in some cases level triggering reports back to the user more promptly)
let mut push_back = Vec::new();
while let Some(epitem) = ep_guard.ready_list.pop_front() {
if res >= max_events as usize {
push_back.push(epitem);
break;
}
let ep_events = EPollEventType::from_bits_truncate(epitem.event.read().events);
// Poll again for events (to prevent level-triggered items from being re-queued forever)
let revents = epitem.ep_item_poll();
if revents.is_empty() {
continue;
}
// Build the triggered-event structure
let event = EPollEvent {
events: revents.bits,
data: epitem.event.read().data,
};
// Here we would need to check whether the next write position is a null pointer
// TODO: events can be lost here
// If the user passes in an array shorter than max_events, then once we reach the end of the array before reaching max_events,
// the problem is that we write the data into the memory beyond it: the user cannot get the event from the array they passed in, and dirty data is written into the following memory, so the event is lost
// The odds of this are fairly small: it first requires improper use by the user, and since the earlier address check validates against max_events it can only happen when two allocations happen to be adjacent; still, it needs to be considered
// The check below is meaningless as written; it is only kept as a note on the error handling
// offset += core::mem::size_of::<EPollEvent>();
// if offset >= max_offset {
// // The currently pointed-to address is invalid, so put the epitem back on the queue
// ep_guard.ready_list.push_back(epitem.clone());
// if res == 0 {
// // Not a single write succeeded, which means the user-supplied address itself is bad
// return Err(SystemError::EFAULT);
// }
// }
// Copy to user space
user_event[res] = event;
// Increment the count
res += 1;
// crate::kdebug!("ep send {event:?}");
if ep_events.contains(EPollEventType::EPOLLONESHOT) {
let mut event_writer = epitem.event.write();
let new_event = event_writer.events & EPollEventType::EP_PRIVATE_BITS.bits;
event_writer.set_events(new_event);
} else if !ep_events.contains(EPollEventType::EPOLLET) {
push_back.push(epitem);
}
}
for item in push_back {
ep_guard.ep_add_ready(item);
}
Ok(res)
}
// ### Check whether a file is an epoll file
fn is_epoll_file(file: &Arc<SpinLock<File>>) -> bool {
if let FilePrivateData::EPoll(_) = file.lock_irqsave().private_data {
return true;
}
return false;
}
fn ep_insert(
epoll_guard: &mut SpinLockGuard<EventPoll>,
dst_file: Arc<SpinLock<File>>,
epitem: Arc<EPollItem>,
) -> Result<(), SystemError> {
if Self::is_epoll_file(&dst_file) {
return Err(SystemError::ENOSYS);
// TODO: the current implementation does not yet consider nesting other kinds of files (only sockets for now); nesting here means epoll/select/poll
}
let test_poll = dst_file.lock_irqsave().poll();
if test_poll.is_err() {
if test_poll.unwrap_err() == SystemError::EOPNOTSUPP_OR_ENOTSUP {
// The target file does not support poll
return Err(SystemError::ENOSYS);
}
}
epoll_guard.ep_items.insert(epitem.fd, epitem.clone());
// Check whether the file already has pending events
let event = epitem.ep_item_poll();
if !event.is_empty() {
// Add to the ready queue
epoll_guard.ep_add_ready(epitem.clone());
epoll_guard.ep_wake_one();
}
// TODO: nested epoll
// This flag relates to power management and is not supported yet
if epitem.event.read().events & EPollEventType::EPOLLWAKEUP.bits() != 0 {
return Err(SystemError::ENOSYS);
}
dst_file.lock_irqsave().add_epoll(epitem.clone())?;
Ok(())
}
pub fn ep_remove(
epoll: &mut SpinLockGuard<EventPoll>,
fd: i32,
dst_file: Option<Arc<SpinLock<File>>>,
) -> Result<(), SystemError> {
if dst_file.is_some() {
let dst_file = dst_file.unwrap();
let mut file_guard = dst_file.lock_irqsave();
file_guard.remove_epoll(epoll.self_ref.as_ref().unwrap())?;
}
let epitem = epoll.ep_items.remove(&fd).unwrap();
let _ = epoll
.ready_list
.extract_if(|item| Arc::ptr_eq(item, &epitem));
Ok(())
}
/// ## Modify an already-registered set of watched events
///
/// ### Parameters
/// - epoll_guard: the EventPoll's lock
/// - epitem: the epitem of the descriptor to modify
/// - event: the new events
fn ep_modify(
epoll_guard: &mut SpinLockGuard<EventPoll>,
epitem: Arc<EPollItem>,
event: &EPollEvent,
) -> Result<(), SystemError> {
let mut epi_event_guard = epitem.event.write();
// Modify the epitem
epi_event_guard.events = event.events;
epi_event_guard.data = event.data;
drop(epi_event_guard);
// After modifying, check whether the file already has events of interest pending
let event = epitem.ep_item_poll();
if !event.is_empty() {
epoll_guard.ep_add_ready(epitem.clone());
epoll_guard.ep_wake_one();
}
// TODO: handle EPOLLWAKEUP; not currently supported
Ok(())
}
/// ### Check whether the epoll has ready items
pub fn ep_events_available(&self) -> bool {
!self.ready_list.is_empty()
}
/// ### Add the epitem to the ready queue; duplicate additions are ignored
pub fn ep_add_ready(&mut self, epitem: Arc<EPollItem>) {
let ret = self.ready_list.iter().find(|epi| Arc::ptr_eq(epi, &epitem));
if ret.is_none() {
self.ready_list.push_back(epitem);
}
}
/// ### Check whether any process is waiting on this epoll
pub fn ep_has_waiter(&self) -> bool {
self.epoll_wq.len() != 0
}
/// ### Wake all processes waiting on this epoll
pub fn ep_wake_all(&self) {
self.epoll_wq.wakeup_all(None);
}
/// ### Wake the first process waiting on this epoll
pub fn ep_wake_one(&self) {
self.epoll_wq.wakeup(None);
}
}
/// C-compatible epoll event structure
#[derive(Copy, Clone, Default)]
#[repr(packed)]
pub struct EPollEvent {
/// The triggered events
events: u32,
/// The kernel does not use this field; user space may use it freely, and the kernel returns it unchanged when the event fires
data: u64,
}
impl Debug for EPollEvent {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
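// Copy the fields into locals first: creating a reference to a field of a
// #[repr(packed)] struct is undefined behavior when the field is misaligned,
// and the Debug builder below takes references.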
let events = self.events;
let data = self.data;
f.debug_struct("epoll_event")
.field("events", &events)
.field("data", &data)
.finish()
}
}
impl EPollEvent {
pub fn set_events(&mut self, events: u32) {
self.events = events;
}
pub fn events(&self) -> u32 {
self.events
}
}
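
Because `EPollEvent` is `#[repr(packed)]`, its size is the 12 bytes (4-byte `events` plus 8-byte `data`, no padding) that the C `struct epoll_event` occupies on x86-64, rather than the 16 bytes Rust would produce with natural alignment. A standalone sketch of that invariant, using a local model of the struct rather than the kernel type:

```rust
// Local model of the kernel's packed EPollEvent: #[repr(packed)] removes the
// padding before the u64 field, giving 4 + 8 = 12 bytes, the same layout as
// Linux's x86-64 `struct epoll_event`.
#[repr(packed)]
#[derive(Copy, Clone, Default)]
#[allow(dead_code)]
struct EPollEvent {
    events: u32,
    data: u64,
}

// The same fields with natural alignment would be padded to 16 bytes.
#[derive(Default)]
#[allow(dead_code)]
struct Aligned {
    events: u32,
    data: u64,
}

fn main() {
    assert_eq!(core::mem::size_of::<EPollEvent>(), 12);
    assert_eq!(core::mem::size_of::<Aligned>(), 16);
}
```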
/// ## The op argument of the epoll_ctl function
#[derive(Debug, PartialEq)]
pub enum EPollCtlOption {
/// Register a new file descriptor with the epfd
EpollCtlAdd,
/// Remove the file descriptor from the epfd
EpollCtlDel,
/// Modify the watched events of an already-registered file descriptor
EpollCtlMod,
}
impl EPollCtlOption {
pub fn from_op_num(op: usize) -> Result<Self, SystemError> {
match op {
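// These numeric values match Linux's EPOLL_CTL_ADD / EPOLL_CTL_DEL / EPOLL_CTL_MOD.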
1 => Ok(Self::EpollCtlAdd),
2 => Ok(Self::EpollCtlDel),
3 => Ok(Self::EpollCtlMod),
_ => Err(SystemError::EINVAL),
}
}
}
bitflags! {
#[allow(dead_code)]
pub struct EPollEventType: u32 {
/// Triggered when the descriptor has new data available to read
const EPOLLIN = 0x00000001;
/// Triggered when the descriptor has urgent data available to read
const EPOLLPRI = 0x00000002;
/// Triggered when data can be written to the descriptor
const EPOLLOUT = 0x00000004;
/// Triggered when an error occurs on the descriptor
const EPOLLERR = 0x00000008;
/// Triggered when the descriptor is hung up (the connection is closed)
const EPOLLHUP = 0x00000010;
/// Triggered when the descriptor is not a valid file descriptor
const EPOLLNVAL = 0x00000020;
/// Normal data is readable; similar to `EPOLLIN`
const EPOLLRDNORM = 0x00000040;
/// Priority-band (out-of-band) data is readable
const EPOLLRDBAND = 0x00000080;
/// Normal data is writable; similar to `EPOLLOUT`
const EPOLLWRNORM = 0x00000100;
/// Priority-band (out-of-band) data is writable
const EPOLLWRBAND = 0x00000200;
/// Triggered when a message arrives via a message queue
const EPOLLMSG = 0x00000400;
/// Triggered when the hung-up (closed) peer has sent a FIN (read-side shutdown)
const EPOLLRDHUP = 0x00002000;
/// The following are extra options
///
/// Special option for asynchronous I/O (not yet implemented)
const EPOLL_URING_WAKE = 1u32 << 27;
/// Put the epoll into exclusive mode
const EPOLLEXCLUSIVE = 1u32 << 28;
/// Allow the epoll to be woken while the system is suspended; usually used to wake an epoll via eventfd or timerfd (power-management related, not implemented)
const EPOLLWAKEUP = 1u32 << 29;
/// Watch for the event only once; the descriptor must be re-added afterwards
const EPOLLONESHOT = 1u32 << 30;
/// Enable edge-triggered mode (the event is only reported by epoll_wait on the next trigger)
/// The counterpart is level-triggered (the default): in level-triggered mode, if the data is not fully handled this time, epoll puts the item back on its own ready queue
const EPOLLET = 1u32 << 31;
/// The following are combined masks
const EPOLLINOUT_BITS = Self::EPOLLIN.bits | Self::EPOLLOUT.bits;
const EPOLLEXCLUSIVE_OK_BITS =
Self::EPOLLINOUT_BITS.bits
| Self::EPOLLERR.bits
| Self::EPOLLHUP.bits
| Self::EPOLLWAKEUP.bits
| Self::EPOLLET.bits
| Self::EPOLLEXCLUSIVE.bits;
const EP_PRIVATE_BITS =
Self::EPOLLWAKEUP.bits
| Self::EPOLLONESHOT.bits
| Self::EPOLLET.bits
| Self::EPOLLEXCLUSIVE.bits;
/// Indicates the epoll has been freed; unused in the current design
const POLLFREE = 0x4000;
}
}
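
The `EPOLLET` and `EPOLLONESHOT` comments above describe the requeue policy that `ep_send_events` implements. A runnable model of just that decision, written in the same bitflags 1.x style the kernel uses (the `Ev` type is a local stand-in, not the kernel's `EPollEventType`):

```rust
// After an event is reported: level-triggered items go back on the ready
// list, edge-triggered items do not, and EPOLLONESHOT disarms the item.
bitflags::bitflags! {
    struct Ev: u32 {
        const EPOLLIN = 0x00000001;
        const EPOLLONESHOT = 1u32 << 30;
        const EPOLLET = 1u32 << 31;
    }
}

fn requeue_after_report(registered: Ev) -> bool {
    !registered.contains(Ev::EPOLLET) && !registered.contains(Ev::EPOLLONESHOT)
}

fn main() {
    assert!(requeue_after_report(Ev::EPOLLIN)); // level-triggered (the default)
    assert!(!requeue_after_report(Ev::EPOLLIN | Ev::EPOLLET)); // edge-triggered
    assert!(!requeue_after_report(Ev::EPOLLIN | Ev::EPOLLONESHOT)); // one-shot
}
```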


@ -0,0 +1,107 @@
use crate::{
arch::ipc::signal::SigSet,
filesystem::vfs::file::FileMode,
ipc::signal::set_current_sig_blocked,
mm::VirtAddr,
syscall::{
user_access::{UserBufferReader, UserBufferWriter},
Syscall, SystemError,
},
time::TimeSpec,
};
use super::{EPollCtlOption, EPollEvent, EventPoll};
impl Syscall {
pub fn epoll_create(max_size: i32) -> Result<usize, SystemError> {
if max_size < 0 {
return Err(SystemError::EINVAL);
}
return EventPoll::do_create_epoll(FileMode::empty());
}
pub fn epoll_create1(flag: usize) -> Result<usize, SystemError> {
let flags = FileMode::from_bits_truncate(flag as u32);
let ret = EventPoll::do_create_epoll(flags);
ret
}
pub fn epoll_wait(
epfd: i32,
events: VirtAddr,
max_events: i32,
timeout: i32,
) -> Result<usize, SystemError> {
if max_events <= 0 || max_events as u32 > EventPoll::EP_MAX_EVENTS {
return Err(SystemError::EINVAL);
}
let mut timespec = None;
if timeout == 0 {
timespec = Some(TimeSpec::new(0, 0));
}
if timeout > 0 {
let sec: i64 = timeout as i64 / 1000;
let nsec: i64 = 1000000 * (timeout as i64 % 1000);
timespec = Some(TimeSpec::new(sec, nsec))
}
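// Worked example: timeout = 1500 ms gives tv_sec = 1 and tv_nsec = 500 * 1_000_000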
// Get the epoll_events from the user-supplied address
let mut epds_writer = UserBufferWriter::new(
events.as_ptr::<EPollEvent>(),
max_events as usize * core::mem::size_of::<EPollEvent>(),
true,
)?;
let epoll_events = epds_writer.buffer::<EPollEvent>(0)?;
return EventPoll::do_epoll_wait(epfd, epoll_events, max_events, timespec);
}
pub fn epoll_ctl(epfd: i32, op: usize, fd: i32, event: VirtAddr) -> Result<usize, SystemError> {
let op = EPollCtlOption::from_op_num(op)?;
let mut epds = EPollEvent::default();
if op != EPollCtlOption::EpollCtlDel {
// A null pointer is not allowed unless the op is EpollCtlDel
if event.is_null() {
return Err(SystemError::EFAULT);
}
// Same issue as before: the C-standard epoll_event is 12 bytes, while the kernel's epoll_event would be 16 bytes after memory alignment
// Copying field by field is really not much different from copying the whole struct; using the aligned version in the kernel might even improve performance
let epds_reader = UserBufferReader::new(
event.as_ptr::<EPollEvent>(),
core::mem::size_of::<EPollEvent>(),
true,
)?;
// Copy into the kernel
epds_reader.copy_one_from_user(&mut epds, 0)?;
}
return EventPoll::do_epoll_ctl(epfd, op, fd, &mut epds, false);
}
/// ## Block certain signals for the duration of epoll_wait
pub fn epoll_pwait(
epfd: i32,
epoll_event: VirtAddr,
max_events: i32,
timespec: i32,
mut sigmask: &mut SigSet,
) -> Result<usize, SystemError> {
// Set the blocked signals
set_current_sig_blocked(&mut sigmask);
let wait_ret = Self::epoll_wait(epfd, epoll_event, max_events, timespec);
if wait_ret.is_err() && *wait_ret.as_ref().unwrap_err() != SystemError::EINTR {
// TODO: restore the signal mask?
// link: https://code.dragonos.org.cn/xref/linux-6.1.9/fs/eventpoll.c#2294
}
wait_ret
}
}


@ -3,14 +3,28 @@ use core::{
sync::atomic::AtomicUsize,
};
use alloc::{boxed::Box, collections::BTreeMap, sync::Arc};
use alloc::{
boxed::Box,
collections::BTreeMap,
sync::{Arc, Weak},
};
use crate::{driver::net::NetDriver, kwarn, libs::rwlock::RwLock, syscall::SystemError};
use smoltcp::wire::IpEndpoint;
use crate::{
driver::net::NetDriver,
kwarn,
libs::{rwlock::RwLock, spinlock::SpinLock},
net::event_poll::EventPoll,
syscall::SystemError,
};
use smoltcp::{iface::SocketHandle, wire::IpEndpoint};
use self::socket::SocketMetadata;
use self::{
event_poll::{EPollEventType, EPollItem},
socket::{SocketMetadata, HANDLE_MAP},
};
pub mod endpoints;
pub mod event_poll;
pub mod net_core;
pub mod socket;
pub mod syscall;
@ -28,28 +42,13 @@ pub fn generate_iface_id() -> usize {
.into();
}
/// @brief Specifies how the socket is to be shut down
/// Reference: https://pubs.opengroup.org/onlinepubs/9699919799/functions/shutdown.html
#[derive(Debug, Clone, Copy, PartialEq, Eq, FromPrimitive, ToPrimitive)]
pub enum ShutdownType {
ShutRd = 0, // Disables further receive operations.
ShutWr = 1, // Disables further send operations.
ShutRdwr = 2, // Disables further send and receive operations.
}
impl TryFrom<i32> for ShutdownType {
type Error = SystemError;
fn try_from(value: i32) -> Result<Self, Self::Error> {
use num_traits::FromPrimitive;
<Self as FromPrimitive>::from_i32(value).ok_or(SystemError::EINVAL)
}
}
impl Into<i32> for ShutdownType {
fn into(self) -> i32 {
use num_traits::ToPrimitive;
<Self as ToPrimitive>::to_i32(&self).unwrap()
bitflags! {
/// @brief Specifies how the socket is to be shut down
/// Reference: https://opengrok.ringotek.cn/xref/linux-6.1.9/include/net/sock.h?fi=SHUTDOWN_MASK#1573
pub struct ShutdownType: u8 {
const RCV_SHUTDOWN = 1;
const SEND_SHUTDOWN = 2;
const SHUTDOWN_MASK = 3;
}
}
@ -69,7 +68,7 @@ pub trait Socket: Sync + Send + Debug {
///
/// @return - On success: (the length of the data read, the endpoint the data was read from).
/// - On failure: an error code
fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint);
fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint);
/// @brief Write data to the socket. If the socket is blocking, this does not return until all the data has been written into the socket
///
@ -109,7 +108,7 @@ pub trait Socket: Sync + Send + Debug {
/// This function sends a shutdown message to the remote endpoint, indicating that the local endpoint no longer accepts new data.
///
/// @return whether the shutdown succeeded
fn shutdown(&self, _type: ShutdownType) -> Result<(), SystemError> {
fn shutdown(&mut self, _type: ShutdownType) -> Result<(), SystemError> {
return Err(SystemError::ENOSYS);
}
@ -156,8 +155,8 @@ pub trait Socket: Sync + Send + Debug {
/// The second boolean value indicates whether the socket is ready for writing. If it is true, then data can be written to the socket without blocking.
/// The third boolean value indicates whether the socket has encountered an error condition. If it is true, then the socket is in an error state and should be closed or reset
///
fn poll(&self) -> (bool, bool, bool) {
return (false, false, false);
fn poll(&self) -> EPollEventType {
return EPollEventType::empty();
}
/// @brief The socket's ioctl function
@ -199,6 +198,46 @@ pub trait Socket: Sync + Send + Debug {
kwarn!("setsockopt is not implemented");
return Ok(());
}
fn socket_handle(&self) -> SocketHandle;
fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
HANDLE_MAP
.write_irqsave()
.get_mut(&self.socket_handle())
.unwrap()
.add_epoll(epitem);
Ok(())
}
fn remove_epoll(
&mut self,
epoll: &Weak<SpinLock<event_poll::EventPoll>>,
) -> Result<(), SystemError> {
HANDLE_MAP
.write_irqsave()
.get_mut(&self.socket_handle())
.unwrap()
.remove_epoll(epoll)?;
Ok(())
}
fn clear_epoll(&mut self) -> Result<(), SystemError> {
let mut handle_map_guard = HANDLE_MAP.write_irqsave();
let handle_item = handle_map_guard.get_mut(&self.socket_handle()).unwrap();
for epitem in handle_item.epitems.lock_irqsave().iter() {
let epoll = epitem.epoll();
let _ = EventPoll::ep_remove(
&mut epoll.upgrade().unwrap().lock_irqsave(),
epitem.fd(),
None,
);
}
Ok(())
}
}
impl Clone for Box<dyn Socket> {


@ -1,16 +1,19 @@
use alloc::{boxed::Box, collections::BTreeMap, sync::Arc};
use smoltcp::{socket::dhcpv4, wire};
use smoltcp::{iface::SocketHandle, socket::dhcpv4, wire};
use crate::{
driver::net::NetDriver,
kdebug, kinfo, kwarn,
libs::rwlock::RwLockReadGuard,
net::NET_DRIVERS,
net::{socket::SocketPollMethod, NET_DRIVERS},
syscall::SystemError,
time::timer::{next_n_ms_timer_jiffies, Timer, TimerFunction},
};
use super::socket::{SOCKET_SET, SOCKET_WAITQUEUE};
use super::{
event_poll::EPollEventType,
socket::{TcpSocket, HANDLE_MAP, SOCKET_SET},
};
/// The network poll function, which will be called by timer.
///
@ -126,7 +129,7 @@ pub fn poll_ifaces() {
for (_, iface) in guard.iter() {
iface.poll(&mut sockets).ok();
}
SOCKET_WAITQUEUE.wakeup_all(None);
let _ = send_event(&sockets);
}
/// Poll the ifaces, attempting to lock SOCKET_SET at most `times` times.
@ -154,7 +157,7 @@ pub fn poll_ifaces_try_lock(times: u16) -> Result<(), SystemError> {
for (_, iface) in guard.iter() {
iface.poll(&mut sockets).ok();
}
SOCKET_WAITQUEUE.wakeup_all(None);
let _ = send_event(&sockets);
return Ok(());
}
@ -178,6 +181,103 @@ pub fn poll_ifaces_try_lock_onetime() -> Result<(), SystemError> {
for (_, iface) in guard.iter() {
iface.poll(&mut sockets).ok();
}
SOCKET_WAITQUEUE.wakeup_all(None);
send_event(&sockets)?;
return Ok(());
}
/// ### Handle the events after polling
fn send_event(sockets: &smoltcp::iface::SocketSet) -> Result<(), SystemError> {
for (handle, socket_type) in sockets.iter() {
let handle_guard = HANDLE_MAP.read_irqsave();
let item = handle_guard.get(&handle);
if item.is_none() {
continue;
}
let handle_item = item.unwrap();
// Get the events on the socket
let mut events =
SocketPollMethod::poll(socket_type, handle_item.shutdown_type()).bits() as u64;
// Dispatch to the handler for the socket's type
match socket_type {
smoltcp::socket::Socket::Raw(_) | smoltcp::socket::Socket::Udp(_) => {
handle_guard
.get(&handle)
.unwrap()
.wait_queue
.wakeup_any(events);
}
smoltcp::socket::Socket::Icmp(_) => unimplemented!("Icmp socket hasn't been implemented"),
smoltcp::socket::Socket::Tcp(inner_socket) => {
if inner_socket.is_active() {
events |= TcpSocket::CAN_ACCPET;
}
if inner_socket.state() == smoltcp::socket::tcp::State::Established {
events |= TcpSocket::CAN_CONNECT;
}
handle_guard
.get(&handle)
.unwrap()
.wait_queue
.wakeup_any(events);
}
smoltcp::socket::Socket::Dhcpv4(_) => {}
smoltcp::socket::Socket::Dns(_) => unimplemented!("Dns socket hasn't been implemented"),
}
drop(handle_guard);
wakeup_epoll(handle, events as u32)?;
// crate::kdebug!(
// "{} send_event {:?}",
// handle,
// EPollEventType::from_bits_truncate(events as u32)
// );
}
Ok(())
}
/// ### Handle epoll wakeups
fn wakeup_epoll(handle: SocketHandle, events: u32) -> Result<(), SystemError> {
let mut handle_guard = HANDLE_MAP.write_irqsave();
let handle_item = handle_guard.get_mut(&handle).unwrap();
let mut epitems_guard = handle_item.epitems.try_lock()?;
// Extract the epoll-relevant events from `events`
let pollflags = EPollEventType::from_bits_truncate(events);
// Take only one at a time, since only one process at a time can hold the corresponding file's lock anyway
if let Some(epitem) = epitems_guard.pop_front() {
let epoll = epitem.epoll().upgrade().unwrap();
let mut epoll_guard = epoll.try_lock()?;
let binding = epitem.clone();
let event_guard = binding.event().read();
let ep_events = EPollEventType::from_bits_truncate(event_guard.events());
// Sanity-check the events and see whether any are of interest
if !(ep_events
.difference(EPollEventType::EP_PRIVATE_BITS)
.is_empty()
|| pollflags.difference(ep_events).is_empty())
{
// TODO: power-management (pm) handling is missing
// First, add the ready epitem to the ready queue
epoll_guard.ep_add_ready(epitem.clone());
if epoll_guard.ep_has_waiter() {
if ep_events.contains(EPollEventType::EPOLLEXCLUSIVE)
&& !pollflags.contains(EPollEventType::POLLFREE)
{
// Avoid the thundering herd
epoll_guard.ep_wake_one();
} else {
epoll_guard.ep_wake_all();
}
}
}
epitems_guard.push_back(epitem);
}
Ok(())
}


@ -1,35 +1,127 @@
#![allow(dead_code)]
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloc::{
boxed::Box,
collections::LinkedList,
sync::{Arc, Weak},
vec::Vec,
};
use hashbrown::HashMap;
use smoltcp::{
iface::{SocketHandle, SocketSet},
socket::{raw, tcp, udp},
socket::{
self, raw,
tcp::{self, State},
udp,
},
wire,
};
use crate::{
arch::rand::rand,
arch::{rand::rand, sched::sched},
driver::net::NetDriver,
filesystem::vfs::{syscall::ModeType, FileType, IndexNode, Metadata, PollStatus},
filesystem::vfs::{syscall::ModeType, FileType, IndexNode, Metadata},
kerror, kwarn,
libs::{
rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard},
spinlock::{SpinLock, SpinLockGuard},
wait_queue::WaitQueue,
wait_queue::EventWaitQueue,
},
syscall::SystemError,
};
use super::{net_core::poll_ifaces, Endpoint, Protocol, Socket, NET_DRIVERS};
use super::{
event_poll::{EPollEventType, EPollItem, EventPoll},
net_core::poll_ifaces,
Endpoint, Protocol, ShutdownType, Socket, NET_DRIVERS,
};
lazy_static! {
/// The set of all sockets
/// TODO: optimize this by implementing our own SocketSet; as it stands, no matter how many NICs there are globally, only one process at a time can access a socket
pub static ref SOCKET_SET: SpinLock<SocketSet<'static >> = SpinLock::new(SocketSet::new(vec![]));
pub static ref SOCKET_WAITQUEUE: WaitQueue = WaitQueue::INIT;
/// SocketHandle table: each SocketHandle maps to one SocketHandleItem
/// Note! The NIC interrupt handler needs to take this table's lock, so make sure interrupts are disabled when acquiring the read lock to avoid deadlock
pub static ref HANDLE_MAP: RwLock<HashMap<SocketHandle,SocketHandleItem>> = RwLock::new(HashMap::new());
/// Port manager
pub static ref PORT_MANAGER: PortManager = PortManager::new();
}
#[derive(Debug)]
pub struct SocketHandleItem {
/// Socket metadata
metadata: SocketMetadata,
/// Shutdown state
pub shutdown_type: RwLock<ShutdownType>,
/// The socket's wait queue
pub wait_queue: EventWaitQueue,
/// The epitems; worth reconsidering whether storing them here is optimal
pub epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
}
impl SocketHandleItem {
pub fn new(socket: &Box<dyn Socket>) -> Self {
Self {
metadata: socket.metadata().unwrap(),
shutdown_type: RwLock::new(ShutdownType::empty()),
wait_queue: EventWaitQueue::new(),
epitems: SpinLock::new(LinkedList::new()),
}
}
pub fn from_socket<A: Socket>(socket: &Box<A>) -> Self {
Self {
metadata: socket.metadata().unwrap(),
shutdown_type: RwLock::new(ShutdownType::empty()),
wait_queue: EventWaitQueue::new(),
epitems: SpinLock::new(LinkedList::new()),
}
}
/// ### Sleep on the socket's wait queue
pub fn sleep(
socket_handle: SocketHandle,
events: u64,
handle_map_guard: RwLockReadGuard<'_, HashMap<SocketHandle, SocketHandleItem>>,
) {
unsafe {
handle_map_guard
.get(&socket_handle)
.unwrap()
.wait_queue
.sleep_without_schedule(events)
};
drop(handle_map_guard);
sched();
}
pub fn shutdown_type(&self) -> ShutdownType {
self.shutdown_type.read().clone()
}
pub fn shutdown_type_writer(&mut self) -> RwLockWriteGuard<ShutdownType> {
self.shutdown_type.write()
}
pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) {
self.epitems.lock_irqsave().push_back(epitem)
}
pub fn remove_epoll(&mut self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
let is_remove = !self
.epitems
.lock_irqsave()
.extract_if(|x| x.epoll().ptr_eq(epoll))
.collect::<Vec<_>>()
.is_empty();
if is_remove {
return Ok(());
}
Err(SystemError::ENOENT)
}
}
/// @brief Port manager for TCP and UDP.
/// When a TCP/UDP socket binds a port, the port is recorded in the corresponding table so that port conflicts can be detected.
pub struct PortManager {
@ -280,7 +372,7 @@ impl RawSocket {
}
impl Socket for RawSocket {
fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
poll_ifaces();
loop {
// How could this be optimized?
@ -306,7 +398,11 @@ impl Socket for RawSocket {
}
}
drop(socket_set_guard);
SOCKET_WAITQUEUE.sleep();
SocketHandleItem::sleep(
self.socket_handle(),
EPollEventType::EPOLLIN.bits() as u64,
HANDLE_MAP.read_irqsave(),
);
}
}
@ -396,6 +492,10 @@ impl Socket for RawSocket {
fn box_clone(&self) -> alloc::boxed::Box<dyn Socket> {
return Box::new(self.clone());
}
fn socket_handle(&self) -> SocketHandle {
self.handle.0
}
}
/// @brief Represents a UDP socket
@ -475,7 +575,7 @@ impl UdpSocket {
impl Socket for UdpSocket {
/// @brief Before calling read, bind to the designated local port first
fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
loop {
// kdebug!("Wait22 to Read");
poll_ifaces();
@ -495,7 +595,11 @@ impl Socket for UdpSocket {
// return (Err(SystemError::ENOTCONN), Endpoint::Ip(None));
}
drop(socket_set_guard);
SOCKET_WAITQUEUE.sleep();
SocketHandleItem::sleep(
self.socket_handle(),
EPollEventType::EPOLLIN.bits() as u64,
HANDLE_MAP.read_irqsave(),
);
}
}
@ -562,11 +666,18 @@ impl Socket for UdpSocket {
return self.do_bind(socket, endpoint);
}
fn poll(&self) -> (bool, bool, bool) {
fn poll(&self) -> EPollEventType {
let sockets = SOCKET_SET.lock();
let socket = sockets.get::<udp::Socket>(self.handle.0);
return (socket.can_send(), socket.can_recv(), false);
return SocketPollMethod::udp_poll(
socket,
HANDLE_MAP
.read_irqsave()
.get(&self.socket_handle())
.unwrap()
.shutdown_type(),
);
}
/// @brief
@ -620,6 +731,10 @@ impl Socket for UdpSocket {
fn peer_endpoint(&self) -> Option<Endpoint> {
return self.remote_endpoint.clone();
}
fn socket_handle(&self) -> SocketHandle {
self.handle.0
}
}
/// @brief Represents a TCP socket
@ -641,6 +756,10 @@ impl TcpSocket {
/// Default size of the transmit buffer
pub const DEFAULT_TX_BUF_SIZE: usize = 512 * 1024;
/// Special TcpSocket events, used for sleeping on the event wait queue
pub const CAN_CONNECT: u64 = 1u64 << 63;
pub const CAN_ACCPET: u64 = 1u64 << 62;
/// @brief Create a raw socket
///
/// @param protocol the protocol number
@ -700,7 +819,16 @@ impl TcpSocket {
}
impl Socket for TcpSocket {
fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
if HANDLE_MAP
.read_irqsave()
.get(&self.socket_handle())
.unwrap()
.shutdown_type()
.contains(ShutdownType::RCV_SHUTDOWN)
{
return (Err(SystemError::ENOTCONN), Endpoint::Ip(None));
}
// kdebug!("tcp socket: read, buf len={}", buf.len());
loop {
@ -737,6 +865,13 @@ impl Socket for TcpSocket {
return (Err(SystemError::ENOTCONN), Endpoint::Ip(None));
}
tcp::RecvError::Finished => {
// The peer's write side is closed, so we should close our read side
HANDLE_MAP
.write_irqsave()
.get_mut(&self.socket_handle())
.unwrap()
.shutdown_type_writer()
.insert(ShutdownType::RCV_SHUTDOWN);
return (Err(SystemError::ENOTCONN), Endpoint::Ip(None));
}
}
@ -745,11 +880,24 @@ impl Socket for TcpSocket {
return (Err(SystemError::ENOTCONN), Endpoint::Ip(None));
}
drop(socket_set_guard);
SOCKET_WAITQUEUE.sleep();
SocketHandleItem::sleep(
self.socket_handle(),
EPollEventType::EPOLLIN.bits() as u64,
HANDLE_MAP.read_irqsave(),
);
}
}
fn write(&self, buf: &[u8], _to: Option<super::Endpoint>) -> Result<usize, SystemError> {
if HANDLE_MAP
.read_irqsave()
.get(&self.socket_handle())
.unwrap()
.shutdown_type()
.contains(ShutdownType::RCV_SHUTDOWN)
{
return Err(SystemError::ENOTCONN);
}
let mut socket_set_guard = SOCKET_SET.lock();
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handle.0);
@ -774,27 +922,18 @@ impl Socket for TcpSocket {
return Err(SystemError::ENOTCONN);
}
fn poll(&self) -> (bool, bool, bool) {
fn poll(&self) -> EPollEventType {
let mut socket_set_guard = SOCKET_SET.lock();
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handle.0);
let mut input = false;
let mut output = false;
let mut error = false;
if self.is_listening && socket.is_active() {
input = true;
} else if !socket.is_open() {
error = true;
} else {
if socket.may_recv() {
input = true;
}
if socket.can_send() {
output = true;
}
}
return (input, output, error);
return SocketPollMethod::tcp_poll(
socket,
HANDLE_MAP
.read_irqsave()
.get(&self.socket_handle())
.unwrap()
.shutdown_type(),
);
}
fn connect(&mut self, endpoint: Endpoint) -> Result<(), SystemError> {
@ -828,7 +967,11 @@ impl Socket for TcpSocket {
}
tcp::State::SynSent => {
drop(sockets);
SOCKET_WAITQUEUE.sleep();
SocketHandleItem::sleep(
self.socket_handle(),
Self::CAN_CONNECT,
HANDLE_MAP.read_irqsave(),
);
}
_ => {
return Err(SystemError::ECONNREFUSED);
@ -885,10 +1028,13 @@ impl Socket for TcpSocket {
return Err(SystemError::EINVAL);
}
fn shutdown(&self, _type: super::ShutdownType) -> Result<(), SystemError> {
let mut sockets = SOCKET_SET.lock();
let socket = sockets.get_mut::<tcp::Socket>(self.handle.0);
socket.close();
fn shutdown(&mut self, shutdown_type: super::ShutdownType) -> Result<(), SystemError> {
// TODO: currently this is only surface-level bookkeeping and the peer is not informed; it should later be implemented via TCP
HANDLE_MAP
.write_irqsave()
.get_mut(&self.socket_handle())
.unwrap()
.shutdown_type = RwLock::new(shutdown_type);
return Ok(());
}
@ -940,12 +1086,24 @@ impl Socket for TcpSocket {
self.metadata.options,
);
Box::new(TcpSocket {
handle: old_handle,
let new_socket = Box::new(TcpSocket {
handle: old_handle.clone(),
local_endpoint: self.local_endpoint,
is_listening: false,
metadata,
})
});
// Update the handle table
let mut handle_guard = HANDLE_MAP.write_irqsave();
// Remove the old entry first
let item = handle_guard.remove(&old_handle.0).unwrap();
// Following smoltcp's behavior, bind the new handle to the old item
handle_guard.insert(new_handle.0, item);
let new_item = SocketHandleItem::from_socket(&new_socket);
// Insert the new item
handle_guard.insert(old_handle.0, new_item);
new_socket
};
// kdebug!("tcp accept: new socket: {:?}", new_socket);
drop(sockets);
@ -954,7 +1112,12 @@ impl Socket for TcpSocket {
return Ok((new_socket, Endpoint::Ip(Some(remote_ep))));
}
drop(sockets);
SOCKET_WAITQUEUE.sleep();
SocketHandleItem::sleep(
self.socket_handle(),
Self::CAN_ACCPET,
HANDLE_MAP.read_irqsave(),
);
}
}
@ -985,6 +1148,10 @@ impl Socket for TcpSocket {
fn box_clone(&self) -> alloc::boxed::Box<dyn Socket> {
return Box::new(self.clone());
}
fn socket_handle(&self) -> SocketHandle {
self.handle.0
}
}
/// @brief Address family enum
@ -1150,10 +1317,17 @@ impl IndexNode for SocketInode {
&self,
_data: &mut crate::filesystem::vfs::FilePrivateData,
) -> Result<(), SystemError> {
let socket = self.0.lock();
let mut socket = self.0.lock_irqsave();
if let Some(Endpoint::Ip(Some(ip))) = socket.endpoint() {
PORT_MANAGER.unbind_port(socket.metadata().unwrap().socket_type, ip.port)?;
}
let _ = socket.clear_epoll();
HANDLE_MAP
.write_irqsave()
.remove(&socket.socket_handle())
.unwrap();
return Ok(());
}
@ -1177,19 +1351,9 @@ impl IndexNode for SocketInode {
return self.0.lock_no_preempt().write(&buf[0..len], None);
}
fn poll(&self) -> Result<crate::filesystem::vfs::PollStatus, SystemError> {
let (read, write, error) = self.0.lock().poll();
let mut result = PollStatus::empty();
if read {
result.insert(PollStatus::READ);
}
if write {
result.insert(PollStatus::WRITE);
}
if error {
result.insert(PollStatus::ERROR);
}
return Ok(result);
fn poll(&self) -> Result<usize, SystemError> {
let events = self.0.lock_irqsave().poll();
return Ok(events.bits() as usize);
}
fn fs(&self) -> alloc::sync::Arc<dyn crate::filesystem::vfs::FileSystem> {
@ -1218,3 +1382,100 @@ impl IndexNode for SocketInode {
return Ok(());
}
}
/// ### Provides lock-free poll methods for sockets
///
/// Because the NIC interrupt handler needs to poll socket state, polling via the socket file or its inode
/// would inevitably deadlock under the current design, so this design is introduced to provide a lock-free poll
pub struct SocketPollMethod;
impl SocketPollMethod {
pub fn poll(socket: &socket::Socket, shutdown: ShutdownType) -> EPollEventType {
match socket {
socket::Socket::Raw(_) => todo!(),
socket::Socket::Icmp(_) => todo!(),
socket::Socket::Udp(udp) => Self::udp_poll(udp, shutdown),
socket::Socket::Tcp(tcp) => Self::tcp_poll(tcp, shutdown),
socket::Socket::Dhcpv4(_) => todo!(),
socket::Socket::Dns(_) => todo!(),
}
}
pub fn tcp_poll(socket: &socket::tcp::Socket, shutdown: ShutdownType) -> EPollEventType {
let mut events = EPollEventType::empty();
if socket.is_listening() && socket.is_active() {
events.insert(EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM);
return events;
}
// The socket is already closed
if !socket.is_open() {
events.insert(EPollEventType::EPOLLHUP)
}
if shutdown.contains(ShutdownType::RCV_SHUTDOWN) {
events.insert(
EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM | EPollEventType::EPOLLRDHUP,
);
}
let state = socket.state();
if state != State::SynSent && state != State::SynReceived {
// The socket has readable data
if socket.can_recv() {
events.insert(EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM);
}
if !(shutdown.contains(ShutdownType::SEND_SHUTDOWN)) {
// The buffer is writable
if socket.send_queue() < socket.send_capacity() {
events.insert(EPollEventType::EPOLLOUT | EPollEventType::EPOLLWRNORM);
} else {
// TODO: raise a signal indicating the buffer is full
todo!("A signal that the buffer is full needs to be sent");
}
} else {
// If our socket has SEND_SHUTDOWN set, the epoll event is EPOLLOUT
events.insert(EPollEventType::EPOLLOUT | EPollEventType::EPOLLWRNORM);
}
} else if state == State::SynSent {
events.insert(EPollEventType::EPOLLOUT | EPollEventType::EPOLLWRNORM);
}
// An error occurred on the socket
if !socket.is_active() {
events.insert(EPollEventType::EPOLLERR);
}
events
}
pub fn udp_poll(socket: &socket::udp::Socket, shutdown: ShutdownType) -> EPollEventType {
let mut event = EPollEventType::empty();
if shutdown.contains(ShutdownType::RCV_SHUTDOWN) {
event.insert(
EPollEventType::EPOLLRDHUP | EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM,
);
}
if shutdown.contains(ShutdownType::SHUTDOWN_MASK) {
event.insert(EPollEventType::EPOLLHUP);
}
if socket.can_recv() {
event.insert(EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM);
}
if socket.can_send() {
event.insert(
EPollEventType::EPOLLOUT
| EPollEventType::EPOLLWRNORM
| EPollEventType::EPOLLWRBAND,
);
} else {
// TODO: not enough buffer space; this needs to be handled with a signal
todo!()
}
return event;
}
}


@ -17,7 +17,10 @@ use crate::{
};
use super::{
socket::{PosixSocketType, RawSocket, SocketInode, SocketOptions, TcpSocket, UdpSocket},
socket::{
PosixSocketType, RawSocket, SocketHandleItem, SocketInode, SocketOptions, TcpSocket,
UdpSocket, HANDLE_MAP,
},
Endpoint, Protocol, ShutdownType, Socket,
};
@ -58,6 +61,10 @@ impl Syscall {
return Err(SystemError::EAFNOSUPPORT);
}
};
let handle_item = SocketHandleItem::new(&socket);
HANDLE_MAP
.write_irqsave()
.insert(socket.socket_handle(), handle_item);
// kdebug!("do_socket: socket: {socket:?}");
let socketinode: Arc<SocketInode> = SocketInode::new(socket);
let f = File::new(socketinode, FileMode::O_RDWR)?;
@ -248,7 +255,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let socket = unsafe { socket.inner_no_preempt() };
let mut socket = unsafe { socket.inner_no_preempt() };
let (n, endpoint) = socket.read(buf);
drop(socket);
@ -279,7 +286,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let socket = unsafe { socket.inner_no_preempt() };
let mut socket = unsafe { socket.inner_no_preempt() };
let mut buf = iovs.new_buf(true);
// 从socket中读取数据
@ -323,8 +330,8 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let socket = unsafe { socket.inner_no_preempt() };
socket.shutdown(ShutdownType::try_from(how as i32)?)?;
let mut socket = unsafe { socket.inner_no_preempt() };
socket.shutdown(ShutdownType::from_bits_truncate(how as u8))?;
return Ok(0);
}


@ -4,7 +4,7 @@ use core::{
};
use crate::{
arch::syscall::nr::*,
arch::{ipc::signal::SigSet, syscall::nr::*},
libs::{futex::constant::FutexFlag, rand::GRandFlags},
process::{
fork::KernelCloneArgs,
@ -1071,6 +1071,46 @@ impl Syscall {
res
}
SYS_EPOLL_CREATE => Self::epoll_create(args[0] as i32),
SYS_EPOLL_CREATE1 => Self::epoll_create1(args[0]),
SYS_EPOLL_CTL => Self::epoll_ctl(
args[0] as i32,
args[1],
args[2] as i32,
VirtAddr::new(args[3]),
),
SYS_EPOLL_WAIT => Self::epoll_wait(
args[0] as i32,
VirtAddr::new(args[1]),
args[2] as i32,
args[3] as i32,
),
SYS_EPOLL_PWAIT => {
let epfd = args[0] as i32;
let epoll_event = VirtAddr::new(args[1]);
let max_events = args[2] as i32;
let timespec = args[3] as i32;
let sigmask_addr = args[4] as *mut SigSet;
if sigmask_addr.is_null() {
return Self::epoll_wait(epfd, epoll_event, max_events, timespec);
}
let sigmask_reader =
UserBufferReader::new(sigmask_addr, core::mem::size_of::<SigSet>(), true)?;
let mut sigmask = sigmask_reader.read_one_from_user::<SigSet>(0)?.clone();
Self::epoll_pwait(
args[0] as i32,
VirtAddr::new(args[1]),
args[2] as i32,
args[3] as i32,
&mut sigmask,
)
}
// For now, the following syscalls are written this way to accommodate musl-libc
SYS_GETRANDOM => {
let flags = GRandFlags::from_bits(args[2] as u8).ok_or(SystemError::EINVAL)?;


@ -2,7 +2,7 @@ use crate::filesystem::devfs::{DevFS, DeviceINode};
use crate::filesystem::vfs::{
core::generate_inode_id,
file::{File, FileMode},
make_rawdev, FilePrivateData, FileSystem, FileType, IndexNode, Metadata, PollStatus,
make_rawdev, FilePrivateData, FileSystem, FileType, IndexNode, Metadata,
};
use crate::process::ProcessManager;
use crate::{arch::KVMArch, libs::spinlock::SpinLock, syscall::SystemError, time::TimeSpec};
@ -119,10 +119,6 @@ impl IndexNode for LockedKvmInode {
return Ok(());
}
fn poll(&self) -> Result<PollStatus, SystemError> {
return Ok(PollStatus::READ | PollStatus::WRITE);
}
/// @brief The io control interface
///
/// @param cmd the command


@ -3,7 +3,7 @@ use crate::arch::KVMArch;
use crate::filesystem::devfs::DevFS;
use crate::filesystem::vfs::{
core::generate_inode_id, file::FileMode, make_rawdev, FilePrivateData, FileSystem, FileType,
IndexNode, Metadata, PollStatus,
IndexNode, Metadata,
};
use crate::mm::VirtAddr;
use crate::syscall::user_access::copy_from_user;
@ -127,10 +127,6 @@ impl IndexNode for LockedVcpuInode {
return Ok(());
}
fn poll(&self) -> Result<PollStatus, SystemError> {
return Ok(PollStatus::READ | PollStatus::WRITE);
}
/// @brief The io control interface
///
/// @param cmd the command


@ -2,7 +2,7 @@ use crate::filesystem::devfs::DevFS;
use crate::filesystem::vfs::{
core::generate_inode_id,
file::{File, FileMode},
make_rawdev, FilePrivateData, FileSystem, FileType, IndexNode, Metadata, PollStatus,
make_rawdev, FilePrivateData, FileSystem, FileType, IndexNode, Metadata,
};
use crate::mm::VirtAddr;
use crate::process::ProcessManager;
@ -125,10 +125,6 @@ impl IndexNode for LockedVmInode {
return Ok(());
}
fn poll(&self) -> Result<PollStatus, SystemError> {
return Ok(PollStatus::READ | PollStatus::WRITE);
}
/// @brief The io control interface
///
/// @param cmd the command