diff --git a/kernel/src/ipc/pipe.rs b/kernel/src/ipc/pipe.rs index 05b8927c..288a3662 100644 --- a/kernel/src/ipc/pipe.rs +++ b/kernel/src/ipc/pipe.rs @@ -1,4 +1,7 @@ +use core::sync::atomic::compiler_fence; + use crate::{ + arch::ipc::signal::{SigCode, Signal}, filesystem::vfs::{ core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem, FileType, IndexNode, Metadata, @@ -8,7 +11,7 @@ use crate::{ wait_queue::WaitQueue, }, net::event_poll::{EPollEventType, EPollItem, EventPoll}, - process::ProcessState, + process::{ProcessManager, ProcessState}, sched::SchedMode, time::PosixTimeSpec, }; @@ -20,6 +23,8 @@ use alloc::{ }; use system_error::SystemError; +use super::signal_types::{SigInfo, SigType}; + /// 我们设定pipe_buff的总大小为1024字节 const PIPE_BUFF_SIZE: usize = 1024; @@ -59,6 +64,7 @@ pub struct InnerPipeInode { metadata: Metadata, reader: u32, writer: u32, + had_reader: bool, epitems: SpinLock>>, } @@ -131,6 +137,7 @@ impl LockedPipeInode { valid_cnt: 0, read_pos: 0, write_pos: 0, + had_reader: false, data: [0; PIPE_BUFF_SIZE], metadata: Metadata { @@ -278,15 +285,27 @@ impl IndexNode for LockedPipeInode { mut data: SpinLockGuard, mode: &crate::filesystem::vfs::file::FileMode, ) -> Result<(), SystemError> { + let accmode = mode.accmode(); let mut guard = self.inner.lock(); // 不能以读写方式打开管道 - if mode.contains(FileMode::O_RDWR) { + if accmode == FileMode::O_RDWR.bits() { return Err(SystemError::EACCES); - } - if mode.contains(FileMode::O_RDONLY) { + } else if accmode == FileMode::O_RDONLY.bits() { guard.reader += 1; - } - if mode.contains(FileMode::O_WRONLY) { + guard.had_reader = true; + // println!( + // "FIFO: pipe try open in read mode with reader pid:{:?}", + // ProcessManager::current_pid() + // ); + } else if accmode == FileMode::O_WRONLY.bits() { + // println!( + // "FIFO: pipe try open in write mode with {} reader, writer pid:{:?}", + // guard.reader, + // ProcessManager::current_pid() + // ); + if guard.reader == 0 && mode.contains(FileMode::O_NONBLOCK) { + return Err(SystemError::ENXIO); + } guard.writer += 1; } @@ -311,10 +330,11 @@ impl IndexNode for LockedPipeInode { } else { return Err(SystemError::EBADF); } + let accmode = mode.accmode(); let mut guard = self.inner.lock(); // 写端关闭 - if mode.contains(FileMode::O_WRONLY) { + if accmode == FileMode::O_WRONLY.bits() { assert!(guard.writer > 0); guard.writer -= 1; // 如果已经没有写端了,则唤醒读端 @@ -325,7 +345,7 @@ impl IndexNode for LockedPipeInode { } // 读端关闭 - if mode.contains(FileMode::O_RDONLY) { + if accmode == FileMode::O_RDONLY.bits() { assert!(guard.reader > 0); guard.reader -= 1; // 如果已经没有写端了,则唤醒读端 @@ -361,7 +381,35 @@ impl IndexNode for LockedPipeInode { let mut inode = self.inner.lock(); if inode.reader == 0 { - // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号 + if !inode.had_reader { + // 如果从未有读端,直接返回 ENXIO,无论是否阻塞模式 + return Err(SystemError::ENXIO); + } else { + // 如果曾经有读端,现在已关闭 + match mode.contains(FileMode::O_NONBLOCK) { + true => { + // 非阻塞模式,直接返回 EPIPE + return Err(SystemError::EPIPE); + } + false => { + let sig = Signal::SIGPIPE; + let mut info = SigInfo::new( + sig, + 0, + SigCode::Kernel, + SigType::Kill(ProcessManager::current_pid()), + ); + compiler_fence(core::sync::atomic::Ordering::SeqCst); + + let _retval = sig + .send_signal_info(Some(&mut info), ProcessManager::current_pid()) + .map(|x| x as usize); + + compiler_fence(core::sync::atomic::Ordering::SeqCst); + return Err(SystemError::EPIPE); + } + } + } } // 如果管道空间不够 diff --git a/kernel/src/net/socket/inet/stream/mod.rs b/kernel/src/net/socket/inet/stream/mod.rs index 68124cfe..3cde0925 100644 --- a/kernel/src/net/socket/inet/stream/mod.rs +++ b/kernel/src/net/socket/inet/stream/mod.rs @@ -328,26 +328,23 @@ impl Socket for TcpSocket { .recv_buffer_size() } - fn shutdown(&self, how: ShutdownTemp) -> Result<(), SystemError> { let self_shutdown = self.shutdown.get().bits(); let diff = how.bits().difference(self_shutdown); - match diff.is_empty(){ - true => { - return Ok(()) - }, + match diff.is_empty() { + true => return Ok(()), false => { - if diff.contains(ShutdownBit::SHUT_RD){ + if diff.contains(ShutdownBit::SHUT_RD) { self.shutdown.recv_shutdown(); // TODO 协议栈处理 } - if diff.contains(ShutdownBit::SHUT_WR){ + if diff.contains(ShutdownBit::SHUT_WR) { self.shutdown.send_shutdown(); // TODO 协议栈处理 } - }, + } } - Ok(()) + Ok(()) } fn close(&self) -> Result<(), SystemError> { diff --git a/kernel/src/process/mod.rs b/kernel/src/process/mod.rs index 0fd4c672..4722cb7c 100644 --- a/kernel/src/process/mod.rs +++ b/kernel/src/process/mod.rs @@ -275,7 +275,12 @@ impl ProcessManager { // avoid deadlock drop(writer); - let rq = cpu_rq(pcb.sched_info().on_cpu().unwrap().data() as usize); + let rq = cpu_rq( + pcb.sched_info() + .on_cpu() + .unwrap_or(smp_get_processor_id()) + .data() as usize, + ); let (rq, _guard) = rq.self_lock(); rq.update_rq_clock(); diff --git a/kernel/src/sched/clock.rs b/kernel/src/sched/clock.rs index d203e9d1..c1eed405 100644 --- a/kernel/src/sched/clock.rs +++ b/kernel/src/sched/clock.rs @@ -1,12 +1,12 @@ //! 这个文件实现的是调度过程中涉及到的时钟 //! -use crate::{arch::CurrentTimeArch, time::TimeArch}; +use crate::{arch::CurrentTimeArch, smp::cpu::ProcessorId, time::TimeArch}; pub struct SchedClock; impl SchedClock { #[inline] - pub fn sched_clock_cpu(_cpu: usize) -> u64 { + pub fn sched_clock_cpu(_cpu: ProcessorId) -> u64 { #[cfg(target_arch = "x86_64")] { if crate::arch::driver::tsc::TSCManager::cpu_khz() == 0 { diff --git a/kernel/src/sched/cputime.rs b/kernel/src/sched/cputime.rs index 8d7b0f15..c372238c 100644 --- a/kernel/src/sched/cputime.rs +++ b/kernel/src/sched/cputime.rs @@ -1,14 +1,17 @@ use core::sync::atomic::{compiler_fence, AtomicUsize, Ordering}; use crate::{ - arch::CurrentIrqArch, exception::InterruptArch, process::ProcessControlBlock, - smp::core::smp_get_processor_id, time::jiffies::TICK_NESC, + arch::CurrentIrqArch, + exception::InterruptArch, + process::ProcessControlBlock, + smp::{core::smp_get_processor_id, cpu::ProcessorId}, + time::jiffies::TICK_NESC, }; use alloc::sync::Arc; use super::{clock::SchedClock, cpu_irq_time}; -pub fn irq_time_read(cpu: usize) -> u64 { +pub fn irq_time_read(cpu: ProcessorId) -> u64 { compiler_fence(Ordering::SeqCst); let irqtime = cpu_irq_time(cpu); @@ -49,7 +52,7 @@ impl IrqTime { } pub fn irqtime_start() { - let cpu = smp_get_processor_id().data() as usize; + let cpu = smp_get_processor_id(); let irq_time = cpu_irq_time(cpu); compiler_fence(Ordering::SeqCst); irq_time.irq_start_time = SchedClock::sched_clock_cpu(cpu) as u64; @@ -58,7 +61,7 @@ impl IrqTime { pub fn irqtime_account_irq(_pcb: Arc) { compiler_fence(Ordering::SeqCst); - let cpu = smp_get_processor_id().data() as usize; + let cpu = smp_get_processor_id(); let irq_time = cpu_irq_time(cpu); compiler_fence(Ordering::SeqCst); let delta = SchedClock::sched_clock_cpu(cpu) as u64 - irq_time.irq_start_time; @@ -93,7 +96,7 @@ impl CpuTimeFunc { let mut accounted = Self::steal_account_process_time(max); if accounted < max { - let irqtime = cpu_irq_time(smp_get_processor_id().data() as usize); + let irqtime = cpu_irq_time(smp_get_processor_id()); accounted += irqtime.irqtime_tick_accounted(max - accounted); } diff --git a/kernel/src/sched/mod.rs b/kernel/src/sched/mod.rs index f453673e..f21ff6cc 100644 --- a/kernel/src/sched/mod.rs +++ b/kernel/src/sched/mod.rs @@ -63,8 +63,8 @@ pub const SCHED_CAPACITY_SHIFT: u64 = SCHED_FIXEDPOINT_SHIFT; pub const SCHED_CAPACITY_SCALE: u64 = 1 << SCHED_CAPACITY_SHIFT; #[inline] -pub fn cpu_irq_time(cpu: usize) -> &'static mut IrqTime { - unsafe { CPU_IRQ_TIME.as_mut().unwrap()[cpu] } +pub fn cpu_irq_time(cpu: ProcessorId) -> &'static mut IrqTime { + unsafe { CPU_IRQ_TIME.as_mut().unwrap()[cpu.data() as usize] } } #[inline] @@ -289,7 +289,7 @@ pub struct CpuRunQueue { lock: SpinLock<()>, lock_on_who: AtomicUsize, - cpu: usize, + cpu: ProcessorId, clock_task: u64, clock: u64, prev_irq_time: u64, @@ -329,7 +329,7 @@ pub struct CpuRunQueue { } impl CpuRunQueue { - pub fn new(cpu: usize) -> Self { + pub fn new(cpu: ProcessorId) -> Self { Self { lock: SpinLock::new(()), lock_on_who: AtomicUsize::new(usize::MAX), @@ -460,6 +460,7 @@ impl CpuRunQueue { self.enqueue_task(pcb.clone(), flags); *pcb.sched_info().on_rq.lock_irqsave() = OnRq::Queued; + pcb.sched_info().set_on_cpu(Some(self.cpu)); } /// 检查对应的task是否可以抢占当前运行的task @@ -638,7 +639,7 @@ impl CpuRunQueue { let cpu = self.cpu; - if cpu == smp_get_processor_id().data() as usize { + if cpu == smp_get_processor_id() { // assert!( // Arc::ptr_eq(¤t, &ProcessManager::current_pcb()), // "rq current name {} process current {}", @@ -653,7 +654,7 @@ impl CpuRunQueue { } // 向目标cpu发送重调度ipi - send_resched_ipi(ProcessorId::new(cpu as u32)); + send_resched_ipi(cpu); } /// 选择下一个task @@ -986,7 +987,7 @@ pub fn sched_init() { let mut cpu_runqueue = Vec::with_capacity(PerCpu::MAX_CPU_NUM as usize); for cpu in 0..PerCpu::MAX_CPU_NUM as usize { - let rq = Arc::new(CpuRunQueue::new(cpu)); + let rq = Arc::new(CpuRunQueue::new(ProcessorId::new(cpu as u32))); rq.cfs.force_mut().set_rq(Arc::downgrade(&rq)); cpu_runqueue.push(rq); } diff --git a/user/apps/test_fifo_write/Makefile b/user/apps/test_fifo_write/Makefile new file mode 100644 index 00000000..ebc0d6c8 --- /dev/null +++ b/user/apps/test_fifo_write/Makefile @@ -0,0 +1,20 @@ +ifeq ($(ARCH), x86_64) + CROSS_COMPILE=x86_64-linux-musl- +else ifeq ($(ARCH), riscv64) + CROSS_COMPILE=riscv64-linux-musl- +endif + +CC=$(CROSS_COMPILE)gcc + +.PHONY: all +all: main.c + $(CC) -static -o test_fifo_write main.c + +.PHONY: install clean +install: all + mv test_fifo_write $(DADK_CURRENT_BUILD_DIR)/test_fifo_write + +clean: + rm test_fifo_write *.o + +fmt: diff --git a/user/apps/test_fifo_write/main.c b/user/apps/test_fifo_write/main.c new file mode 100644 index 00000000..c4f91d45 --- /dev/null +++ b/user/apps/test_fifo_write/main.c @@ -0,0 +1,210 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define TEST_ASSERT(left, right, success_msg, fail_msg) \ + do { \ + if ((left) == (right)) { \ + printf("[PASS] %s\n", success_msg); \ + } else { \ + printf("[FAIL] %s: Expected %d, but got %d\n", fail_msg, (right), \ + (left)); \ + } \ + } while (0) + +#define FIFO_PATH "/bin/test_fifo" // 使用 /tmp 目录避免权限问题 + +typedef struct { + int fd; + int error_code; +} FifoWriteResult; + +// 信号处理函数 +void sigpipe_handler(int signo) { + if (signo == SIGPIPE) { + printf("Received SIGPIPE signal. Write operation failed.\n"); + } +} + +const char *scenarios[] = {"No readers (FIFO never had readers)", + "Reader exists but disconnects", + "Active reader exists"}; + +FifoWriteResult test_fifo_write(int scenario_index, int nonblocking) { + FifoWriteResult result = {.fd = -1, .error_code = 0}; + int fd; + const char *data = "Hello, FIFO!"; + + // Set write mode and non-blocking flag + int flags = O_WRONLY; + if (nonblocking) { + flags |= O_NONBLOCK; + } + + // Open the FIFO write end + fd = open(FIFO_PATH, flags); + if (fd == -1) { + result.fd = fd; + result.error_code = errno; + + if (errno == ENXIO) { + printf("Result: Failed to open FIFO for writing (ENXIO: No readers).\n"); + } else { + perror("Failed to open FIFO for writing"); + } + return result; // Return early with error details + } + + // Write data + ssize_t bytes_written = write(fd, data, strlen(data)); + if (bytes_written == -1) { + result.error_code = errno; + + if (bytes_written == -1) { + if (errno == EPIPE) { + printf("Result: Write failed with EPIPE (no readers available).\n"); + } else if (errno == ENXIO) { + printf("Result: Write failed with ENXIO (FIFO never had readers).\n"); + } else if (errno == EAGAIN) { + printf("Result: Write failed with EAGAIN (nonblocking write, pipe full " + "or no readers).\n"); + } else { + perror("Write failed with an unexpected error"); + } + } else { + printf("Result: Write succeeded. Bytes written: %zd\n", bytes_written); + } + + result.fd = fd; + close(fd); + return result; // Return with fd and error_code + } +} + +void test_case1(int nonblocking) { + // Case 1: No readers (FIFO never had readers) + FifoWriteResult result = test_fifo_write(0, nonblocking); + + char buffer[100]; + sprintf(buffer, "Fail with unexpected error %d", result.error_code); + TEST_ASSERT(result.error_code, ENXIO, "write(2) fails with the error ENXIO", + buffer); +} + +void test_case2(int nonblocking) { + pid_t reader_pid; + + // Case 2: Reader exists but disconnects + reader_pid = fork(); + if (reader_pid == 0) { + // Child process acts as a reader + int reader_fd = open(FIFO_PATH, O_RDONLY); + if (reader_fd == -1) { + perror("Reader failed to open FIFO"); + exit(EXIT_FAILURE); + } + sleep(2); // Simulate a brief existence of the reader + close(reader_fd); + exit(EXIT_SUCCESS); + } + + sleep(5); // Ensure the reader has opened the FIFO + FifoWriteResult result = test_fifo_write(1, nonblocking); + waitpid(reader_pid, NULL, 0); // Wait for the reader process to exit + + if (nonblocking) { + TEST_ASSERT(result.error_code, EPIPE, + "Non-Blocking Write failed with EPIPE", + "Non-Blocking Write failed with wrong error type"); + } else { + TEST_ASSERT(result.error_code, EPIPE, "Blocking Write failed with EPIPE", + "Blocking Write failed with wrong error type"); + } +} + +void test_case3(int nonblocking) { + pid_t reader_pid; + + // Case 3: Active reader exists + reader_pid = fork(); + if (reader_pid == 0) { + // Child process acts as a reader + int reader_fd = open(FIFO_PATH, O_RDONLY); + if (reader_fd == -1) { + perror("Reader failed to open FIFO"); + exit(EXIT_FAILURE); + } + sleep(5); // Keep the reader active + close(reader_fd); + exit(EXIT_SUCCESS); + } + + sleep(1); // Ensure the reader has opened the FIFO + FifoWriteResult result = test_fifo_write(2, nonblocking); + + waitpid(reader_pid, NULL, 0); // Wait for the reader process to exit + + TEST_ASSERT(result.error_code, 0, "write succeed", "write failed"); +} + +void run_tests(int nonblocking) { + for (int i = 0; i < 3; i++) { + printf("\n--- Testing: %s (nonblocking=%d) ---\n", scenarios[i], + nonblocking); + switch (i) { + case 0: + // test_case1(nonblocking); + break; + case 1: + test_case2(nonblocking); + break; + case 2: + // test_case3(nonblocking); + break; + } + } +} + +void test_blocking() { + // 创建 FIFO + if (mkfifo(FIFO_PATH, 0666) == -1 && errno != EEXIST) { + perror("mkfifo failed"); + exit(EXIT_FAILURE); + } + + // 测试阻塞模式下的三种情况 + printf("========== Testing Blocking Mode ==========\n"); + run_tests(0); // 阻塞模式 + // 删除 FIFO + unlink(FIFO_PATH); +} + +void test_non_blocking() { + // 创建 FIFO + if (mkfifo(FIFO_PATH, 0666) == -1 && errno != EEXIST) { + perror("mkfifo failed"); + exit(EXIT_FAILURE); + } + // 测试非阻塞模式下的三种情况 + printf("\n========== Testing Nonblocking Mode ==========\n"); + run_tests(1); // 非阻塞模式 + // 删除 FIFO + unlink(FIFO_PATH); +} + +int main() { + // 设置 SIGPIPE 信号处理 + signal(SIGPIPE, sigpipe_handler); + +// test_blocking(); + test_non_blocking(); + + printf("\nAll tests completed.\n"); + return 0; +} \ No newline at end of file diff --git a/user/dadk/config/test_fifo_write_0_1_0.toml b/user/dadk/config/test_fifo_write_0_1_0.toml new file mode 100644 index 00000000..aa29d3b9 --- /dev/null +++ b/user/dadk/config/test_fifo_write_0_1_0.toml @@ -0,0 +1,41 @@ +# 用户程序名称 +name = "test_fifo_write" +# 版本号 +version = "0.1.0" +# 用户程序描述信息 +description = "一个用来测试fifo_write行为的app" + +# (可选)默认: false 是否只构建一次,如果为true,DADK会在构建成功后,将构建结果缓存起来,下次构建时,直接使用缓存的构建结果 +build-once = false +# (可选) 默认: false 是否只安装一次,如果为true,DADK会在安装成功后,不再重复安装 +install-once = false +# 目标架构 +# 可选值:"x86_64", "aarch64", "riscv64" +target-arch = ["x86_64"] + +# 任务源 +[task-source] +# 构建类型 +# 可选值:"build-from_source", "install-from-prebuilt" +type = "build-from-source" +# 构建来源 +# "build_from_source" 可选值:"git", "local", "archive" +# "install_from_prebuilt" 可选值:"local", "archive" +source = "local" +# 路径或URL +source-path = "user/apps/test_fifo_write" + +# 构建相关信息 +[build] +# (可选)构建命令 +build-command = "make install" + +# 安装相关信息 +[install] +# (可选)安装到DragonOS的路径 +in-dragonos-path = "/bin" + +# clean相关信息 +[clean] +# (可选)清除命令 +clean-command = "make clean"