Make task store Arc<Thread> and process store Arc<Task>

This commit is contained in:
Jianfeng Jiang
2024-09-13 03:13:07 +00:00
committed by Tate, Hongliang Tian
parent 81b0f265b5
commit f3174dbbbc
22 changed files with 255 additions and 246 deletions

View File

@ -4,11 +4,12 @@ use core::sync::atomic::Ordering;
use ostd::{
cpu::UserContext,
task::Task,
user::{UserContextApi, UserSpace},
};
use super::{
posix_thread::{PosixThread, PosixThreadBuilder, PosixThreadExt, ThreadName},
posix_thread::{thread_table, PosixThread, PosixThreadBuilder, PosixThreadExt, ThreadName},
process_table,
process_vm::ProcessVm,
signal::sig_disposition::SigDispositions,
@ -19,7 +20,7 @@ use crate::{
fs::{file_table::FileTable, fs_resolver::FsResolver, utils::FileCreationMask},
prelude::*,
process::posix_thread::allocate_posix_tid,
thread::{thread_table, Thread, Tid},
thread::{Thread, Tid},
};
bitflags! {
@ -133,7 +134,8 @@ pub fn clone_child(
) -> Result<Tid> {
clone_args.clone_flags.check_unsupported_flags()?;
if clone_args.clone_flags.contains(CloneFlags::CLONE_THREAD) {
let child_thread = clone_child_thread(ctx, parent_context, clone_args)?;
let child_task = clone_child_task(ctx, parent_context, clone_args)?;
let child_thread = Thread::borrow_from_task(&child_task);
child_thread.run();
let child_tid = child_thread.tid();
@ -147,11 +149,11 @@ pub fn clone_child(
}
}
fn clone_child_thread(
fn clone_child_task(
ctx: &Context,
parent_context: &UserContext,
clone_args: CloneArgs,
) -> Result<Arc<Thread>> {
) -> Result<Arc<Task>> {
let Context {
process,
posix_thread,
@ -182,7 +184,7 @@ fn clone_child_thread(
let sig_mask = posix_thread.sig_mask().load(Ordering::Relaxed).into();
let child_tid = allocate_posix_tid();
let child_thread = {
let child_task = {
let credentials = {
let credentials = ctx.posix_thread.credentials();
Credentials::new_from(&credentials)
@ -194,13 +196,13 @@ fn clone_child_thread(
thread_builder.build()
};
process.threads().lock().push(child_thread.clone());
process.tasks().lock().push(child_task.clone());
let child_posix_thread = child_thread.as_posix_thread().unwrap();
let child_posix_thread = child_task.as_posix_thread().unwrap();
clone_parent_settid(child_tid, clone_args.parent_tidptr, clone_flags)?;
clone_child_cleartid(child_posix_thread, clone_args.child_tidptr, clone_flags)?;
clone_child_settid(child_posix_thread, clone_args.child_tidptr, clone_flags)?;
Ok(child_thread)
Ok(child_task)
}
fn clone_child_process(
@ -296,7 +298,7 @@ fn clone_child_process(
};
// Deals with clone flags
let child_thread = thread_table::get_posix_thread(child_tid).unwrap();
let child_thread = thread_table::get_thread(child_tid).unwrap();
let child_posix_thread = child_thread.as_posix_thread().unwrap();
clone_parent_settid(child_tid, clone_args.parent_tidptr, clone_flags)?;
clone_child_cleartid(child_posix_thread, clone_args.child_tidptr, clone_flags)?;

View File

@ -7,6 +7,7 @@ use crate::{
posix_thread::{do_exit, PosixThreadExt},
signal::{constants::SIGCHLD, signals::kernel::KernelSignal},
},
thread::Thread,
};
pub fn do_exit_group(term_status: TermStatus) {
@ -18,9 +19,11 @@ pub fn do_exit_group(term_status: TermStatus) {
current.set_zombie(term_status);
// Exit all threads
let threads = current.threads().lock().clone();
for thread in threads {
if let Err(e) = do_exit(&thread, thread.as_posix_thread().unwrap(), term_status) {
let tasks = current.tasks().lock().clone();
for task in tasks {
let thread = Thread::borrow_from_task(&task);
let posix_thread = thread.as_posix_thread().unwrap();
if let Err(e) = do_exit(thread, posix_thread, term_status) {
debug!("Ignore error when call exit: {:?}", e);
}
}

View File

@ -1,7 +1,7 @@
// SPDX-License-Identifier: MPL-2.0
use super::{
posix_thread::PosixThreadExt,
posix_thread::{thread_table, PosixThreadExt},
process_table,
signal::{
constants::SIGCONT,
@ -10,10 +10,7 @@ use super::{
},
Pgid, Pid, Process, Sid, Uid,
};
use crate::{
prelude::*,
thread::{thread_table, Tid},
};
use crate::{prelude::*, thread::Tid};
/// Sends a signal to a process, using the current process as the sender.
///
@ -71,7 +68,7 @@ pub fn kill_group(pgid: Pgid, signal: Option<UserSignal>, ctx: &Context) -> Resu
/// If `signal` is `None`, this method will only check permission without sending
/// any signal.
pub fn tgkill(tid: Tid, tgid: Pid, signal: Option<UserSignal>, ctx: &Context) -> Result<()> {
let thread = thread_table::get_posix_thread(tid)
let thread = thread_table::get_thread(tid)
.ok_or_else(|| Error::with_message(Errno::ESRCH, "target thread does not exist"))?;
if thread.status().is_exited() {
@ -120,14 +117,14 @@ pub fn kill_all(signal: Option<UserSignal>, ctx: &Context) -> Result<()> {
}
fn kill_process(process: &Process, signal: Option<UserSignal>, ctx: &Context) -> Result<()> {
let threads = process.threads().lock();
let tasks = process.tasks().lock();
let signum = signal.map(|signal| signal.num());
let sender_ids = current_thread_sender_ids(signum.as_ref(), ctx);
let mut permitted_thread = None;
for thread in threads.iter() {
let posix_thread = thread.as_posix_thread().unwrap();
for task in tasks.iter() {
let posix_thread = task.as_posix_thread().unwrap();
// First check permission
if posix_thread

View File

@ -2,9 +2,9 @@
#![allow(dead_code)]
use ostd::user::UserSpace;
use ostd::{task::Task, user::UserSpace};
use super::PosixThread;
use super::{thread_table, PosixThread};
use crate::{
prelude::*,
process::{
@ -12,7 +12,7 @@ use crate::{
signal::{sig_mask::AtomicSigMask, sig_queues::SigQueues},
Credentials, Process,
},
thread::{status::ThreadStatus, task, thread_table, Thread, Tid},
thread::{status::ThreadStatus, task, Thread, Tid},
time::{clocks::ProfClock, TimerManager},
};
@ -72,7 +72,7 @@ impl PosixThreadBuilder {
self
}
pub fn build(self) -> Arc<Thread> {
pub fn build(self) -> Arc<Task> {
let Self {
tid,
user_space,
@ -85,15 +85,13 @@ impl PosixThreadBuilder {
sig_queues,
} = self;
let thread = Arc::new_cyclic(|thread_ref| {
let task = task::create_new_user_task(user_space, thread_ref.clone());
let status = ThreadStatus::Init;
Arc::new_cyclic(|weak_task| {
let posix_thread = {
let prof_clock = ProfClock::new();
let virtual_timer_manager = TimerManager::new(prof_clock.user_clock().clone());
let prof_timer_manager = TimerManager::new(prof_clock.clone());
let posix_thread = PosixThread {
PosixThread {
process,
tid,
name: Mutex::new(thread_name),
@ -109,11 +107,14 @@ impl PosixThreadBuilder {
prof_clock,
virtual_timer_manager,
prof_timer_manager,
}
};
Thread::new(task, posix_thread, status)
});
thread_table::add_posix_thread(tid, thread.clone());
thread
let status = ThreadStatus::Init;
let thread = Arc::new(Thread::new(weak_task.clone(), posix_thread, status));
thread_table::add_thread(tid, thread.clone());
task::create_new_user_task(user_space, thread)
})
}
}

View File

@ -1,10 +1,10 @@
// SPDX-License-Identifier: MPL-2.0
use super::{futex::futex_wake, robust_list::wake_robust_futex, PosixThread};
use super::{futex::futex_wake, robust_list::wake_robust_futex, thread_table, PosixThread};
use crate::{
prelude::*,
process::{do_exit_group, TermStatus},
thread::{thread_table, Thread, Tid},
thread::{Thread, Tid},
};
/// Exits the thread if the thread is a POSIX thread.
@ -36,7 +36,7 @@ pub fn do_exit(thread: &Thread, posix_thread: &PosixThread, term_status: TermSta
if tid != posix_thread.process().pid() {
// We don't remove main thread.
// The main thread is removed when the process is reaped.
thread_table::remove_posix_thread(tid);
thread_table::remove_thread(tid);
}
if posix_thread.is_main_thread(tid) || posix_thread.is_last_thread() {

View File

@ -22,7 +22,7 @@ use crate::{
events::Observer,
prelude::*,
process::signal::constants::SIGCONT,
thread::Tid,
thread::{Thread, Tid},
time::{clocks::ProfClock, Timer, TimerManager},
};
@ -32,11 +32,12 @@ pub mod futex;
mod name;
mod posix_thread_ext;
mod robust_list;
pub mod thread_table;
pub use builder::PosixThreadBuilder;
pub use exit::do_exit;
pub use name::{ThreadName, MAX_THREAD_NAME_LEN};
pub use posix_thread_ext::PosixThreadExt;
pub use posix_thread_ext::{create_posix_task_from_executable, PosixThreadExt};
pub use robust_list::RobustListHead;
pub struct PosixThread {
@ -273,12 +274,10 @@ impl PosixThread {
fn is_last_thread(&self) -> bool {
let process = self.process.upgrade().unwrap();
let threads = process.threads().lock();
threads
let tasks = process.tasks().lock();
tasks
.iter()
.filter(|thread| !thread.status().is_exited())
.count()
== 0
.any(|task| !Thread::borrow_from_task(task).status().is_exited())
}
/// Gets the read-only credentials of the thread.

View File

@ -2,6 +2,7 @@
use ostd::{
cpu::UserContext,
task::Task,
user::{UserContextApi, UserSpace},
};
@ -22,22 +23,25 @@ pub trait PosixThreadExt {
self.as_posix_thread().unwrap().tid()
}
fn as_posix_thread(&self) -> Option<&PosixThread>;
#[allow(clippy::too_many_arguments)]
fn new_posix_thread_from_executable(
tid: Tid,
credentials: Credentials,
process_vm: &ProcessVm,
fs_resolver: &FsResolver,
executable_path: &str,
process: Weak<Process>,
argv: Vec<CString>,
envp: Vec<CString>,
) -> Result<Arc<Self>>;
}
impl PosixThreadExt for Thread {
/// This function should only be called when launch shell()
fn new_posix_thread_from_executable(
fn as_posix_thread(&self) -> Option<&PosixThread> {
self.data().downcast_ref::<PosixThread>()
}
}
impl PosixThreadExt for Arc<Task> {
fn as_posix_thread(&self) -> Option<&PosixThread> {
Thread::borrow_from_task(self).as_posix_thread()
}
}
/// Creates a task for running an executable file.
///
/// This function should _only_ be used to create the init user task.
#[allow(clippy::too_many_arguments)]
pub fn create_posix_task_from_executable(
tid: Tid,
credentials: Credentials,
process_vm: &ProcessVm,
@ -46,13 +50,12 @@ impl PosixThreadExt for Thread {
process: Weak<Process>,
argv: Vec<CString>,
envp: Vec<CString>,
) -> Result<Arc<Self>> {
) -> Result<Arc<Task>> {
let elf_file = {
let fs_path = FsPath::new(AT_FDCWD, executable_path)?;
fs_resolver.lookup(&fs_path)?
};
let (_, elf_load_info) =
load_program_to_vm(process_vm, elf_file, argv, envp, fs_resolver, 1)?;
let (_, elf_load_info) = load_program_to_vm(process_vm, elf_file, argv, envp, fs_resolver, 1)?;
let vm_space = process_vm.root_vmar().vm_space().clone();
let mut cpu_ctx = UserContext::default();
@ -65,8 +68,3 @@ impl PosixThreadExt for Thread {
.process(process);
Ok(thread_builder.build())
}
fn as_posix_thread(&self) -> Option<&PosixThread> {
self.data().downcast_ref::<PosixThread>()
}
}

View File

@ -0,0 +1,22 @@
// SPDX-License-Identifier: MPL-2.0
use super::{Thread, Tid};
use crate::{prelude::*, process::posix_thread::PosixThreadExt};
static THREAD_TABLE: SpinLock<BTreeMap<Tid, Arc<Thread>>> = SpinLock::new(BTreeMap::new());
/// Adds a posix thread to global thread table
pub fn add_thread(tid: Tid, thread: Arc<Thread>) {
debug_assert_eq!(tid, thread.tid());
THREAD_TABLE.lock().insert(tid, thread);
}
/// Removes a posix thread to global thread table
pub fn remove_thread(tid: Tid) {
THREAD_TABLE.lock().remove(&tid);
}
/// Gets a posix thread from the global thread table
pub fn get_thread(tid: Tid) -> Option<Arc<Thread>> {
THREAD_TABLE.lock().get(&tid).cloned()
}

View File

@ -7,14 +7,13 @@ use crate::{
fs::{file_table::FileTable, fs_resolver::FsResolver, utils::FileCreationMask},
prelude::*,
process::{
posix_thread::{PosixThreadBuilder, PosixThreadExt},
posix_thread::{create_posix_task_from_executable, PosixThreadBuilder},
process_vm::ProcessVm,
rlimit::ResourceLimits,
signal::sig_disposition::SigDispositions,
Credentials,
},
sched::nice::Nice,
thread::Thread,
};
pub struct ProcessBuilder<'a> {
@ -190,11 +189,11 @@ impl<'a> ProcessBuilder<'a> {
)
};
let thread = if let Some(thread_builder) = main_thread_builder {
let task = if let Some(thread_builder) = main_thread_builder {
let builder = thread_builder.process(Arc::downgrade(&process));
builder.build()
} else {
Thread::new_posix_thread_from_executable(
create_posix_task_from_executable(
pid,
credentials.unwrap(),
process.vm(),
@ -206,7 +205,7 @@ impl<'a> ProcessBuilder<'a> {
)?
};
process.threads().lock().push(thread);
process.tasks().lock().push(task);
process.set_runnable();

View File

@ -37,7 +37,7 @@ use aster_rights::Full;
use atomic::Atomic;
pub use builder::ProcessBuilder;
pub use job_control::JobControl;
use ostd::sync::WaitQueue;
use ostd::{sync::WaitQueue, task::Task};
pub use process_group::ProcessGroup;
pub use session::Session;
pub use terminal::Terminal;
@ -68,7 +68,7 @@ pub struct Process {
/// The executable path.
executable_path: RwLock<String>,
/// The threads
threads: Mutex<Vec<Arc<Thread>>>,
tasks: Mutex<Vec<Arc<Task>>>,
/// Process status
status: ProcessStatus,
/// Parent process
@ -167,14 +167,20 @@ impl Process {
/// - the function is called in the bootstrap context;
/// - or if the current task is not associated with a process.
pub fn current() -> Option<Arc<Process>> {
Some(Thread::current()?.as_posix_thread()?.process())
Some(
Task::current()?
.data()
.downcast_ref::<Arc<Thread>>()?
.as_posix_thread()?
.process(),
)
}
#[allow(clippy::too_many_arguments)]
fn new(
pid: Pid,
parent: Weak<Process>,
threads: Vec<Arc<Thread>>,
tasks: Vec<Arc<Task>>,
executable_path: String,
process_vm: ProcessVm,
@ -194,7 +200,7 @@ impl Process {
Arc::new_cyclic(|process_ref: &Weak<Process>| Self {
pid,
threads: Mutex::new(threads),
tasks: Mutex::new(tasks),
executable_path: RwLock::new(executable_path),
process_vm,
children_wait_queue,
@ -271,13 +277,14 @@ impl Process {
/// start to run current process
pub fn run(&self) {
let threads = self.threads.lock();
let tasks = self.tasks.lock();
// when run the process, the process should has only one thread
debug_assert!(threads.len() == 1);
debug_assert!(tasks.len() == 1);
debug_assert!(self.is_runnable());
let thread = threads[0].clone();
let task = tasks[0].clone();
// should not hold the lock when run thread
drop(threads);
drop(tasks);
let thread = Thread::borrow_from_task(&task);
thread.run();
}
@ -297,8 +304,8 @@ impl Process {
&self.timer_manager
}
pub fn threads(&self) -> &Mutex<Vec<Arc<Thread>>> {
&self.threads
pub fn tasks(&self) -> &Mutex<Vec<Arc<Task>>> {
&self.tasks
}
pub fn executable_path(&self) -> String {
@ -318,10 +325,11 @@ impl Process {
}
pub fn main_thread(&self) -> Option<Arc<Thread>> {
self.threads
self.tasks
.lock()
.iter()
.find(|thread| thread.tid() == self.pid)
.find(|task| task.tid() == self.pid)
.map(Thread::borrow_from_task)
.cloned()
}
@ -644,7 +652,7 @@ impl Process {
// TODO: check that the signal is not user signal
// Enqueue signal to the first thread that does not block the signal
let threads = self.threads.lock();
let threads = self.tasks.lock();
for thread in threads.iter() {
let posix_thread = thread.as_posix_thread().unwrap();
if !posix_thread.has_signal_blocked(signal.num()) {

View File

@ -86,16 +86,9 @@ impl Pause for Waiter {
return Ok(res);
}
let current_thread = self
.task()
.data()
.downcast_ref::<Weak<Thread>>()
.and_then(|thread| thread.upgrade());
let current_thread = self.task().data().downcast_ref::<Arc<Thread>>();
let Some(posix_thread) = current_thread
.as_ref()
.and_then(|thread| thread.as_posix_thread())
else {
let Some(posix_thread) = current_thread.and_then(|thread| thread.as_posix_thread()) else {
if let Some(timeout) = timeout {
return self.wait_until_or_timeout(cond, timeout);
} else {

View File

@ -5,8 +5,11 @@
use super::{process_filter::ProcessFilter, signal::constants::SIGCHLD, ExitCode, Pid, Process};
use crate::{
prelude::*,
process::{posix_thread::PosixThreadExt, process_table, signal::with_signal_blocked},
thread::thread_table,
process::{
posix_thread::{thread_table, PosixThreadExt},
process_table,
signal::with_signal_blocked,
},
};
// The definition of WaitOptions is from Occlum
@ -85,8 +88,8 @@ pub fn wait_child_exit(
fn reap_zombie_child(process: &Process, pid: Pid) -> ExitCode {
let child_process = process.children().lock().remove(&pid).unwrap();
assert!(child_process.is_zombie());
for thread in &*child_process.threads().lock() {
thread_table::remove_posix_thread(thread.tid());
for task in &*child_process.tasks().lock() {
thread_table::remove_thread(task.tid());
}
// Lock order: session table -> group table -> process table -> group of process

View File

@ -7,8 +7,10 @@ use int_to_c_enum::TryFromInt;
use super::SyscallReturn;
use crate::{
prelude::*,
process::{posix_thread::PosixThreadExt, process_table},
thread::thread_table,
process::{
posix_thread::{thread_table, PosixThreadExt},
process_table,
},
time::{
clockid_t,
clocks::{
@ -136,7 +138,7 @@ pub fn read_clock(clockid: clockid_t, ctx: &Context) -> Result<Duration> {
}
}
DynamicClockIdInfo::Tid(tid, clock_type) => {
let thread = thread_table::get_posix_thread(tid)
let thread = thread_table::get_thread(tid)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "invalid clock ID"))?;
let posix_thread = thread.as_posix_thread().unwrap();
match clock_type {

View File

@ -7,7 +7,7 @@ use super::{
use crate::{
prelude::*,
process::{
posix_thread::PosixThreadExt,
posix_thread::{thread_table, PosixThreadExt},
process_table,
signal::{
c_types::{sigevent_t, SigNotify},
@ -17,10 +17,7 @@ use crate::{
},
},
syscall::ClockId,
thread::{
thread_table,
work_queue::{submit_work_item, work_item::WorkItem},
},
thread::work_queue::{submit_work_item, work_item::WorkItem},
time::{
clockid_t,
clocks::{BootTimeClock, MonotonicClock, RealTimeClock},
@ -76,7 +73,7 @@ pub fn sys_timer_create(
// Send a signal to the specified thread when the timer is expired.
SigNotify::SIGEV_THREAD_ID => {
let tid = sig_event.sigev_un.read_tid() as u32;
let thread = thread_table::get_posix_thread(tid).ok_or_else(|| {
let thread = thread_table::get_thread(tid).ok_or_else(|| {
Error::with_message(Errno::EINVAL, "target thread does not exist")
})?;
let posix_thread = thread.as_posix_thread().unwrap();
@ -132,7 +129,7 @@ pub fn sys_timer_create(
}
}
DynamicClockIdInfo::Tid(tid, clock_type) => {
let thread = thread_table::get_posix_thread(tid)
let thread = thread_table::get_thread(tid)
.ok_or_else(|| Error::with_message(Errno::EINVAL, "invalid clock id"))?;
let posix_thread = thread.as_posix_thread().unwrap();
match clock_type {

View File

@ -2,23 +2,22 @@
use ostd::{
cpu::CpuSet,
task::{Priority, TaskOptions},
task::{Priority, Task, TaskOptions},
};
use super::{status::ThreadStatus, thread_table, Thread};
use super::{status::ThreadStatus, Thread};
use crate::prelude::*;
/// The inner data of a kernel thread
pub struct KernelThread;
pub trait KernelThreadExt {
/// get the kernel_thread structure
/// Gets the kernel_thread structure
fn as_kernel_thread(&self) -> Option<&KernelThread>;
/// create a new kernel thread structure, **NOT** run the thread.
fn new_kernel_thread(thread_options: ThreadOptions) -> Arc<Thread>;
/// create a new kernel thread structure, and then run the thread.
/// Creates a new kernel thread, and then run the thread.
fn spawn_kernel_thread(thread_options: ThreadOptions) -> Arc<Thread> {
let thread = Self::new_kernel_thread(thread_options);
let task = create_new_kernel_task(thread_options);
let thread = Thread::borrow_from_task(&task).clone();
thread.run();
thread
}
@ -31,31 +30,6 @@ impl KernelThreadExt for Thread {
self.data().downcast_ref::<KernelThread>()
}
fn new_kernel_thread(mut thread_options: ThreadOptions) -> Arc<Self> {
let task_fn = thread_options.take_func();
let thread_fn = move || {
task_fn();
let current_thread = current_thread!();
// ensure the thread is exit
current_thread.exit();
thread_table::remove_kernel_thread(current_thread);
};
let thread = Arc::new_cyclic(|thread_ref| {
let weak_thread = thread_ref.clone();
let task = TaskOptions::new(thread_fn)
.data(weak_thread)
.priority(thread_options.priority)
.cpu_affinity(thread_options.cpu_affinity)
.build()
.unwrap();
let status = ThreadStatus::Init;
let kernel_thread = KernelThread;
Thread::new(task, kernel_thread, status)
});
thread_table::add_kernel_thread(thread.clone());
thread
}
fn join(&self) {
loop {
if self.status().is_exited() {
@ -67,6 +41,31 @@ impl KernelThreadExt for Thread {
}
}
/// Creates a new task of kernel thread, **NOT** run the thread.
pub fn create_new_kernel_task(mut thread_options: ThreadOptions) -> Arc<Task> {
let task_fn = thread_options.take_func();
let thread_fn = move || {
task_fn();
// Ensures the thread is exit
current_thread!().exit();
};
Arc::new_cyclic(|weak_task| {
let thread = {
let kernel_thread = KernelThread;
let status = ThreadStatus::Init;
Arc::new(Thread::new(weak_task.clone(), kernel_thread, status))
};
TaskOptions::new(thread_fn)
.data(thread)
.priority(thread_options.priority)
.cpu_affinity(thread_options.cpu_affinity)
.build()
.unwrap()
})
}
/// Options to create or spawn a new thread.
pub struct ThreadOptions {
func: Option<Box<dyn Fn() + Send + Sync>>,

View File

@ -13,7 +13,6 @@ pub mod exception;
pub mod kernel_thread;
pub mod status;
pub mod task;
pub mod thread_table;
pub mod work_queue;
pub type Tid = u32;
@ -22,7 +21,7 @@ pub type Tid = u32;
pub struct Thread {
// immutable part
/// Low-level info
task: Arc<Task>,
task: Weak<Task>,
/// Data: Posix thread info/Kernel thread Info
data: Box<dyn Send + Sync + Any>,
@ -32,7 +31,7 @@ pub struct Thread {
impl Thread {
/// Never call these function directly
pub fn new(task: Arc<Task>, data: impl Send + Sync + Any, status: ThreadStatus) -> Self {
pub fn new(task: Weak<Task>, data: impl Send + Sync + Any, status: ThreadStatus) -> Self {
Thread {
task,
data: Box::new(data),
@ -47,18 +46,23 @@ impl Thread {
pub fn current() -> Option<Arc<Self>> {
Task::current()?
.data()
.downcast_ref::<Weak<Thread>>()?
.upgrade()
.downcast_ref::<Arc<Thread>>()
.cloned()
}
pub(in crate::thread) fn task(&self) -> &Arc<Task> {
&self.task
/// Gets the Thread from task's data.
///
/// # Panics
///
/// This method panics if the task is not a thread.
pub fn borrow_from_task(task: &Arc<Task>) -> &Arc<Self> {
task.data().downcast_ref::<Arc<Thread>>().unwrap()
}
/// Runs this thread at once.
pub fn run(&self) {
self.set_status(ThreadStatus::Running);
self.task.run();
self.task.upgrade().unwrap().run();
}
pub(super) fn exit(&self) {

View File

@ -16,12 +16,12 @@ use crate::{
};
/// create new task with userspace and parent process
pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Weak<Thread>) -> Arc<Task> {
pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Arc<Thread>) -> Task {
fn user_task_entry() {
let current_thread = current_thread!();
let current_posix_thread = current_thread.as_posix_thread().unwrap();
let current_process = current_posix_thread.process();
let current_task = current_thread.task();
let current_task = Task::current().unwrap();
let user_space = current_task
.user_space()

View File

@ -1,30 +0,0 @@
// SPDX-License-Identifier: MPL-2.0
use keyable_arc::KeyableArc;
use super::{Thread, Tid};
use crate::{prelude::*, process::posix_thread::PosixThreadExt};
static POSIX_THREAD_TABLE: SpinLock<BTreeMap<Tid, Arc<Thread>>> = SpinLock::new(BTreeMap::new());
static KERNEL_THREAD_TABLE: SpinLock<BTreeSet<KeyableArc<Thread>>> = SpinLock::new(BTreeSet::new());
pub fn add_posix_thread(tid: Tid, thread: Arc<Thread>) {
debug_assert_eq!(tid, thread.tid());
POSIX_THREAD_TABLE.lock().insert(tid, thread);
}
pub fn remove_posix_thread(tid: Tid) {
POSIX_THREAD_TABLE.lock().remove(&tid);
}
pub fn get_posix_thread(tid: Tid) -> Option<Arc<Thread>> {
POSIX_THREAD_TABLE.lock().get(&tid).cloned()
}
pub(super) fn add_kernel_thread(thread: Arc<Thread>) {
KERNEL_THREAD_TABLE.lock().insert(KeyableArc::from(thread));
}
pub(super) fn remove_kernel_thread(thread: Arc<Thread>) {
KERNEL_THREAD_TABLE.lock().remove(&KeyableArc::from(thread));
}

View File

@ -2,12 +2,15 @@
#![allow(dead_code)]
use ostd::{cpu::CpuSet, task::Priority};
use ostd::{
cpu::CpuSet,
task::{Priority, Task},
};
use super::worker_pool::WorkerPool;
use crate::{
prelude::*,
thread::kernel_thread::{KernelThreadExt, ThreadOptions},
thread::kernel_thread::{create_new_kernel_task, ThreadOptions},
Thread,
};
@ -17,7 +20,7 @@ use crate::{
/// added to the `WorkerPool`.
pub(super) struct Worker {
worker_pool: Weak<WorkerPool>,
bound_thread: Arc<Thread>,
bound_task: Arc<Task>,
bound_cpu: u32,
inner: SpinLock<WorkerInner>,
}
@ -51,14 +54,14 @@ impl Worker {
if worker_pool.upgrade().unwrap().is_high_priority() {
priority = Priority::high();
}
let bound_thread = Thread::new_kernel_thread(
let bound_task = create_new_kernel_task(
ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)
.priority(priority),
);
Self {
worker_pool,
bound_thread,
bound_task,
bound_cpu,
inner: SpinLock::new(WorkerInner {
worker_status: WorkerStatus::Running,
@ -68,7 +71,8 @@ impl Worker {
}
pub(super) fn run(&self) {
self.bound_thread.run();
let thread = Thread::borrow_from_task(&self.bound_task);
thread.run();
}
/// The thread function bound to normal workers.
@ -97,8 +101,8 @@ impl Worker {
self.exit();
}
pub(super) fn bound_thread(&self) -> &Arc<Thread> {
&self.bound_thread
pub(super) fn bound_task(&self) -> &Arc<Task> {
&self.bound_task
}
pub(super) fn is_idle(&self) -> bool {

View File

@ -7,12 +7,16 @@ use core::{
time::Duration,
};
use ostd::{cpu::CpuSet, sync::WaitQueue, task::Priority};
use ostd::{
cpu::CpuSet,
sync::WaitQueue,
task::{Priority, Task},
};
use super::{simple_scheduler::SimpleScheduler, worker::Worker, WorkItem, WorkPriority, WorkQueue};
use crate::{
prelude::*,
thread::kernel_thread::{KernelThreadExt, ThreadOptions},
thread::kernel_thread::{create_new_kernel_task, ThreadOptions},
Thread,
};
@ -60,7 +64,7 @@ pub trait WorkerScheduler: Sync + Send {
/// are found processing in the pool.
pub struct Monitor {
worker_pool: Weak<WorkerPool>,
bound_thread: Arc<Thread>,
bound_task: Arc<Task>,
}
impl LocalWorkerPool {
@ -77,7 +81,7 @@ impl LocalWorkerPool {
fn add_worker(&self) {
let worker = Worker::new(self.parent.clone(), self.cpu_id);
self.workers.disable_irq().lock().push_back(worker.clone());
worker.bound_thread().run();
Thread::borrow_from_task(worker.bound_task()).run();
}
fn remove_worker(&self) {
@ -236,20 +240,20 @@ impl Monitor {
WorkPriority::High => Priority::high(),
WorkPriority::Normal => Priority::normal(),
};
let bound_thread = Thread::new_kernel_thread(
let bound_task = create_new_kernel_task(
ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)
.priority(priority),
);
Self {
worker_pool,
bound_thread,
bound_task,
}
})
}
pub fn run(&self) {
self.bound_thread.run();
Thread::borrow_from_task(&self.bound_task).run()
}
fn run_monitor_loop(self: &Arc<Self>) {

View File

@ -102,11 +102,13 @@ fn create_user_task(user_space: Arc<UserSpace>) -> Arc<Task> {
// Kernel tasks are managed by the Framework,
// while scheduling algorithms for them can be
// determined by the users of the Framework.
Arc::new(
TaskOptions::new(user_task)
.user_space(Some(user_space))
.data(0)
.build()
.unwrap()
.unwrap(),
)
}
fn handle_syscall(user_context: &mut UserContext, user_space: &UserSpace) {

View File

@ -166,7 +166,7 @@ impl TaskOptions {
}
/// Builds a new task without running it immediately.
pub fn build(self) -> Result<Arc<Task>> {
pub fn build(self) -> Result<Task> {
/// all task will entering this function
/// this function is mean to executing the task_fn in Task
extern "C" fn kernel_task_entry() {
@ -201,12 +201,12 @@ impl TaskOptions {
// have any arguments, so we only need to align the stack pointer to 16 bytes.
ctx.set_stack_pointer(crate::mm::paddr_to_vaddr(new_task.kstack.end_paddr() - 16));
Ok(Arc::new(new_task))
Ok(new_task)
}
/// Builds a new task and run it immediately.
pub fn spawn(self) -> Result<Arc<Task>> {
let task = self.build()?;
let task = Arc::new(self.build()?);
task.run();
Ok(task)
}
@ -237,11 +237,13 @@ mod test {
let task = || {
assert_eq!(1, 1);
};
let task_option = crate::task::TaskOptions::new(task)
let task = Arc::new(
crate::task::TaskOptions::new(task)
.data(())
.build()
.unwrap();
task_option.run();
.unwrap(),
);
task.run();
}
#[ktest]