add syscall futex

This commit is contained in:
Jianfeng Jiang
2022-09-29 19:25:28 +08:00
parent e29bb58d45
commit 895e5c340b
7 changed files with 558 additions and 33 deletions

View File

@ -15,4 +15,4 @@ pub const PAGE_SIZE_BITS: usize = 0xc;
pub const KVA_START: usize = (usize::MAX) << PAGE_SIZE_BITS;
pub const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Trace;
pub const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Info;

View File

@ -12,7 +12,8 @@ macro_rules! cpu_local {
/// Returns the number of CPUs.
pub fn num_cpus() -> u32 {
todo!()
// FIXME: we only start one cpu now.
1
}
/// Returns the ID of this CPU.

View File

@ -44,9 +44,9 @@ use bootloader::{
pub use mm::address::{align_down, align_up, is_aligned, virt_to_phys};
pub use trap::{allocate_irq, IrqAllocateHandle, TrapFrame};
use trap::{IrqCallbackHandle, IrqLine};
pub use util::AlignExt;
pub use vm::Pod;
use x86_64_util::enable_common_cpu_features;
pub use util::AlignExt;
static mut IRQ_CALLBACK_LIST: Vec<IrqCallbackHandle> = Vec::new();

View File

@ -1,4 +1,6 @@
use alloc::collections::VecDeque;
use core::sync::atomic::{AtomicBool, Ordering};
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
use spin::mutex::Mutex;
use crate::{debug, task::Task};
@ -10,7 +12,7 @@ use crate::{debug, task::Task};
/// Other threads may invoke the `wake`-family methods of a wait queue to
/// wake up one or many waiter threads.
pub struct WaitQueue<D: Clone + Eq + PartialEq> {
waiters: Mutex<VecDeque<Waiter<D>>>,
waiters: Mutex<VecDeque<WaiterRef<D>>>,
}
impl<D: Clone + Eq + PartialEq> WaitQueue<D> {
@ -25,37 +27,53 @@ impl<D: Clone + Eq + PartialEq> WaitQueue<D> {
///
/// This method takes a closure that tests a user-given condition.
/// The method only returns if the condition returns Some(_).
/// A waker thread should first make the condition true, then invoke the
/// A waker thread should first make the condition Some(_), then invoke the
/// `wake`-family method. This ordering is important to ensure that waiter
/// threads do not lose any wakeup notifications.
///
/// By taking a condition closure, this wait-wakeup mechanism becomes
/// By taking a condition closure, this wait-wakeup mechanism becomes
/// more efficient and robust.
pub fn wait_until<F, R>(&self, data: D, mut cond: F) -> R
where
F: FnMut() -> Option<R>,
{
let waiter = Waiter::new(data);
self.enqueue(&waiter);
let waiter = Arc::new(Waiter::new(data));
self.enqueue_waiter(&waiter);
loop {
if let Some(r) = cond() {
self.dequeue(&waiter);
self.dequeue_waiter(&waiter);
return r;
}
waiter.wait();
}
}
/// Waits on the first queued waiter whose data equals `data` until that
/// waiter is woken up.
///
/// If no queued waiter carries matching data, this method returns
/// immediately.
///
/// Note that this cannot be implemented on top of `wait_until`: this method
/// always requires the waiter itself to become woken, whereas `wait_until`
/// returns as soon as its condition yields `Some(_)` without checking the
/// waiter.
///
/// TODO: This function can take a timeout param further.
pub fn wait_on(&self, data: D) {
    // Look up and clone the waiter under a single lock acquisition. Locking
    // once for the index and again for the element would let another thread
    // remove or reorder waiters in between, making the index stale.
    let waiter = self
        .waiters
        .lock()
        .iter()
        .find(|waiter| *waiter.data() == data)
        .cloned();
    // The lock guard is a temporary of the statement above, so the queue is
    // already unlocked here and wakers can make progress while we wait.
    if let Some(waiter) = waiter {
        waiter.wait();
    }
}
/// Wake one waiter thread, if there is one.
pub fn wake_one(&self) {
if let Some(waiter) = self.waiters.lock().front_mut() {
if let Some(waiter) = self.waiters.lock().front() {
waiter.wake_up();
}
}
/// Wake all waiter threads.
pub fn wake_all(&self) {
self.waiters.lock().iter_mut().for_each(|waiter| {
self.waiters.lock().iter().for_each(|waiter| {
waiter.wake_up();
});
}
@ -66,55 +84,147 @@ impl<D: Clone + Eq + PartialEq> WaitQueue<D> {
where
F: Fn(&D, &C) -> bool,
{
self.waiters.lock().iter_mut().for_each(|waiter| {
self.waiters.lock().iter().for_each(|waiter| {
if cond(waiter.data(), cond_data) {
waiter.wake_up()
}
})
}
fn enqueue(&self, waiter: &Waiter<D>) {
self.waiters.lock().push_back(waiter.clone());
/// Removes and wakes up to `max_count` waiters for which
/// `cond(waiter_data, cond_data)` holds.
///
/// Waiters that are already marked as woken are left in the queue and are
/// not counted. Returns the number of waiters woken by this call.
pub fn batch_wake_and_deque<F, C>(&self, max_count: usize, cond_data: &C, cond: F) -> usize
where
    F: Fn(&D, &C) -> bool,
{
    let mut selected = Vec::new();
    // Pick the waiters while holding the lock; the guard is released at the
    // end of this statement, before anybody is woken.
    self.waiters.lock().retain(|waiter| {
        let take = selected.len() < max_count
            && !waiter.is_woken_up()
            && cond(waiter.data(), cond_data);
        if take {
            selected.push(waiter.clone());
        }
        !take
    });
    let woken = selected.len();
    for waiter in selected {
        waiter.wake_up();
    }
    woken
}
fn dequeue(&self, waiter: &Waiter<D>) {
let mut waiters_lock = self.waiters.lock();
let len = waiters_lock.len();
let mut index = 0;
for i in 0..len {
if waiters_lock[i] == *waiter {
index = i;
break;
/// Creates a fresh waiter carrying `data` and appends it to the queue.
pub fn enqueue(&self, data: D) {
    self.enqueue_waiter(&Arc::new(Waiter::new(data)));
}
/// Removes the first queued waiter whose data equals `data`, if any.
pub fn dequeue(&self, data: D) {
    // A temporary waiter is built only to carry `data` into the lookup.
    let probe = Arc::new(Waiter::new(data));
    self.dequeue_waiter(&probe);
}
/// update the waiters data
/// if cond(old_data, old_value) is true.
/// The new data should be calculated by get_new_data(old_data, new_value).
pub fn update_waiters_data<F1, F2, C>(
&self,
cond: F1,
old_value: &C,
new_value: &C,
get_new_data: F2,
max_count: usize,
) where
F1: Fn(&C, &D) -> bool,
F2: Fn(&D, &C) -> D,
{
let mut waiters = self.waiters.lock();
let len = waiters.len();
let mut count = 0;
for index in 0..len {
let waiter = &waiters[index];
let old_data = waiter.data();
if cond(old_value, waiter.data()) {
let new_data = get_new_data(old_data, new_value);
let new_waiter = Arc::new(Waiter::new(new_data));
waiters[index] = new_waiter;
count += 1;
if count >= max_count {
break;
}
}
}
waiters_lock.remove(index);
}
/// Removes up to `max_count` waiters for which `cond(waiter_data, cond_data)`
/// holds and returns clones of their data, in queue order.
///
/// The removed waiters are not woken by this method.
pub fn remove_waiters<C, F>(&self, cond: F, cond_data: &C, max_count: usize) -> Vec<D>
where
    F: Fn(&D, &C) -> bool,
{
    let mut removed = Vec::new();
    self.waiters.lock().retain(|waiter| {
        // Once the quota is reached every remaining waiter is kept untouched.
        if removed.len() >= max_count {
            return true;
        }
        let data = waiter.data();
        if cond(data, cond_data) {
            removed.push(data.clone());
            false
        } else {
            true
        }
    });
    removed
}
fn enqueue_waiter(&self, waiter_ref: &WaiterRef<D>) {
self.waiters.lock().push_back(waiter_ref.clone());
}
/// Removes the first queued waiter whose data equals the data of
/// `waiter_ref`. Does nothing when no such waiter is queued.
fn dequeue_waiter(&self, waiter_ref: &WaiterRef<D>) {
    let mut queue = self.waiters.lock();
    let wanted = waiter_ref.data();
    // Waiters are matched by their data, not by pointer identity.
    let found = queue.iter().position(|queued| *wanted == *queued.data());
    if let Some(pos) = found {
        queue.remove(pos);
    }
    // The queue lock is released when `queue` goes out of scope.
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
type WaiterRef<D> = Arc<Waiter<D>>;
#[derive(Debug)]
struct Waiter<D: Clone + Eq + PartialEq> {
is_woken_up: bool,
is_woken_up: AtomicBool,
data: D,
}
impl<D: Clone + Eq + PartialEq> Waiter<D> {
pub fn new(data: D) -> Self {
Waiter {
is_woken_up: false,
is_woken_up: AtomicBool::new(false),
data,
}
}
pub fn wait(&self) {
while !self.is_woken_up {
while !self.is_woken_up.load(Ordering::Relaxed) {
// yield the execution, to allow other tasks to continue
debug!("Waiter: wait");
Task::yield_now();
}
}
pub fn wake_up(&mut self) {
self.is_woken_up = true;
pub fn is_woken_up(&self) -> bool {
self.is_woken_up.load(Ordering::Relaxed)
}
pub fn wake_up(&self) {
self.is_woken_up.store(true, Ordering::Relaxed);
}
pub fn data(&self) -> &D {

View File

@ -55,15 +55,14 @@ pub unsafe trait Pod: Copy + Sized + Debug {
/// FIXME: use derive instead
#[macro_export]
macro_rules! impl_pod_for {
($($pod_ty:ty),*/* define the input */) => {
/* define the expansion */
($($pod_ty:ty),*) => {
$(unsafe impl Pod for $pod_ty {})*
};
}
impl_pod_for!(u8, u16, u32, u64, i8, i16, i32, i64, isize, usize);
//unsafe impl<T: Pod, const N> [T; N] for Pod {}
unsafe impl<T: Pod, const N: usize> Pod for [T; N] {}
/// Get the offset of a field within a type as a pointer.
///

View File

@ -0,0 +1,411 @@
use crate::{memory::copy_val_from_user, syscall::SYS_FUTEX};
use super::SyscallResult;
use alloc::{sync::Arc, vec::Vec};
use bitflags::bitflags;
use kxos_frame::{cpu::num_cpus, debug, sync::WaitQueue, vm::Vaddr, warn};
use lazy_static::lazy_static;
use spin::Mutex;
type FutexBitSet = u32;
type FutexBucketRef = Arc<Mutex<FutexBucket>>;
const FUTEX_OP_MASK: u32 = 0x0000_000F;
const FUTEX_FLAGS_MASK: u32 = 0xFFFF_FFF0;
const FUTEX_BITSET_MATCH_ANY: FutexBitSet = 0xFFFF_FFFF;
/// Entry point of the `futex` system call.
///
/// Decodes the operation from `futex_op`, dispatches to the matching futex
/// routine and wraps the outcome in `SyscallResult::Return`. The wait
/// operations yield 0 and the wake/requeue operations yield the number of
/// woken waiters.
///
/// NOTE(review): every failure path here panics (`unwrap`, `expect`,
/// `panic!`, `todo!`), including invalid user input such as an unknown
/// operation, a negative count, a non-zero timeout address, or a futex value
/// mismatch. This should eventually return an error to user space instead.
pub fn sys_futex(
    futex_addr: u64,
    futex_op: u64,
    futex_val: u64,
    utime_addr: u64,
    futex_new_addr: u64,
    bitset: u64,
) -> SyscallResult {
    debug!("[syscall][id={}][SYS_FUTEX]", SYS_FUTEX);
    // FIXME: the futex flags are decoded but currently ignored.
    let (op, _flags) = futex_op_and_flags_from_u32(futex_op as _).unwrap();

    // Interprets a raw syscall argument as a non-negative count.
    let to_count = |raw: i32| -> Result<usize, &'static str> {
        if raw >= 0 {
            Ok(raw as usize)
        } else {
            Err("the futex val must not be negative")
        }
    };

    // A zero address means "no timeout"; anything else is not supported yet.
    let parse_timeout = |timeout_addr: u64| -> Result<Option<FutexTimeout>, &'static str> {
        if timeout_addr == 0 {
            Ok(None)
        } else {
            // TODO: parse a timeout
            todo!()
        }
    };

    let outcome: Result<isize, &'static str> = match op {
        FutexOp::FUTEX_WAIT => {
            let timeout = parse_timeout(utime_addr).expect("Invalid time addr");
            futex_wait(futex_addr as _, futex_val as _, &timeout).map(|_| 0)
        }
        FutexOp::FUTEX_WAIT_BITSET => {
            let timeout = parse_timeout(utime_addr).expect("Invalid time addr");
            futex_wait_bitset(futex_addr as _, futex_val as _, &timeout, bitset as _).map(|_| 0)
        }
        FutexOp::FUTEX_WAKE => {
            let max_count = to_count(futex_val as i32).expect("Invalid futex val");
            futex_wake(futex_addr as _, max_count).map(|woken| woken as isize)
        }
        FutexOp::FUTEX_WAKE_BITSET => {
            let max_count = to_count(futex_val as i32).expect("Invalid futex val");
            futex_wake_bitset(futex_addr as _, max_count, bitset as _).map(|woken| woken as isize)
        }
        FutexOp::FUTEX_REQUEUE => {
            // For requeue, the fourth syscall argument carries the requeue
            // limit rather than a timeout address.
            let max_nwakes = to_count(futex_val as i32).expect("Invalid futex val");
            let max_nrequeues = to_count(utime_addr as i32).expect("Invalid utime addr");
            futex_requeue(
                futex_addr as _,
                max_nwakes,
                max_nrequeues,
                futex_new_addr as _,
            )
            .map(|woken| woken as _)
        }
        _ => panic!("Unsupported futex operations"),
    };

    let ret = outcome.unwrap();
    SyscallResult::Return(ret as _)
}
/// do futex wait
pub fn futex_wait(
futex_addr: u64,
futex_val: i32,
timeout: &Option<FutexTimeout>,
) -> Result<(), &'static str> {
futex_wait_bitset(futex_addr as _, futex_val, timeout, FUTEX_BITSET_MATCH_ANY)
}
/// Waits on the futex word at `futex_addr` with the given wake `bitset`.
///
/// The word is loaded while the bucket lock is held; if it differs from
/// `futex_val` the call fails with an error and nothing is enqueued.
/// Otherwise a futex item is enqueued in the bucket's wait queue, the lock is
/// released, and the caller waits on that item.
///
/// `timeout` is only logged here; it is not used to bound the wait.
///
/// NOTE(review): the wait is looked up by item equality (`wait_on`), so two
/// threads waiting with the same address and bitset appear to share one
/// queued waiter — confirm this is intended.
pub fn futex_wait_bitset(
    futex_addr: Vaddr,
    futex_val: i32,
    timeout: &Option<FutexTimeout>,
    bitset: FutexBitSet,
) -> Result<(), &'static str> {
    debug!(
        "futex_wait_bitset addr: {:#x}, val: {}, timeout: {:?}, bitset: {:#x}",
        futex_addr, futex_val, timeout, bitset
    );
    let futex_key = FutexKey::new(futex_addr);
    let futex_item = FutexItem::new(futex_key, bitset);

    let wait_queue = {
        let (_, bucket_ref) = FUTEX_BUCKETS.get_bucket(futex_key);
        // Hold the bucket lock across the value check and the enqueue so a
        // concurrent wake cannot slip in between the two.
        let bucket = bucket_ref.lock();
        if futex_key.load_val() != futex_val {
            return Err("futex value does not match");
        }
        bucket.enqueue_item(futex_item);
        let queue = bucket.wait_queue();
        queue
        // The bucket lock is released here, before the wait starts.
    };

    wait_queue.wait_on(futex_item);
    Ok(())
}
/// Wakes up to `max_count` waiters of the futex at `futex_addr`, regardless
/// of the bitset they waited with. Returns the number of waiters woken.
pub fn futex_wake(futex_addr: Vaddr, max_count: usize) -> Result<usize, &'static str> {
    let match_any = FUTEX_BITSET_MATCH_ANY;
    futex_wake_bitset(futex_addr, max_count, match_any)
}
/// Wakes up to `max_count` waiters of the futex at `futex_addr` whose wait
/// bitset shares at least one bit with `bitset`.
///
/// Returns the number of waiters woken.
pub fn futex_wake_bitset(
    futex_addr: Vaddr,
    max_count: usize,
    bitset: FutexBitSet,
) -> Result<usize, &'static str> {
    debug!(
        "futex_wake_bitset addr: {:#x}, max_count: {}, bitset: {:#x}",
        futex_addr as usize, max_count, bitset
    );
    let key = FutexKey::new(futex_addr);
    let (_, bucket_ref) = FUTEX_BUCKETS.get_bucket(key);
    // The bucket stays locked for the duration of the wake-and-dequeue pass.
    let woken = bucket_ref
        .lock()
        .batch_wake_and_deque_items(key, max_count, bitset);
    Ok(woken)
}
/// Wakes up to `max_nwakes` waiters of the futex at `futex_addr`, then moves
/// up to `max_nrequeues` of the remaining waiters to the futex at
/// `futex_new_addr`.
///
/// When both addresses are equal this degenerates to a plain `futex_wake`.
/// Returns the number of waiters woken (requeued waiters are not counted).
pub fn futex_requeue(
    futex_addr: Vaddr,
    max_nwakes: usize,
    max_nrequeues: usize,
    futex_new_addr: Vaddr,
) -> Result<usize, &'static str> {
    if futex_new_addr == futex_addr {
        return futex_wake(futex_addr, max_nwakes);
    }

    let src_key = FutexKey::new(futex_addr);
    let dst_key = FutexKey::new(futex_new_addr);
    let (src_idx, src_ref) = FUTEX_BUCKETS.get_bucket(src_key);
    let (dst_idx, dst_ref) = FUTEX_BUCKETS.get_bucket(dst_key);

    // Both futexes hash to the same bucket: a single lock suffices and the
    // remaining waiters are re-keyed in place.
    if src_idx == dst_idx {
        let bucket = src_ref.lock();
        let nwakes =
            bucket.batch_wake_and_deque_items(src_key, max_nwakes, FUTEX_BITSET_MATCH_ANY);
        bucket.update_item_keys(src_key, dst_key, max_nrequeues);
        return Ok(nwakes);
    }

    // Different buckets: always lock the lower-indexed bucket first so that
    // concurrent requeues in opposite directions cannot deadlock.
    let (src_bucket, dst_bucket) = if src_idx < dst_idx {
        let src = src_ref.lock();
        let dst = dst_ref.lock();
        (src, dst)
    } else {
        let dst = dst_ref.lock();
        let src = src_ref.lock();
        (src, dst)
    };

    let nwakes =
        src_bucket.batch_wake_and_deque_items(src_key, max_nwakes, FUTEX_BITSET_MATCH_ANY);
    src_bucket.requeue_items_to_another_bucket(src_key, &dst_bucket, dst_key, max_nrequeues);
    Ok(nwakes)
}
// Lazily initialised globals of the futex hash table.
lazy_static! {
// Use the same count as linux kernel to keep the same performance
// Number of buckets: 256 per CPU, rounded up to a power of two.
static ref BUCKET_COUNT: usize = ((1<<8)* num_cpus()).next_power_of_two() as _;
// Bit mask applied when computing a bucket index (BUCKET_COUNT - 1).
static ref BUCKET_MASK: usize = *BUCKET_COUNT - 1;
// The bucket table itself, holding BUCKET_COUNT buckets.
static ref FUTEX_BUCKETS: FutexBucketVec = FutexBucketVec::new(*BUCKET_COUNT);
}
/// Placeholder for a futex wait timeout.
///
/// The type carries no data yet, and the constructor is unimplemented.
#[derive(Debug, Clone)]
pub struct FutexTimeout {}
impl FutexTimeout {
/// Not implemented yet: calling this panics via `todo!()`.
pub fn new() -> Self {
todo!()
}
}
/// Table of futex buckets, indexed by a hash of the futex address.
struct FutexBucketVec {
    vec: Vec<FutexBucketRef>,
}

impl FutexBucketVec {
    /// Builds a table containing `size` empty buckets.
    pub fn new(size: usize) -> FutexBucketVec {
        let vec = (0..size)
            .map(|_| Arc::new(Mutex::new(FutexBucket::new())))
            .collect();
        FutexBucketVec { vec }
    }

    /// Returns the index of the bucket responsible for `key` together with a
    /// handle to that bucket.
    pub fn get_bucket(&self, key: FutexKey) -> (usize, FutexBucketRef) {
        // The addr is a multiple of 4, so the last 2 bits are ignored.
        let word_index = key.addr() >> 2;
        // Simple hash: divide by the table size, then mask into range.
        let index = *BUCKET_MASK & (word_index / self.size());
        (index, Arc::clone(&self.vec[index]))
    }

    /// Number of buckets in the table.
    fn size(&self) -> usize {
        self.vec.len()
    }
}
/// One bucket of the futex table: a wait queue of futex items.
struct FutexBucket {
    wait_queue: Arc<WaitQueue<FutexItem>>,
}

impl FutexBucket {
    /// Creates a bucket with an empty wait queue.
    pub fn new() -> FutexBucket {
        let wait_queue = Arc::new(WaitQueue::new());
        FutexBucket { wait_queue }
    }

    /// Returns a shared handle to the bucket's wait queue.
    pub fn wait_queue(&self) -> Arc<WaitQueue<FutexItem>> {
        Arc::clone(&self.wait_queue)
    }

    /// Enqueues `item` in the bucket's wait queue.
    pub fn enqueue_item(&self, item: FutexItem) {
        self.wait_queue.enqueue(item);
    }

    /// Dequeues `item` from the bucket's wait queue.
    pub fn dequeue_item(&self, item: FutexItem) {
        self.wait_queue.dequeue(item);
    }

    /// Wakes and removes up to `max_count` items whose key equals `key` and
    /// whose bitset shares at least one bit with `bitset`.
    ///
    /// Returns the number of items woken.
    pub fn batch_wake_and_deque_items(
        &self,
        key: FutexKey,
        max_count: usize,
        bitset: FutexBitSet,
    ) -> usize {
        let target = (key, bitset);
        self.wait_queue.batch_wake_and_deque(
            max_count,
            &target,
            |item, (wanted_key, wanted_bits)| {
                item.key == *wanted_key && (*wanted_bits & item.bitset) != 0
            },
        )
    }

    /// Re-keys up to `max_count` items currently keyed by `key` so that they
    /// are keyed by `new_key`, keeping each item's bitset.
    pub fn update_item_keys(&self, key: FutexKey, new_key: FutexKey, max_count: usize) {
        self.wait_queue.update_waiters_data(
            |old_key, item| item.key == *old_key,
            &key,
            &new_key,
            |item, replacement_key| FutexItem::new(*replacement_key, item.bitset),
            max_count,
        )
    }

    /// Moves up to `max_nrequeues` items keyed by `key` from this bucket to
    /// `another`, re-keying them to `new_key`.
    ///
    /// NOTE(review): the items are re-enqueued as data, which creates fresh
    /// waiters in `another`; a thread still waiting on the removed waiter
    /// presumably will not observe a later wake — confirm.
    pub fn requeue_items_to_another_bucket(
        &self,
        key: FutexKey,
        another: &Self,
        new_key: FutexKey,
        max_nrequeues: usize,
    ) {
        let moved = self
            .wait_queue
            .remove_waiters(|item, wanted| item.key == *wanted, &key, max_nrequeues);
        for mut item in moved {
            item.key = new_key;
            another.enqueue_item(item);
        }
    }
}
/// Entry stored in a futex wait queue: the futex being waited on plus the
/// bitset the waiter supplied.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct FutexItem {
    key: FutexKey,
    bitset: FutexBitSet,
}

impl FutexItem {
    /// Pairs `key` with `bitset`.
    pub fn new(key: FutexKey, bitset: FutexBitSet) -> Self {
        Self { key, bitset }
    }
}
// Identifies a futex word by its address; distinct futex words have
// distinct keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FutexKey(Vaddr);

impl FutexKey {
    /// Wraps the address of a futex word.
    pub fn new(futex_addr: Vaddr) -> Self {
        Self(futex_addr)
    }

    /// Reads the current value of the futex word from user memory.
    pub fn load_val(&self) -> i32 {
        // FIXME: how to implement a atomic load?
        warn!("implement an atomic load");
        let addr = self.0;
        copy_val_from_user(addr)
    }

    /// Returns the wrapped address.
    pub fn addr(&self) -> Vaddr {
        self.0
    }
}
// The implementation is from occlum
/// Futex operation codes, as encoded in the low bits of the `futex_op`
/// syscall argument.
#[derive(PartialEq)]
#[allow(non_camel_case_types)]
pub enum FutexOp {
    FUTEX_WAIT = 0,
    FUTEX_WAKE = 1,
    FUTEX_FD = 2,
    FUTEX_REQUEUE = 3,
    FUTEX_CMP_REQUEUE = 4,
    FUTEX_WAKE_OP = 5,
    FUTEX_LOCK_PI = 6,
    FUTEX_UNLOCK_PI = 7,
    FUTEX_TRYLOCK_PI = 8,
    FUTEX_WAIT_BITSET = 9,
    FUTEX_WAKE_BITSET = 10,
}

impl FutexOp {
    /// Converts a raw operation code into a `FutexOp`.
    ///
    /// Returns `Err("Unknown futex op")` for any code outside `0..=10`.
    pub fn from_u32(bits: u32) -> Result<FutexOp, &'static str> {
        let op = match bits {
            0 => Self::FUTEX_WAIT,
            1 => Self::FUTEX_WAKE,
            2 => Self::FUTEX_FD,
            3 => Self::FUTEX_REQUEUE,
            4 => Self::FUTEX_CMP_REQUEUE,
            5 => Self::FUTEX_WAKE_OP,
            6 => Self::FUTEX_LOCK_PI,
            7 => Self::FUTEX_UNLOCK_PI,
            8 => Self::FUTEX_TRYLOCK_PI,
            9 => Self::FUTEX_WAIT_BITSET,
            10 => Self::FUTEX_WAKE_BITSET,
            _ => return Err("Unknown futex op"),
        };
        Ok(op)
    }
}
bitflags! {
    /// Flag bits that may accompany the operation code in `futex_op`.
    pub struct FutexFlags : u32 {
        const FUTEX_PRIVATE = 128;
        const FUTEX_CLOCK_REALTIME = 256;
    }
}

impl FutexFlags {
    /// Converts raw flag bits into `FutexFlags`.
    ///
    /// Fails with `"unknown futex flags"` when `bits` contains any bit that
    /// is not a declared flag.
    pub fn from_u32(bits: u32) -> Result<FutexFlags, &'static str> {
        match FutexFlags::from_bits(bits) {
            Some(flags) => Ok(flags),
            None => Err("unknown futex flags"),
        }
    }
}
/// Splits a raw `futex_op` argument into its operation and its flags.
///
/// The bits selected by `FUTEX_OP_MASK` are decoded as the operation and the
/// bits selected by `FUTEX_FLAGS_MASK` as the flags. The operation is decoded
/// first, so an unknown operation is reported before unknown flags.
pub fn futex_op_and_flags_from_u32(bits: u32) -> Result<(FutexOp, FutexFlags), &'static str> {
    let op = FutexOp::from_u32(bits & FUTEX_OP_MASK)?;
    let flags = FutexFlags::from_u32(bits & FUTEX_FLAGS_MASK)?;
    Ok((op, flags))
}

View File

@ -12,6 +12,7 @@ use crate::syscall::exit::sys_exit;
use crate::syscall::exit_group::sys_exit_group;
use crate::syscall::fork::sys_fork;
use crate::syscall::fstat::sys_fstat;
use crate::syscall::futex::sys_futex;
use crate::syscall::getpid::sys_getpid;
use crate::syscall::gettid::sys_gettid;
use crate::syscall::mmap::sys_mmap;
@ -30,6 +31,7 @@ mod exit;
mod exit_group;
mod fork;
mod fstat;
mod futex;
mod getpid;
mod gettid;
pub mod mmap;
@ -64,6 +66,7 @@ const SYS_GETEUID: u64 = 107;
const SYS_GETEGID: u64 = 108;
const SYS_ARCH_PRCTL: u64 = 158;
const SYS_GETTID: u64 = 186;
const SYS_FUTEX: u64 = 202;
const SYS_EXIT_GROUP: u64 = 231;
const SYS_TGKILL: u64 = 234;
const SYS_WAITID: u64 = 247;
@ -136,6 +139,7 @@ pub fn syscall_dispatch(
SYS_GETEGID => sys_getegid(),
SYS_ARCH_PRCTL => sys_arch_prctl(args[0], args[1], context),
SYS_GETTID => sys_gettid(),
SYS_FUTEX => sys_futex(args[0], args[1], args[2], args[3], args[4], args[5]),
SYS_EXIT_GROUP => sys_exit_group(args[0]),
SYS_TGKILL => sys_tgkill(args[0], args[1], args[2]),
SYS_WAITID => sys_waitid(args[0], args[1], args[2], args[3], args[4]),