Optimize the ThreadStatus to be lock-free

This commit is contained in:
LI Qing
2024-05-11 11:18:41 +08:00
committed by Tate, Hongliang Tian
parent 0eaa6e637d
commit 27bcece750
8 changed files with 83 additions and 53 deletions

View File

@ -20,7 +20,7 @@ pub fn do_exit_group(term_status: TermStatus) {
// Exit all threads
let threads = current.threads().lock().clone();
for thread in threads {
if thread.is_exited() {
if thread.status().is_exited() {
continue;
}

View File

@ -55,7 +55,7 @@ pub fn tgkill(tid: Tid, tgid: Pid, signal: Option<UserSignal>) -> Result<()> {
let thread = thread_table::get_thread(tid)
.ok_or_else(|| Error::with_message(Errno::ESRCH, "target thread does not exist"))?;
if thread.is_exited() {
if thread.status().is_exited() {
return Ok(());
}

View File

@ -193,7 +193,7 @@ impl PosixThread {
let threads = process.threads().lock();
threads
.iter()
.filter(|thread| !thread.status().lock().is_exited())
.filter(|thread| !thread.status().is_exited())
.count()
== 0
}

View File

@ -13,7 +13,7 @@ pub mod sig_queues;
mod sig_stack;
pub mod signals;
use core::mem;
use core::{mem, sync::atomic::Ordering};
use align_ext::AlignExt;
use aster_frame::{cpu::UserContext, task::Task};
@ -30,7 +30,7 @@ use super::posix_thread::{PosixThread, PosixThreadExt};
use crate::{
prelude::*,
process::{do_exit_group, TermStatus},
thread::Thread,
thread::{status::ThreadStatus, Thread},
util::{write_bytes_to_user, write_val_to_user},
};
@ -91,18 +91,20 @@ pub fn handle_pending_signal(
}
SigDefaultAction::Ign => {}
SigDefaultAction::Stop => {
let mut status = current_thread.status().lock();
if status.is_running() {
status.set_stopped();
}
drop(status);
let _ = current_thread.atomic_status().compare_exchange(
ThreadStatus::Running,
ThreadStatus::Stopped,
Ordering::AcqRel,
Ordering::Relaxed,
);
}
SigDefaultAction::Cont => {
let mut status = current_thread.status().lock();
if status.is_stopped() {
status.set_running();
}
drop(status);
let _ = current_thread.atomic_status().compare_exchange(
ThreadStatus::Stopped,
ThreadStatus::Running,
Ordering::AcqRel,
Ordering::Relaxed,
);
}
}
}

View File

@ -56,11 +56,9 @@ impl KernelThreadExt for Thread {
fn join(&self) {
loop {
let status = self.status.lock();
if status.is_exited() {
if self.status().is_exited() {
return;
} else {
drop(status);
Thread::yield_now();
}
}

View File

@ -6,7 +6,7 @@ use core::sync::atomic::{AtomicU32, Ordering};
use aster_frame::task::Task;
use self::status::ThreadStatus;
use self::status::{AtomicThreadStatus, ThreadStatus};
use crate::prelude::*;
pub mod exception;
@ -31,7 +31,7 @@ pub struct Thread {
data: Box<dyn Send + Sync + Any>,
// mutable part
status: Mutex<ThreadStatus>,
status: AtomicThreadStatus,
}
impl Thread {
@ -46,7 +46,7 @@ impl Thread {
tid,
task,
data: Box::new(data),
status: Mutex::new(status),
status: AtomicThreadStatus::new(status),
}
}
@ -67,25 +67,29 @@ impl Thread {
/// Run this thread at once.
pub fn run(&self) {
self.status.lock().set_running();
self.set_status(ThreadStatus::Running);
self.task.run();
}
pub fn exit(&self) {
let mut status = self.status.lock();
if !status.is_exited() {
status.set_exited();
}
self.set_status(ThreadStatus::Exited);
}
pub fn is_exited(&self) -> bool {
self.status.lock().is_exited()
}
pub fn status(&self) -> &Mutex<ThreadStatus> {
/// Returns the reference to the atomic status.
pub fn atomic_status(&self) -> &AtomicThreadStatus {
&self.status
}
/// Returns the current status.
pub fn status(&self) -> ThreadStatus {
self.status.load(Ordering::Acquire)
}
/// Updates the status with the `new` value.
pub fn set_status(&self, new_status: ThreadStatus) {
self.status.store(new_status, Ordering::Release);
}
pub fn yield_now() {
Task::yield_now()
}

View File

@ -1,11 +1,51 @@
// SPDX-License-Identifier: MPL-2.0
#[derive(Debug, PartialEq, Clone, Copy)]
use core::sync::atomic::{AtomicU8, Ordering};
use int_to_c_enum::TryFromInt;
/// A `ThreadStatus` which can be safely shared between threads.
#[derive(Debug)]
pub struct AtomicThreadStatus(AtomicU8);
impl AtomicThreadStatus {
/// Creates a new atomic status.
pub fn new(status: ThreadStatus) -> Self {
Self(AtomicU8::new(status as u8))
}
/// Loads a value from the atomic status.
pub fn load(&self, order: Ordering) -> ThreadStatus {
ThreadStatus::try_from(self.0.load(order)).unwrap()
}
/// Stores a value into the atomic status.
pub fn store(&self, new_status: ThreadStatus, order: Ordering) {
self.0.store(new_status as u8, order);
}
/// Stores a value into the atomic status if the current value is the same as the `current` value.
pub fn compare_exchange(
&self,
current: ThreadStatus,
new: ThreadStatus,
success: Ordering,
failure: Ordering,
) -> Result<ThreadStatus, ThreadStatus> {
self.0
.compare_exchange(current as u8, new as u8, success, failure)
.map(|val| ThreadStatus::try_from(val).unwrap())
.map_err(|val| ThreadStatus::try_from(val).unwrap())
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, TryFromInt)]
#[repr(u8)]
pub enum ThreadStatus {
Init,
Running,
Exited,
Stopped,
Init = 0,
Running = 1,
Exited = 2,
Stopped = 3,
}
impl ThreadStatus {
@ -20,18 +60,4 @@ impl ThreadStatus {
pub fn is_stopped(&self) -> bool {
*self == ThreadStatus::Stopped
}
pub fn set_running(&mut self) {
debug_assert!(!self.is_exited());
*self = ThreadStatus::Running;
}
pub fn set_stopped(&mut self) {
debug_assert!(!self.is_exited());
*self = ThreadStatus::Stopped;
}
pub fn set_exited(&mut self) {
*self = ThreadStatus::Exited;
}
}

View File

@ -40,16 +40,16 @@ pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Weak<Thread>
// handle user event:
handle_user_event(user_event, context);
// should be do this comparison before handle signal?
if current_thread.status().lock().is_exited() {
if current_thread.status().is_exited() {
break;
}
handle_pending_signal(context, &current_thread).unwrap();
if current_thread.status().lock().is_exited() {
if current_thread.status().is_exited() {
debug!("exit due to signal");
break;
}
// If current is suspended, wait for a signal to wake up self
while current_thread.status().lock().is_stopped() {
while current_thread.status().is_stopped() {
Thread::yield_now();
debug!("{} is suspended.", current_thread.tid());
handle_pending_signal(context, &current_thread).unwrap();