Implement system call sched_get/set_affinity

Author: Zhang Junyang
Date: 2024-09-23 14:38:17 +08:00
Committed by: Tate, Hongliang Tian
Parent: 3468ec213b
Commit: e319641b4d
8 changed files with 167 additions and 94 deletions
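As a quick sanity check (not part of this commit), the two syscalls can be exercised from user space through libc. The sketch below is a minimal, hypothetical example that assumes the Rust `libc` crate on a Linux-compatible target; it pins the calling thread (tid 0) to CPU 0, which is an arbitrary illustrative choice, and then reads the mask back.

use std::mem;

fn main() {
    unsafe {
        // Restrict the current thread (tid 0) to CPU 0 only.
        let mut set: libc::cpu_set_t = mem::zeroed();
        libc::CPU_ZERO(&mut set);
        libc::CPU_SET(0, &mut set);
        assert_eq!(
            libc::sched_setaffinity(0, mem::size_of::<libc::cpu_set_t>(), &set),
            0
        );

        // Read the affinity mask back; CPU 0 should be reported as set.
        let mut out: libc::cpu_set_t = mem::zeroed();
        assert!(libc::sched_getaffinity(0, mem::size_of::<libc::cpu_set_t>(), &mut out) >= 0);
        assert!(libc::CPU_ISSET(0, &out));
    }
}

Note that the raw `sched_getaffinity` syscall returns the number of bytes written to the mask (the value that `write_cpu_set_to` in the new `sched_affinity` module below computes); common libc wrappers normalize this to zero on success.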

View File

@@ -1,8 +1,9 @@
// SPDX-License-Identifier: MPL-2.0
use core::sync::atomic::Ordering;
use ostd::{
cpu::{num_cpus, CpuId, CpuSet, PinCurrentCpu},
sync::PreemptDisabled,
task::{
scheduler::{
info::CommonSchedInfo, inject_scheduler, EnqueueFlags, LocalRunQueue, Scheduler,
@@ -254,11 +255,11 @@ impl PreemptSchedInfo for Thread {
const LOWEST_TASK_PRIORITY: Priority = Priority::new(PriorityRange::new(PriorityRange::MAX));
fn priority(&self) -> Priority {
self.priority()
self.atomic_priority().load(Ordering::Relaxed)
}
fn cpu_affinity(&self) -> SpinLockGuard<CpuSet, PreemptDisabled> {
self.lock_cpu_affinity()
fn cpu_affinity(&self) -> CpuSet {
self.atomic_cpu_affinity().load()
}
}
@@ -268,7 +269,7 @@ trait PreemptSchedInfo {
fn priority(&self) -> Priority;
fn cpu_affinity(&self) -> SpinLockGuard<CpuSet, PreemptDisabled>;
fn cpu_affinity(&self) -> CpuSet;
fn is_real_time(&self) -> bool {
self.priority() < Self::REAL_TIME_TASK_PRIORITY

View File

@@ -79,7 +79,7 @@ use crate::syscall::{
rt_sigpending::sys_rt_sigpending,
rt_sigprocmask::sys_rt_sigprocmask,
rt_sigsuspend::sys_rt_sigsuspend,
sched_getaffinity::sys_sched_getaffinity,
sched_affinity::{sys_sched_getaffinity, sys_sched_setaffinity},
sched_yield::sys_sched_yield,
semctl::sys_semctl,
semget::sys_semget,
@@ -191,6 +191,7 @@ impl_syscall_nums_and_dispatch_fn! {
SYS_SETITIMER = 103 => sys_setitimer(args[..3]);
SYS_TIMER_CREATE = 107 => sys_timer_create(args[..3]);
SYS_TIMER_DELETE = 111 => sys_timer_delete(args[..1]);
SYS_SCHED_SETAFFINITY = 122 => sys_sched_setaffinity(args[..3]);
SYS_SCHED_GETAFFINITY = 123 => sys_sched_getaffinity(args[..3]);
SYS_SCHED_YIELD = 124 => sys_sched_yield(args[..0]);
SYS_KILL = 129 => sys_kill(args[..2]);

View File

@@ -86,7 +86,7 @@ use crate::syscall::{
rt_sigprocmask::sys_rt_sigprocmask,
rt_sigreturn::sys_rt_sigreturn,
rt_sigsuspend::sys_rt_sigsuspend,
sched_getaffinity::sys_sched_getaffinity,
sched_affinity::{sys_sched_getaffinity, sys_sched_setaffinity},
sched_yield::sys_sched_yield,
select::sys_select,
semctl::sys_semctl,
@@ -263,6 +263,7 @@ impl_syscall_nums_and_dispatch_fn! {
SYS_GETTID = 186 => sys_gettid(args[..0]);
SYS_TIME = 201 => sys_time(args[..1]);
SYS_FUTEX = 202 => sys_futex(args[..6]);
SYS_SCHED_SETAFFINITY = 203 => sys_sched_setaffinity(args[..3]);
SYS_SCHED_GETAFFINITY = 204 => sys_sched_getaffinity(args[..3]);
SYS_EPOLL_CREATE = 213 => sys_epoll_create(args[..1]);
SYS_GETDENTS64 = 217 => sys_getdents64(args[..3]);

View File

@@ -93,7 +93,7 @@ mod rt_sigpending;
mod rt_sigprocmask;
mod rt_sigreturn;
mod rt_sigsuspend;
mod sched_getaffinity;
mod sched_affinity;
mod sched_yield;
mod select;
mod semctl;

View File

@@ -0,0 +1,130 @@
// SPDX-License-Identifier: MPL-2.0
use core::{cmp, mem};
use ostd::cpu::{num_cpus, CpuId, CpuSet};
use super::SyscallReturn;
use crate::{prelude::*, process::posix_thread::thread_table, thread::Tid};
pub fn sys_sched_getaffinity(
    tid: Tid,
    cpuset_size: usize,
    cpu_set_ptr: Vaddr,
    ctx: &Context,
) -> Result<SyscallReturn> {
    let cpu_set = match tid {
        0 => ctx.thread.atomic_cpu_affinity().load(),
        _ => match thread_table::get_thread(tid) {
            Some(thread) => thread.atomic_cpu_affinity().load(),
            None => return Err(Error::with_message(Errno::ESRCH, "thread does not exist")),
        },
    };
    let bytes_written = write_cpu_set_to(ctx.get_user_space(), &cpu_set, cpuset_size, cpu_set_ptr)?;
    Ok(SyscallReturn::Return(bytes_written as isize))
}
// TODO: The manual page of `sched_setaffinity` says that if the thread is not
// currently running on one of the CPUs specified in the affinity mask, it
// should be migrated to one of those CPUs. We do not support this behavior
// yet, as the scheduler is not ready for migration.
pub fn sys_sched_setaffinity(
    tid: Tid,
    cpuset_size: usize,
    cpu_set_ptr: Vaddr,
    ctx: &Context,
) -> Result<SyscallReturn> {
    let user_cpu_set = read_cpu_set_from(ctx.get_user_space(), cpuset_size, cpu_set_ptr)?;
    match tid {
        0 => ctx.thread.atomic_cpu_affinity().store(&user_cpu_set),
        _ => match thread_table::get_thread(tid) {
            Some(thread) => {
                thread.atomic_cpu_affinity().store(&user_cpu_set);
            }
            None => return Err(Error::with_message(Errno::ESRCH, "thread does not exist")),
        },
    }
    Ok(SyscallReturn::Return(0))
}
// Linux uses `DECLARE_BITMAP` for `cpu_set_t`, inside which each part is a
// `long`. We use the same scheme to ensure byte endianness compatibility.
type Part = u64;
const SIZE_OF_PART: usize = mem::size_of::<Part>();
const CPUS_IN_PART: usize = SIZE_OF_PART * 8;
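// For example, with 64 bits per part, CPU 65 is encoded as bit 65 % 64 = 1
// within part 65 / 64 = 1 of the user-space bitmask.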
fn read_cpu_set_from(
    uspace: CurrentUserSpace,
    cpuset_size: usize,
    cpu_set_ptr: Vaddr,
) -> Result<CpuSet> {
    if cpuset_size == 0 {
        return Err(Error::with_message(Errno::EINVAL, "invalid cpuset size"));
    }
    let num_cpus = num_cpus();
    let mut ret_set = CpuSet::new_empty();
    let nr_parts_to_read = cmp::min(cpuset_size / SIZE_OF_PART, num_cpus.div_ceil(CPUS_IN_PART));
    for part_id in 0..nr_parts_to_read {
        let user_part: Part = uspace.read_val(cpu_set_ptr + part_id * SIZE_OF_PART)?;
        for bit_id in 0..CPUS_IN_PART {
            if user_part & (1 << bit_id) != 0 {
                // If the CPU ID is invalid, just ignore it.
                let Ok(cpu_id) = CpuId::try_from(part_id * CPUS_IN_PART + bit_id) else {
                    continue;
                };
                ret_set.add(cpu_id);
            }
        }
    }
    if ret_set.is_empty() {
        return Err(Error::with_message(Errno::EINVAL, "empty cpuset"));
    }
    Ok(ret_set)
}
// Returns the number of bytes written.
fn write_cpu_set_to(
    uspace: CurrentUserSpace,
    cpu_set: &CpuSet,
    cpuset_size: usize,
    cpu_set_ptr: Vaddr,
) -> Result<usize> {
    if cpuset_size == 0 {
        return Err(Error::with_message(Errno::EINVAL, "invalid cpuset size"));
    }
    let num_cpus = num_cpus();
    let nr_parts_to_write = cmp::min(cpuset_size / SIZE_OF_PART, num_cpus.div_ceil(CPUS_IN_PART));
    let mut user_part: Part = 0;
    let mut part_idx = 0;
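    // Walk the CPUs in ascending order; whenever the iteration crosses a part
    // boundary, flush the completed part (and any all-zero gap parts) to user space.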
    for cpu_id in cpu_set.iter() {
        let id = cpu_id.as_usize();
        while part_idx < cmp::min(id / CPUS_IN_PART, nr_parts_to_write) {
            uspace.write_val(cpu_set_ptr + part_idx * SIZE_OF_PART, &user_part)?;
            user_part = 0;
            part_idx += 1;
        }
        if part_idx >= nr_parts_to_write {
            break;
        }
        user_part |= 1 << (id % CPUS_IN_PART);
    }
    while part_idx < nr_parts_to_write {
        uspace.write_val(cpu_set_ptr + part_idx * SIZE_OF_PART, &user_part)?;
        user_part = 0;
        part_idx += 1;
    }
    Ok(nr_parts_to_write * SIZE_OF_PART)
}
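To make the size handling concrete, consider a hypothetical machine with 8 CPUs and a glibc caller passing `cpuset_size = 128` (the size of `cpu_set_t`): `nr_parts_to_write = min(128 / 8, ceil(8 / 64)) = 1`, so `sched_getaffinity` writes a single 8-byte part and returns 8. Under the same assumptions, `sched_setaffinity` reads only the first 64 bits of the user-supplied mask.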

View File

@@ -1,70 +0,0 @@
// SPDX-License-Identifier: MPL-2.0
use core::{cmp, mem};
use super::SyscallReturn;
use crate::{
prelude::*,
process::{process_table, Pid},
};
fn get_num_cpus() -> usize {
// TODO: Properly determine the number of available CPUs
// This could be through a system configuration query.
1
}
pub fn sys_sched_getaffinity(
pid: Pid,
cpuset_size: usize,
cpu_set_ptr: Vaddr,
ctx: &Context,
) -> Result<SyscallReturn> {
let num_cpus = get_num_cpus();
if cpuset_size < core::mem::size_of::<cpu_set_t>() {
return Err(Error::with_message(Errno::EINVAL, "invalid cpuset size"));
}
match pid {
0 => {
// TODO: Get the current thread's CPU affinity
// Placeholder for future implementation.
}
_ => {
match process_table::get_process(pid) {
Some(_process) => { /* Placeholder if process-specific logic needed */ }
None => return Err(Error::with_message(Errno::ESRCH, "process does not exist")),
}
}
}
let dummy_cpu_set = cpu_set_t::new(num_cpus);
ctx.get_user_space()
.write_val(cpu_set_ptr, &dummy_cpu_set)?;
Ok(SyscallReturn::Return(0))
}
const CPU_SETSIZE: usize = 1024; // Max number of CPU bits.
const __NCPUBITS: usize = 8 * mem::size_of::<usize>();
#[derive(Debug, Clone, Copy, Pod)]
#[repr(C, packed)]
struct cpu_set_t {
__bits: [usize; CPU_SETSIZE / __NCPUBITS],
}
impl cpu_set_t {
/// Creates a new cpu_set_t representing available CPUs.
fn new(num_cpus: usize) -> Self {
let mut bits = [0usize; CPU_SETSIZE / __NCPUBITS];
for cpu in 0..cmp::min(num_cpus, CPU_SETSIZE) {
bits[cpu / __NCPUBITS] |= 1 << (cpu % __NCPUBITS);
}
Self { __bits: bits }
}
}

View File

@@ -4,7 +4,10 @@
use core::sync::atomic::Ordering;
use ostd::{cpu::CpuSet, sync::PreemptDisabled, task::Task};
use ostd::{
cpu::{AtomicCpuSet, CpuSet},
task::Task,
};
use self::status::{AtomicThreadStatus, ThreadStatus};
use crate::{
@@ -34,8 +37,8 @@ pub struct Thread {
status: AtomicThreadStatus,
/// Thread priority
priority: AtomicPriority,
/// Thread cpu affinity
cpu_affinity: SpinLock<CpuSet>,
/// Thread CPU affinity
cpu_affinity: AtomicCpuSet,
}
impl Thread {
@@ -52,7 +55,7 @@ impl Thread {
data: Box::new(data),
status: AtomicThreadStatus::new(status),
priority: AtomicPriority::new(priority),
cpu_affinity: SpinLock::new(cpu_affinity),
cpu_affinity: AtomicCpuSet::new(cpu_affinity),
}
}
@@ -67,6 +70,11 @@ impl Thread {
.cloned()
}
/// Returns the task associated with this thread.
pub fn task(&self) -> Arc<Task> {
self.task.upgrade().unwrap()
}
/// Gets the Thread from task's data.
///
/// # Panics
@@ -116,14 +124,9 @@ impl Thread {
self.priority.store(new_priority, Ordering::Relaxed)
}
/// Acquires the lock of cpu affinity.
pub fn lock_cpu_affinity(&self) -> SpinLockGuard<CpuSet, PreemptDisabled> {
self.cpu_affinity.lock()
}
/// Updates the cpu affinity with the new value.
pub fn set_cpu_affinity(&self, new_cpu_affinity: CpuSet) {
*self.cpu_affinity.lock() = new_cpu_affinity;
/// Returns a reference to the atomic CPU affinity.
pub fn atomic_cpu_affinity(&self) -> &AtomicCpuSet {
&self.cpu_affinity
}
pub fn yield_now() {

View File

@@ -80,6 +80,11 @@ impl CpuSet {
.sum()
}
/// Returns true if the set is empty.
pub fn is_empty(&self) -> bool {
self.bits.iter().all(|part| *part == 0)
}
/// Adds all CPUs to the set.
pub fn add_all(&mut self) {
self.bits.fill(!0);
@@ -92,6 +97,8 @@ impl CpuSet {
}
/// Iterates over the CPUs in the set.
///
/// The iteration order is guaranteed to be ascending.
pub fn iter(&self) -> impl Iterator<Item = CpuId> + '_ {
self.bits.iter().enumerate().flat_map(|(part_idx, &part)| {
(0..BITS_PER_PART).filter_map(move |bit_idx| {
@@ -165,9 +172,9 @@ impl AtomicCpuSet {
///
/// This operation can only be done in the [`Ordering::Relaxed`] memory
/// order. It cannot be used to synchronize anything between CPUs.
pub fn store(&self, value: CpuSet) {
for (part, new_part) in self.bits.iter().zip(value.bits) {
part.store(new_part, Ordering::Relaxed);
pub fn store(&self, value: &CpuSet) {
for (part, new_part) in self.bits.iter().zip(value.bits.iter()) {
part.store(*new_part, Ordering::Relaxed);
}
}
@@ -260,7 +267,7 @@ mod test {
}
}
atomic_set.store(CpuSet::with_capacity_val(test_num_cpus, 0));
atomic_set.store(&CpuSet::with_capacity_val(test_num_cpus, 0));
for cpu_id in test_all_iter() {
assert!(!atomic_set.contains(cpu_id, Ordering::Relaxed));