From 94eba6d85eb9e62ddd904c1132d556b808cc3174 Mon Sep 17 00:00:00 2001 From: Ruihan Li Date: Sat, 6 Jul 2024 18:20:02 +0800 Subject: [PATCH] Add unit tests about blocking behavior --- kernel/aster-nix/src/fs/utils/channel.rs | 121 ++++++++++++++++++++++- 1 file changed, 119 insertions(+), 2 deletions(-) diff --git a/kernel/aster-nix/src/fs/utils/channel.rs b/kernel/aster-nix/src/fs/utils/channel.rs index f219c8c1d..de88e18c9 100644 --- a/kernel/aster-nix/src/fs/utils/channel.rs +++ b/kernel/aster-nix/src/fs/utils/channel.rs @@ -478,10 +478,15 @@ fn check_status_flags(flags: StatusFlags) -> Result<()> { #[cfg(ktest)] mod test { use alloc::sync::Arc; + use core::sync::atomic; - use ostd::prelude::*; + use ostd::{prelude::*, sync::AtomicBits}; - use crate::fs::utils::Channel; + use super::*; + use crate::thread::{ + kernel_thread::{KernelThreadExt, ThreadOptions}, + Thread, + }; #[ktest] fn test_non_copy() { @@ -503,4 +508,116 @@ mod test { assert_eq!(data, expected_data); } } + + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum Ordering { + ProduceThenConsume, + ConsumeThenProduce, + } + + fn test_blocking(produce: P, consume: C, ordering: Ordering) + where + P: Fn(Producer) + Sync + Send + 'static, + C: Fn(Consumer) + Sync + Send + 'static, + { + let channel = Channel::with_capacity(1).unwrap(); + let (producer, consumer) = channel.split(); + + // FIXME: `ThreadOptions::new` currently accepts `Fn`, forcing us to use `SpinLock` to gain + // internal mutability. We should avoid this `SpinLock` by making `ThreadOptions::new` + // accept `FnOnce`. + let producer_with_lock = SpinLock::new(Some(producer)); + let consumer_with_lock = SpinLock::new(Some(consumer)); + + let signal_producer = Arc::new(AtomicBool::new(false)); + let signal_consumer = signal_producer.clone(); + + let producer = Thread::spawn_kernel_thread(ThreadOptions::new(move || { + let producer = producer_with_lock.lock().take().unwrap(); + + if ordering == Ordering::ConsumeThenProduce { + while !signal_producer.load(atomic::Ordering::Relaxed) { + Thread::yield_now(); + } + } else { + signal_producer.store(true, atomic::Ordering::Relaxed); + } + + produce(producer); + })); + + let consumer = Thread::spawn_kernel_thread(ThreadOptions::new(move || { + let consumer = consumer_with_lock.lock().take().unwrap(); + + if ordering == Ordering::ProduceThenConsume { + while !signal_consumer.load(atomic::Ordering::Relaxed) { + Thread::yield_now(); + } + } else { + signal_consumer.store(true, atomic::Ordering::Relaxed); + } + + consume(consumer); + })); + + producer.join(); + consumer.join(); + } + + #[ktest] + fn test_read_empty() { + test_blocking( + |producer| { + assert_eq!(producer.write(&[1]).unwrap(), 1); + }, + |consumer| { + let mut buf = [0; 1]; + assert_eq!(consumer.read(&mut buf).unwrap(), 1); + assert_eq!(&buf, &[1]); + }, + Ordering::ConsumeThenProduce, + ); + } + + #[ktest] + fn test_write_full() { + test_blocking( + |producer| { + assert_eq!(producer.write(&[1, 2]).unwrap(), 1); + assert_eq!(producer.write(&[2]).unwrap(), 1); + }, + |consumer| { + let mut buf = [0; 2]; + assert_eq!(consumer.read(&mut buf).unwrap(), 1); + assert_eq!(&buf[..1], &[1]); + assert_eq!(consumer.read(&mut buf).unwrap(), 1); + assert_eq!(&buf[..1], &[2]); + }, + Ordering::ProduceThenConsume, + ); + } + + #[ktest] + fn test_read_closed() { + test_blocking( + |producer| drop(producer), + |consumer| { + let mut buf = [0; 1]; + assert_eq!(consumer.read(&mut buf).unwrap(), 0); + }, + Ordering::ConsumeThenProduce, + ); + } + + #[ktest] + fn test_write_closed() { + test_blocking( + |producer| { + assert_eq!(producer.write(&[1, 2]).unwrap(), 1); + assert_eq!(producer.write(&[2]).unwrap_err().error(), Errno::EPIPE); + }, + |consumer| drop(consumer), + Ordering::ProduceThenConsume, + ); + } }