mirror of
https://github.com/asterinas/asterinas.git
synced 2025-06-18 03:56:42 +00:00
Optimize futex implementation
This commit is contained in:
committed by
Tate, Hongliang Tian
parent
b5f8e4e7bb
commit
cd9e5d11d2
@ -2,12 +2,14 @@
|
|||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use core::sync::atomic::{AtomicBool, Ordering};
|
use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
|
||||||
|
use ostd::{
|
||||||
use ostd::{cpu::num_cpus, sync::WaitQueue};
|
cpu::num_cpus,
|
||||||
|
sync::{Waiter, Waker},
|
||||||
|
};
|
||||||
use spin::Once;
|
use spin::Once;
|
||||||
|
|
||||||
use crate::{prelude::*, thread::Tid, util::read_val_from_user};
|
use crate::{prelude::*, util::read_val_from_user};
|
||||||
|
|
||||||
type FutexBitSet = u32;
|
type FutexBitSet = u32;
|
||||||
type FutexBucketRef = Arc<Mutex<FutexBucket>>;
|
type FutexBucketRef = Arc<Mutex<FutexBucket>>;
|
||||||
@ -32,22 +34,24 @@ pub fn futex_wait_bitset(
|
|||||||
"futex_wait_bitset addr: {:#x}, val: {}, timeout: {:?}, bitset: {:#x}",
|
"futex_wait_bitset addr: {:#x}, val: {}, timeout: {:?}, bitset: {:#x}",
|
||||||
futex_addr, futex_val, timeout, bitset
|
futex_addr, futex_val, timeout, bitset
|
||||||
);
|
);
|
||||||
let futex_key = FutexKey::new(futex_addr);
|
let futex_key = FutexKey::new(futex_addr, bitset);
|
||||||
let (_, futex_bucket_ref) = get_futex_bucket(futex_key);
|
let (futex_item, waiter) = FutexItem::create(futex_key);
|
||||||
|
|
||||||
|
let (_, futex_bucket_ref) = get_futex_bucket(futex_key);
|
||||||
// lock futex bucket ref here to avoid data race
|
// lock futex bucket ref here to avoid data race
|
||||||
let mut futex_bucket = futex_bucket_ref.lock();
|
let mut futex_bucket = futex_bucket_ref.lock();
|
||||||
|
|
||||||
if futex_key.load_val() != futex_val {
|
if futex_key.load_val() != futex_val {
|
||||||
return_errno_with_message!(Errno::EINVAL, "futex value does not match");
|
return_errno_with_message!(Errno::EAGAIN, "futex value does not match");
|
||||||
}
|
}
|
||||||
let futex_item = FutexItem::new(futex_key, bitset);
|
|
||||||
futex_bucket.enqueue_item(futex_item.clone());
|
futex_bucket.add_item(futex_item);
|
||||||
|
|
||||||
// drop lock
|
// drop lock
|
||||||
drop(futex_bucket);
|
drop(futex_bucket);
|
||||||
// Wait on the futex item
|
|
||||||
futex_item.wait(timeout.clone());
|
// TODO: wait on the futex item with a timeout.
|
||||||
|
waiter.wait();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -68,10 +72,10 @@ pub fn futex_wake_bitset(
|
|||||||
futex_addr, max_count, bitset
|
futex_addr, max_count, bitset
|
||||||
);
|
);
|
||||||
|
|
||||||
let futex_key = FutexKey::new(futex_addr);
|
let futex_key = FutexKey::new(futex_addr, bitset);
|
||||||
let (_, futex_bucket_ref) = get_futex_bucket(futex_key);
|
let (_, futex_bucket_ref) = get_futex_bucket(futex_key);
|
||||||
let mut futex_bucket = futex_bucket_ref.lock();
|
let mut futex_bucket = futex_bucket_ref.lock();
|
||||||
let res = futex_bucket.dequeue_and_wake_items(futex_key, max_count, bitset);
|
let res = futex_bucket.remove_and_wake_items(futex_key, max_count);
|
||||||
drop(futex_bucket);
|
drop(futex_bucket);
|
||||||
|
|
||||||
Ok(res)
|
Ok(res)
|
||||||
@ -88,16 +92,15 @@ pub fn futex_requeue(
|
|||||||
return futex_wake(futex_addr, max_nwakes);
|
return futex_wake(futex_addr, max_nwakes);
|
||||||
}
|
}
|
||||||
|
|
||||||
let futex_key = FutexKey::new(futex_addr);
|
let futex_key = FutexKey::new(futex_addr, FUTEX_BITSET_MATCH_ANY);
|
||||||
let futex_new_key = FutexKey::new(futex_new_addr);
|
let futex_new_key = FutexKey::new(futex_new_addr, FUTEX_BITSET_MATCH_ANY);
|
||||||
let (bucket_idx, futex_bucket_ref) = get_futex_bucket(futex_key);
|
let (bucket_idx, futex_bucket_ref) = get_futex_bucket(futex_key);
|
||||||
let (new_bucket_idx, futex_new_bucket_ref) = get_futex_bucket(futex_new_key);
|
let (new_bucket_idx, futex_new_bucket_ref) = get_futex_bucket(futex_new_key);
|
||||||
|
|
||||||
let nwakes = {
|
let nwakes = {
|
||||||
if bucket_idx == new_bucket_idx {
|
if bucket_idx == new_bucket_idx {
|
||||||
let mut futex_bucket = futex_bucket_ref.lock();
|
let mut futex_bucket = futex_bucket_ref.lock();
|
||||||
let nwakes =
|
let nwakes = futex_bucket.remove_and_wake_items(futex_key, max_nwakes);
|
||||||
futex_bucket.dequeue_and_wake_items(futex_key, max_nwakes, FUTEX_BITSET_MATCH_ANY);
|
|
||||||
futex_bucket.update_item_keys(futex_key, futex_new_key, max_nrequeues);
|
futex_bucket.update_item_keys(futex_key, futex_new_key, max_nrequeues);
|
||||||
drop(futex_bucket);
|
drop(futex_bucket);
|
||||||
nwakes
|
nwakes
|
||||||
@ -115,8 +118,7 @@ pub fn futex_requeue(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let nwakes =
|
let nwakes = futex_bucket.remove_and_wake_items(futex_key, max_nwakes);
|
||||||
futex_bucket.dequeue_and_wake_items(futex_key, max_nwakes, FUTEX_BITSET_MATCH_ANY);
|
|
||||||
futex_bucket.requeue_items_to_another_bucket(
|
futex_bucket.requeue_items_to_another_bucket(
|
||||||
futex_key,
|
futex_key,
|
||||||
&mut futex_new_bucket,
|
&mut futex_new_bucket,
|
||||||
@ -189,63 +191,74 @@ impl FutexBucketVec {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct FutexBucket {
|
struct FutexBucket {
|
||||||
queue: VecDeque<FutexItem>,
|
items: LinkedList<FutexItemAdapter>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
intrusive_adapter!(FutexItemAdapter = Box<FutexItem>: FutexItem { link: LinkedListAtomicLink });
|
||||||
|
|
||||||
impl FutexBucket {
|
impl FutexBucket {
|
||||||
pub fn new() -> FutexBucket {
|
pub fn new() -> FutexBucket {
|
||||||
FutexBucket {
|
FutexBucket {
|
||||||
queue: VecDeque::new(),
|
items: LinkedList::new(FutexItemAdapter::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enqueue_item(&mut self, item: FutexItem) {
|
pub fn add_item(&mut self, item: Box<FutexItem>) {
|
||||||
self.queue.push_back(item);
|
self.items.push_back(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dequeue_item(&mut self, item: &FutexItem) {
|
pub fn remove_item(&mut self, item: &FutexItem) {
|
||||||
let item_i = self
|
let mut item_cursor = self.items.front_mut();
|
||||||
.queue
|
while !item_cursor.is_null() {
|
||||||
.iter()
|
// The item_cursor has been checked not null.
|
||||||
.position(|futex_item| *futex_item == *item);
|
let futex_item = item_cursor.get().unwrap();
|
||||||
if let Some(item_i) = item_i {
|
|
||||||
self.queue.remove(item_i).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn dequeue_and_wake_items(
|
if !futex_item.match_up(item) {
|
||||||
&mut self,
|
item_cursor.move_next();
|
||||||
key: FutexKey,
|
continue;
|
||||||
max_count: usize,
|
|
||||||
bitset: FutexBitSet,
|
|
||||||
) -> usize {
|
|
||||||
let mut count = 0;
|
|
||||||
let mut items_to_wake = Vec::new();
|
|
||||||
|
|
||||||
self.queue.retain(|item| {
|
|
||||||
if count >= max_count || key != item.key || (bitset & item.bitset) == 0 {
|
|
||||||
true
|
|
||||||
} else {
|
} else {
|
||||||
items_to_wake.push(item.clone());
|
let _ = item_cursor.remove();
|
||||||
count += 1;
|
break;
|
||||||
false
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize {
|
||||||
|
let mut count = 0;
|
||||||
|
let mut item_cursor = self.items.front_mut();
|
||||||
|
while !item_cursor.is_null() && count < max_count {
|
||||||
|
// The item_cursor has been checked not null.
|
||||||
|
let item = item_cursor.get().unwrap();
|
||||||
|
|
||||||
|
if !item.key.match_up(&key) {
|
||||||
|
item_cursor.move_next();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let item = item_cursor.remove().unwrap();
|
||||||
|
item.wake();
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
FutexItem::batch_wake(&items_to_wake);
|
|
||||||
count
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_item_keys(&mut self, key: FutexKey, new_key: FutexKey, max_count: usize) {
|
pub fn update_item_keys(&mut self, key: FutexKey, new_key: FutexKey, max_count: usize) {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
for item in self.queue.iter_mut() {
|
let mut item_cursor = self.items.front_mut();
|
||||||
if count == max_count {
|
while !item_cursor.is_null() && count < max_count {
|
||||||
break;
|
// The item_cursor has been checked not null.
|
||||||
}
|
let item = item_cursor.get().unwrap();
|
||||||
if item.key == key {
|
|
||||||
item.key = new_key;
|
if !item.key.match_up(&key) {
|
||||||
count += 1;
|
item_cursor.move_next();
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut item = item_cursor.remove().unwrap();
|
||||||
|
item.key = new_key;
|
||||||
|
item_cursor.insert_before(item);
|
||||||
|
count += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -257,72 +270,79 @@ impl FutexBucket {
|
|||||||
max_nrequeues: usize,
|
max_nrequeues: usize,
|
||||||
) {
|
) {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
|
let mut item_cursor = self.items.front_mut();
|
||||||
|
while !item_cursor.is_null() && count < max_nrequeues {
|
||||||
|
// The item_cursor has been checked not null.
|
||||||
|
let item = item_cursor.get().unwrap();
|
||||||
|
|
||||||
self.queue.retain(|item| {
|
if !item.key.match_up(&key) {
|
||||||
if count >= max_nrequeues || key != item.key {
|
item_cursor.move_next();
|
||||||
true
|
continue;
|
||||||
} else {
|
|
||||||
let mut new_item = item.clone();
|
|
||||||
new_item.key = new_key;
|
|
||||||
another.enqueue_item(new_item);
|
|
||||||
count += 1;
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
let mut item = item_cursor.remove().unwrap();
|
||||||
|
item.key = new_key;
|
||||||
|
another.add_item(item);
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Clone)]
|
|
||||||
struct FutexItem {
|
struct FutexItem {
|
||||||
key: FutexKey,
|
key: FutexKey,
|
||||||
bitset: FutexBitSet,
|
waker: Arc<Waker>,
|
||||||
waiter: FutexWaiterRef,
|
link: LinkedListAtomicLink,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FutexItem {
|
impl FutexItem {
|
||||||
pub fn new(key: FutexKey, bitset: FutexBitSet) -> Self {
|
pub fn create(key: FutexKey) -> (Box<Self>, Waiter) {
|
||||||
FutexItem {
|
let (waiter, waker) = Waiter::new_pair();
|
||||||
|
let futex_item = Box::new(FutexItem {
|
||||||
key,
|
key,
|
||||||
bitset,
|
waker,
|
||||||
waiter: Arc::new(FutexWaiter::new()),
|
link: LinkedListAtomicLink::new(),
|
||||||
}
|
});
|
||||||
|
|
||||||
|
(futex_item, waiter)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn wake(&self) {
|
pub fn wake(&self) {
|
||||||
self.waiter.wake();
|
self.waker.wake_up();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn wait(&self, timeout: Option<FutexTimeout>) {
|
pub fn match_up(&self, another: &Self) -> bool {
|
||||||
self.waiter.wait(timeout);
|
self.key.match_up(&another.key)
|
||||||
}
|
|
||||||
|
|
||||||
pub fn waiter(&self) -> &FutexWaiterRef {
|
|
||||||
&self.waiter
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn batch_wake(items: &[FutexItem]) {
|
|
||||||
let waiters = items.iter().map(|item| item.waiter()).collect::<Vec<_>>();
|
|
||||||
FutexWaiter::batch_wake(&waiters);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The addr of a futex, it should be used to mark different futex word
|
// The addr of a futex, it should be used to mark different futex word
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
struct FutexKey(Vaddr);
|
struct FutexKey {
|
||||||
|
addr: Vaddr,
|
||||||
|
bitset: FutexBitSet,
|
||||||
|
}
|
||||||
|
|
||||||
impl FutexKey {
|
impl FutexKey {
|
||||||
pub fn new(futex_addr: Vaddr) -> Self {
|
pub fn new(addr: Vaddr, bitset: FutexBitSet) -> Self {
|
||||||
FutexKey(futex_addr as _)
|
Self { addr, bitset }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_val(&self) -> i32 {
|
pub fn load_val(&self) -> i32 {
|
||||||
// FIXME: how to implement a atomic load?
|
// FIXME: how to implement a atomic load?
|
||||||
warn!("implement an atomic load");
|
warn!("implement an atomic load");
|
||||||
read_val_from_user(self.0).unwrap()
|
read_val_from_user(self.addr).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn addr(&self) -> Vaddr {
|
pub fn addr(&self) -> Vaddr {
|
||||||
self.0
|
self.addr
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bitset(&self) -> FutexBitSet {
|
||||||
|
self.bitset
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn match_up(&self, another: &Self) -> bool {
|
||||||
|
self.addr == another.addr && (self.bitset & another.bitset) != 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -388,75 +408,3 @@ pub fn futex_op_and_flags_from_u32(bits: u32) -> Result<(FutexOp, FutexFlags)> {
|
|||||||
};
|
};
|
||||||
Ok((op, flags))
|
Ok((op, flags))
|
||||||
}
|
}
|
||||||
|
|
||||||
type FutexWaiterRef = Arc<FutexWaiter>;
|
|
||||||
|
|
||||||
struct FutexWaiter {
|
|
||||||
is_woken: AtomicBool,
|
|
||||||
wait_queue: WaitQueue,
|
|
||||||
tid: Tid,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Debug for FutexWaiter {
|
|
||||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
|
||||||
f.debug_struct("FutexWaiter")
|
|
||||||
.field("is_woken", &self.is_woken)
|
|
||||||
.field("tid", &self.tid)
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PartialEq for FutexWaiter {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.tid == other.tid
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FutexWaiter {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
is_woken: AtomicBool::new(false),
|
|
||||||
tid: current_thread!().tid(),
|
|
||||||
wait_queue: WaitQueue::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn wait(&self, timeout: Option<FutexTimeout>) {
|
|
||||||
let current_thread = current_thread!();
|
|
||||||
if current_thread.tid() != self.tid {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.is_woken.store(false, Ordering::SeqCst);
|
|
||||||
let wake_cond = || {
|
|
||||||
if self.is_woken() {
|
|
||||||
Some(())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(_timeout) = timeout {
|
|
||||||
todo!()
|
|
||||||
} else {
|
|
||||||
self.wait_queue.wait_until(wake_cond);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn wake(&self) {
|
|
||||||
if !self.is_woken() {
|
|
||||||
self.is_woken.store(true, Ordering::SeqCst);
|
|
||||||
self.wait_queue.wake_all();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_woken(&self) -> bool {
|
|
||||||
self.is_woken.load(Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn batch_wake(waiters: &[&FutexWaiterRef]) {
|
|
||||||
waiters.iter().for_each(|waiter| {
|
|
||||||
waiter.wake();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Reference in New Issue
Block a user