diff --git a/kernel/aster-nix/src/process/posix_thread/futex.rs b/kernel/aster-nix/src/process/posix_thread/futex.rs index 7e90fae06..1a3f258c0 100644 --- a/kernel/aster-nix/src/process/posix_thread/futex.rs +++ b/kernel/aster-nix/src/process/posix_thread/futex.rs @@ -2,12 +2,14 @@ #![allow(dead_code)] -use core::sync::atomic::{AtomicBool, Ordering}; - -use ostd::{cpu::num_cpus, sync::WaitQueue}; +use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink}; +use ostd::{ + cpu::num_cpus, + sync::{Waiter, Waker}, +}; use spin::Once; -use crate::{prelude::*, thread::Tid, util::read_val_from_user}; +use crate::{prelude::*, util::read_val_from_user}; type FutexBitSet = u32; type FutexBucketRef = Arc>; @@ -32,22 +34,24 @@ pub fn futex_wait_bitset( "futex_wait_bitset addr: {:#x}, val: {}, timeout: {:?}, bitset: {:#x}", futex_addr, futex_val, timeout, bitset ); - let futex_key = FutexKey::new(futex_addr); - let (_, futex_bucket_ref) = get_futex_bucket(futex_key); + let futex_key = FutexKey::new(futex_addr, bitset); + let (futex_item, waiter) = FutexItem::create(futex_key); + let (_, futex_bucket_ref) = get_futex_bucket(futex_key); // lock futex bucket ref here to avoid data race let mut futex_bucket = futex_bucket_ref.lock(); if futex_key.load_val() != futex_val { - return_errno_with_message!(Errno::EINVAL, "futex value does not match"); + return_errno_with_message!(Errno::EAGAIN, "futex value does not match"); } - let futex_item = FutexItem::new(futex_key, bitset); - futex_bucket.enqueue_item(futex_item.clone()); + + futex_bucket.add_item(futex_item); // drop lock drop(futex_bucket); - // Wait on the futex item - futex_item.wait(timeout.clone()); + + // TODO: wait on the futex item with a timeout. + waiter.wait(); Ok(()) } @@ -68,10 +72,10 @@ pub fn futex_wake_bitset( futex_addr, max_count, bitset ); - let futex_key = FutexKey::new(futex_addr); + let futex_key = FutexKey::new(futex_addr, bitset); let (_, futex_bucket_ref) = get_futex_bucket(futex_key); let mut futex_bucket = futex_bucket_ref.lock(); - let res = futex_bucket.dequeue_and_wake_items(futex_key, max_count, bitset); + let res = futex_bucket.remove_and_wake_items(futex_key, max_count); drop(futex_bucket); Ok(res) @@ -88,16 +92,15 @@ pub fn futex_requeue( return futex_wake(futex_addr, max_nwakes); } - let futex_key = FutexKey::new(futex_addr); - let futex_new_key = FutexKey::new(futex_new_addr); + let futex_key = FutexKey::new(futex_addr, FUTEX_BITSET_MATCH_ANY); + let futex_new_key = FutexKey::new(futex_new_addr, FUTEX_BITSET_MATCH_ANY); let (bucket_idx, futex_bucket_ref) = get_futex_bucket(futex_key); let (new_bucket_idx, futex_new_bucket_ref) = get_futex_bucket(futex_new_key); let nwakes = { if bucket_idx == new_bucket_idx { let mut futex_bucket = futex_bucket_ref.lock(); - let nwakes = - futex_bucket.dequeue_and_wake_items(futex_key, max_nwakes, FUTEX_BITSET_MATCH_ANY); + let nwakes = futex_bucket.remove_and_wake_items(futex_key, max_nwakes); futex_bucket.update_item_keys(futex_key, futex_new_key, max_nrequeues); drop(futex_bucket); nwakes @@ -115,8 +118,7 @@ pub fn futex_requeue( } }; - let nwakes = - futex_bucket.dequeue_and_wake_items(futex_key, max_nwakes, FUTEX_BITSET_MATCH_ANY); + let nwakes = futex_bucket.remove_and_wake_items(futex_key, max_nwakes); futex_bucket.requeue_items_to_another_bucket( futex_key, &mut futex_new_bucket, @@ -189,63 +191,74 @@ impl FutexBucketVec { } struct FutexBucket { - queue: VecDeque, + items: LinkedList, } +intrusive_adapter!(FutexItemAdapter = Box: FutexItem { link: LinkedListAtomicLink }); + impl FutexBucket { pub fn new() -> FutexBucket { FutexBucket { - queue: VecDeque::new(), + items: LinkedList::new(FutexItemAdapter::new()), } } - pub fn enqueue_item(&mut self, item: FutexItem) { - self.queue.push_back(item); + pub fn add_item(&mut self, item: Box) { + self.items.push_back(item); } - pub fn dequeue_item(&mut self, item: &FutexItem) { - let item_i = self - .queue - .iter() - .position(|futex_item| *futex_item == *item); - if let Some(item_i) = item_i { - self.queue.remove(item_i).unwrap(); - } - } + pub fn remove_item(&mut self, item: &FutexItem) { + let mut item_cursor = self.items.front_mut(); + while !item_cursor.is_null() { + // The item_cursor has been checked not null. + let futex_item = item_cursor.get().unwrap(); - pub fn dequeue_and_wake_items( - &mut self, - key: FutexKey, - max_count: usize, - bitset: FutexBitSet, - ) -> usize { - let mut count = 0; - let mut items_to_wake = Vec::new(); - - self.queue.retain(|item| { - if count >= max_count || key != item.key || (bitset & item.bitset) == 0 { - true + if !futex_item.match_up(item) { + item_cursor.move_next(); + continue; } else { - items_to_wake.push(item.clone()); - count += 1; - false + let _ = item_cursor.remove(); + break; } - }); + } + } + + pub fn remove_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize { + let mut count = 0; + let mut item_cursor = self.items.front_mut(); + while !item_cursor.is_null() && count < max_count { + // The item_cursor has been checked not null. + let item = item_cursor.get().unwrap(); + + if !item.key.match_up(&key) { + item_cursor.move_next(); + continue; + } + + let item = item_cursor.remove().unwrap(); + item.wake(); + count += 1; + } - FutexItem::batch_wake(&items_to_wake); count } pub fn update_item_keys(&mut self, key: FutexKey, new_key: FutexKey, max_count: usize) { let mut count = 0; - for item in self.queue.iter_mut() { - if count == max_count { - break; - } - if item.key == key { - item.key = new_key; - count += 1; + let mut item_cursor = self.items.front_mut(); + while !item_cursor.is_null() && count < max_count { + // The item_cursor has been checked not null. + let item = item_cursor.get().unwrap(); + + if !item.key.match_up(&key) { + item_cursor.move_next(); + continue; } + + let mut item = item_cursor.remove().unwrap(); + item.key = new_key; + item_cursor.insert_before(item); + count += 1; } } @@ -257,72 +270,79 @@ impl FutexBucket { max_nrequeues: usize, ) { let mut count = 0; + let mut item_cursor = self.items.front_mut(); + while !item_cursor.is_null() && count < max_nrequeues { + // The item_cursor has been checked not null. + let item = item_cursor.get().unwrap(); - self.queue.retain(|item| { - if count >= max_nrequeues || key != item.key { - true - } else { - let mut new_item = item.clone(); - new_item.key = new_key; - another.enqueue_item(new_item); - count += 1; - false + if !item.key.match_up(&key) { + item_cursor.move_next(); + continue; } - }); + + let mut item = item_cursor.remove().unwrap(); + item.key = new_key; + another.add_item(item); + count += 1; + } } } -#[derive(Debug, PartialEq, Clone)] struct FutexItem { key: FutexKey, - bitset: FutexBitSet, - waiter: FutexWaiterRef, + waker: Arc, + link: LinkedListAtomicLink, } impl FutexItem { - pub fn new(key: FutexKey, bitset: FutexBitSet) -> Self { - FutexItem { + pub fn create(key: FutexKey) -> (Box, Waiter) { + let (waiter, waker) = Waiter::new_pair(); + let futex_item = Box::new(FutexItem { key, - bitset, - waiter: Arc::new(FutexWaiter::new()), - } + waker, + link: LinkedListAtomicLink::new(), + }); + + (futex_item, waiter) } pub fn wake(&self) { - self.waiter.wake(); + self.waker.wake_up(); } - pub fn wait(&self, timeout: Option) { - self.waiter.wait(timeout); - } - - pub fn waiter(&self) -> &FutexWaiterRef { - &self.waiter - } - - pub fn batch_wake(items: &[FutexItem]) { - let waiters = items.iter().map(|item| item.waiter()).collect::>(); - FutexWaiter::batch_wake(&waiters); + pub fn match_up(&self, another: &Self) -> bool { + self.key.match_up(&another.key) } } // The addr of a futex, it should be used to mark different futex word -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct FutexKey(Vaddr); +#[derive(Debug, Clone, Copy)] +struct FutexKey { + addr: Vaddr, + bitset: FutexBitSet, +} impl FutexKey { - pub fn new(futex_addr: Vaddr) -> Self { - FutexKey(futex_addr as _) + pub fn new(addr: Vaddr, bitset: FutexBitSet) -> Self { + Self { addr, bitset } } pub fn load_val(&self) -> i32 { // FIXME: how to implement a atomic load? warn!("implement an atomic load"); - read_val_from_user(self.0).unwrap() + read_val_from_user(self.addr).unwrap() } pub fn addr(&self) -> Vaddr { - self.0 + self.addr + } + + pub fn bitset(&self) -> FutexBitSet { + self.bitset + } + + pub fn match_up(&self, another: &Self) -> bool { + self.addr == another.addr && (self.bitset & another.bitset) != 0 } } @@ -388,75 +408,3 @@ pub fn futex_op_and_flags_from_u32(bits: u32) -> Result<(FutexOp, FutexFlags)> { }; Ok((op, flags)) } - -type FutexWaiterRef = Arc; - -struct FutexWaiter { - is_woken: AtomicBool, - wait_queue: WaitQueue, - tid: Tid, -} - -impl Debug for FutexWaiter { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("FutexWaiter") - .field("is_woken", &self.is_woken) - .field("tid", &self.tid) - .finish() - } -} - -impl PartialEq for FutexWaiter { - fn eq(&self, other: &Self) -> bool { - self.tid == other.tid - } -} - -impl FutexWaiter { - pub fn new() -> Self { - Self { - is_woken: AtomicBool::new(false), - tid: current_thread!().tid(), - wait_queue: WaitQueue::new(), - } - } - - pub fn wait(&self, timeout: Option) { - let current_thread = current_thread!(); - if current_thread.tid() != self.tid { - return; - } - - self.is_woken.store(false, Ordering::SeqCst); - let wake_cond = || { - if self.is_woken() { - Some(()) - } else { - None - } - }; - - if let Some(_timeout) = timeout { - todo!() - } else { - self.wait_queue.wait_until(wake_cond); - } - } - - pub fn wake(&self) { - if !self.is_woken() { - self.is_woken.store(true, Ordering::SeqCst); - self.wait_queue.wake_all(); - } - } - - pub fn is_woken(&self) -> bool { - self.is_woken.load(Ordering::SeqCst) - } - - pub fn batch_wake(waiters: &[&FutexWaiterRef]) { - waiters.iter().for_each(|waiter| { - waiter.wake(); - }); - } -}