diff --git a/kernel/aster-nix/src/fs/utils/channel.rs b/kernel/aster-nix/src/fs/utils/channel.rs
index f219c8c1d..de88e18c9 100644
--- a/kernel/aster-nix/src/fs/utils/channel.rs
+++ b/kernel/aster-nix/src/fs/utils/channel.rs
@@ -478,10 +478,15 @@ fn check_status_flags(flags: StatusFlags) -> Result<()> {
#[cfg(ktest)]
mod test {
use alloc::sync::Arc;
+ use core::sync::atomic;
- use ostd::prelude::*;
+ use ostd::{prelude::*, sync::AtomicBits};
- use crate::fs::utils::Channel;
+ use super::*;
+ use crate::thread::{
+ kernel_thread::{KernelThreadExt, ThreadOptions},
+ Thread,
+ };
#[ktest]
fn test_non_copy() {
@@ -503,4 +508,116 @@ mod test {
assert_eq!(data, expected_data);
}
}
+
+ #[derive(Clone, Copy, Debug, PartialEq, Eq)]
+ enum Ordering {
+ ProduceThenConsume,
+ ConsumeThenProduce,
+ }
+
+ fn test_blocking
(produce: P, consume: C, ordering: Ordering)
+ where
+ P: Fn(Producer) + Sync + Send + 'static,
+ C: Fn(Consumer) + Sync + Send + 'static,
+ {
+ let channel = Channel::with_capacity(1).unwrap();
+ let (producer, consumer) = channel.split();
+
+ // FIXME: `ThreadOptions::new` currently accepts `Fn`, forcing us to use `SpinLock` to gain
+ // internal mutability. We should avoid this `SpinLock` by making `ThreadOptions::new`
+ // accept `FnOnce`.
+ let producer_with_lock = SpinLock::new(Some(producer));
+ let consumer_with_lock = SpinLock::new(Some(consumer));
+
+ let signal_producer = Arc::new(AtomicBool::new(false));
+ let signal_consumer = signal_producer.clone();
+
+ let producer = Thread::spawn_kernel_thread(ThreadOptions::new(move || {
+ let producer = producer_with_lock.lock().take().unwrap();
+
+ if ordering == Ordering::ConsumeThenProduce {
+ while !signal_producer.load(atomic::Ordering::Relaxed) {
+ Thread::yield_now();
+ }
+ } else {
+ signal_producer.store(true, atomic::Ordering::Relaxed);
+ }
+
+ produce(producer);
+ }));
+
+ let consumer = Thread::spawn_kernel_thread(ThreadOptions::new(move || {
+ let consumer = consumer_with_lock.lock().take().unwrap();
+
+ if ordering == Ordering::ProduceThenConsume {
+ while !signal_consumer.load(atomic::Ordering::Relaxed) {
+ Thread::yield_now();
+ }
+ } else {
+ signal_consumer.store(true, atomic::Ordering::Relaxed);
+ }
+
+ consume(consumer);
+ }));
+
+ producer.join();
+ consumer.join();
+ }
+
+ #[ktest]
+ fn test_read_empty() {
+ test_blocking(
+ |producer| {
+ assert_eq!(producer.write(&[1]).unwrap(), 1);
+ },
+ |consumer| {
+ let mut buf = [0; 1];
+ assert_eq!(consumer.read(&mut buf).unwrap(), 1);
+ assert_eq!(&buf, &[1]);
+ },
+ Ordering::ConsumeThenProduce,
+ );
+ }
+
+ #[ktest]
+ fn test_write_full() {
+ test_blocking(
+ |producer| {
+ assert_eq!(producer.write(&[1, 2]).unwrap(), 1);
+ assert_eq!(producer.write(&[2]).unwrap(), 1);
+ },
+ |consumer| {
+ let mut buf = [0; 2];
+ assert_eq!(consumer.read(&mut buf).unwrap(), 1);
+ assert_eq!(&buf[..1], &[1]);
+ assert_eq!(consumer.read(&mut buf).unwrap(), 1);
+ assert_eq!(&buf[..1], &[2]);
+ },
+ Ordering::ProduceThenConsume,
+ );
+ }
+
+ #[ktest]
+ fn test_read_closed() {
+ test_blocking(
+ |producer| drop(producer),
+ |consumer| {
+ let mut buf = [0; 1];
+ assert_eq!(consumer.read(&mut buf).unwrap(), 0);
+ },
+ Ordering::ConsumeThenProduce,
+ );
+ }
+
+ #[ktest]
+ fn test_write_closed() {
+ test_blocking(
+ |producer| {
+ assert_eq!(producer.write(&[1, 2]).unwrap(), 1);
+ assert_eq!(producer.write(&[2]).unwrap_err().error(), Errno::EPIPE);
+ },
+ |consumer| drop(consumer),
+ Ordering::ProduceThenConsume,
+ );
+ }
}