diff --git a/framework/jinux-frame/src/config.rs b/framework/jinux-frame/src/config.rs
index 557a84e9b..416ade6c8 100644
--- a/framework/jinux-frame/src/config.rs
+++ b/framework/jinux-frame/src/config.rs
@@ -19,3 +19,5 @@ pub const KVA_START: usize = (usize::MAX) << PAGE_SIZE_BITS;
 pub const DEFAULT_LOG_LEVEL: Level = Level::Error;
 /// This value represent the base timer frequency in Hz
 pub const TIMER_FREQ: u64 = 500;
+
+pub const REAL_TIME_TASK_PRI: u16 = 100;
diff --git a/framework/jinux-frame/src/task/mod.rs b/framework/jinux-frame/src/task/mod.rs
index d172bbe8c..61a92a18c 100644
--- a/framework/jinux-frame/src/task/mod.rs
+++ b/framework/jinux-frame/src/task/mod.rs
@@ -1,10 +1,11 @@
 //! Tasks are the unit of code execution.
 
+mod priority;
 mod processor;
 mod scheduler;
 #[allow(clippy::module_inception)]
 mod task;
 
-pub use self::processor::{current_task, disable_preempt, schedule, DisablePreemptGuard};
+pub use self::processor::{current_task, disable_preempt, preempt, schedule, DisablePreemptGuard};
 pub use self::scheduler::{add_task, set_scheduler, Scheduler};
-pub use self::task::{Task, TaskAdapter, TaskStatus};
+pub use self::task::{Task, TaskAdapter, TaskOptions, TaskStatus};
diff --git a/framework/jinux-frame/src/task/priority.rs b/framework/jinux-frame/src/task/priority.rs
new file mode 100644
index 000000000..6a577c4a3
--- /dev/null
+++ b/framework/jinux-frame/src/task/priority.rs
@@ -0,0 +1,47 @@
+use crate::config::REAL_TIME_TASK_PRI;
+
+/// The priority of a task.
+/// Similar to Linux, a larger value represents a lower priority,
+/// with a range of 0 to 139. Priorities ranging from 0 to 99 are considered real-time,
+/// while those ranging from 100 to 139 are considered normal.
+#[derive(Copy, Clone)]
+pub struct Priority(u16);
+
+impl Priority {
+    pub const fn new(val: u16) -> Self {
+        assert!(val <= 139);
+        Self(val)
+    }
+
+    pub const fn lowest() -> Self {
+        Self::new(139)
+    }
+
+    pub const fn low() -> Self {
+        Self::new(110)
+    }
+
+    pub const fn normal() -> Self {
+        Self::new(100)
+    }
+
+    pub const fn high() -> Self {
+        Self::new(10)
+    }
+
+    pub const fn highest() -> Self {
+        Self::new(0)
+    }
+
+    pub const fn set(&mut self, val: u16) {
+        self.0 = val;
+    }
+
+    pub const fn get(self) -> u16 {
+        self.0
+    }
+
+    pub const fn is_real_time(&self) -> bool {
+        self.0 < REAL_TIME_TASK_PRI
+    }
+}
diff --git a/framework/jinux-frame/src/task/processor.rs b/framework/jinux-frame/src/task/processor.rs
index dfdef5ac1..baf548bf9 100644
--- a/framework/jinux-frame/src/task/processor.rs
+++ b/framework/jinux-frame/src/task/processor.rs
@@ -1,8 +1,8 @@
 use core::sync::atomic::AtomicUsize;
 
-use crate::cpu::CpuLocal;
 use crate::cpu_local;
 use crate::sync::Mutex;
+use crate::{cpu::CpuLocal, trap::disable_local};
 
 use core::sync::atomic::Ordering::Relaxed;
 
@@ -63,6 +63,23 @@ pub fn schedule() {
     }
 }
 
+pub fn preempt() {
+    // disable interrupts to avoid nested preemption.
+    let disable_irq = disable_local();
+    let Some(curr_task) = current_task() else {
+        return;
+    };
+    let mut scheduler = GLOBAL_SCHEDULER.lock_irq_disabled();
+    if !scheduler.should_preempt(&curr_task) {
+        return;
+    }
+    let Some(next_task) = scheduler.dequeue() else {
+        return;
+    };
+    drop(scheduler);
+    switch_to_task(next_task);
+}
+
 /// call this function to switch to other task
 ///
 /// if current task is none, then it will use the default task context and it will not return to this function again
diff --git a/framework/jinux-frame/src/task/scheduler.rs b/framework/jinux-frame/src/task/scheduler.rs
index 78c07a988..40185cffd 100644
--- a/framework/jinux-frame/src/task/scheduler.rs
+++ b/framework/jinux-frame/src/task/scheduler.rs
@@ -17,6 +17,9 @@ pub trait Scheduler: Sync + Send {
     fn enqueue(&self, task: Arc<Task>);
 
     fn dequeue(&self) -> Option<Arc<Task>>;
+
+    /// Tells whether the given task should be preempted by other tasks in the queue.
+    fn should_preempt(&self, task: &Arc<Task>) -> bool;
 }
 
 pub struct GlobalScheduler {
@@ -38,6 +41,10 @@ impl GlobalScheduler {
     pub fn enqueue(&mut self, task: Arc<Task>) {
         self.scheduler.unwrap().enqueue(task)
     }
+
+    pub fn should_preempt(&self, task: &Arc<Task>) -> bool {
+        self.scheduler.unwrap().should_preempt(task)
+    }
 }
 /// Set the global task scheduler.
 ///
diff --git a/framework/jinux-frame/src/task/task.rs b/framework/jinux-frame/src/task/task.rs
index e6196c091..0aa312386 100644
--- a/framework/jinux-frame/src/task/task.rs
+++ b/framework/jinux-frame/src/task/task.rs
@@ -1,14 +1,14 @@
-use spin::{Mutex, MutexGuard};
-
 use crate::config::{KERNEL_STACK_SIZE, PAGE_SIZE};
 use crate::prelude::*;
 use crate::task::processor::switch_to_task;
 use crate::user::UserSpace;
 use crate::vm::{VmAllocOptions, VmFrameVec};
+use spin::{Mutex, MutexGuard};
 
 use intrusive_collections::intrusive_adapter;
 use intrusive_collections::LinkedListAtomicLink;
 
+use super::priority::Priority;
 use super::processor::{current_task, schedule};
 
 core::arch::global_asm!(include_str!("switch.S"));
@@ -64,6 +64,7 @@ pub struct Task {
     /// kernel stack, note that the top is SyscallFrame/TrapFrame
     kstack: KernelStack,
     link: LinkedListAtomicLink,
+    priority: Priority,
 }
 
 // TaskAdapter struct is implemented for building relationships between doubly linked list and Task struct
@@ -98,89 +99,6 @@ impl Task {
         schedule();
     }
 
-    /// Spawns a task that executes a function.
-    ///
-    /// Each task is associated with a per-task data and an optional user space.
-    /// If having a user space, then the task can switch to the user space to
-    /// execute user code. Multiple tasks can share a single user space.
-    pub fn spawn<F, T>(
-        task_fn: F,
-        task_data: T,
-        user_space: Option<Arc<UserSpace>>,
-    ) -> Result<Arc<Self>>
-    where
-        F: Fn() + Send + Sync + 'static,
-        T: Any + Send + Sync,
-    {
-        /// all task will entering this function
-        /// this function is mean to executing the task_fn in Task
-        fn kernel_task_entry() {
-            let current_task = current_task()
-                .expect("no current task, it should have current task in kernel task entry");
-            current_task.func.call(());
-            current_task.exit();
-        }
-        let result = Self {
-            func: Box::new(task_fn),
-            data: Box::new(task_data),
-            user_space,
-            task_inner: Mutex::new(TaskInner {
-                task_status: TaskStatus::Runnable,
-                ctx: TaskContext::default(),
-            }),
-            exit_code: 0,
-            kstack: KernelStack::new()?,
-            link: LinkedListAtomicLink::new(),
-        };
-
-        result.task_inner.lock().task_status = TaskStatus::Runnable;
-        result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
-        result.task_inner.lock().ctx.regs.rsp =
-            (crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
-
-        let arc_self = Arc::new(result);
-        switch_to_task(arc_self.clone());
-        Ok(arc_self)
-    }
-
-    pub fn new<F, T>(
-        task_fn: F,
-        task_data: T,
-        user_space: Option<Arc<UserSpace>>,
-    ) -> Result<Arc<Self>>
-    where
-        F: Fn() + Send + Sync + 'static,
-        T: Any + Send + Sync,
-    {
-        /// all task will entering this function
-        /// this function is mean to executing the task_fn in Task
-        fn kernel_task_entry() {
-            let current_task = current_task()
-                .expect("no current task, it should have current task in kernel task entry");
-            current_task.func.call(());
-            current_task.exit();
-        }
-        let result = Self {
-            func: Box::new(task_fn),
-            data: Box::new(task_data),
-            user_space,
-            task_inner: Mutex::new(TaskInner {
-                task_status: TaskStatus::Runnable,
-                ctx: TaskContext::default(),
-            }),
-            exit_code: 0,
-            kstack: KernelStack::new()?,
-            link: LinkedListAtomicLink::new(),
-        };
-
-        result.task_inner.lock().task_status = TaskStatus::Runnable;
-        result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
-        result.task_inner.lock().ctx.regs.rsp =
-            (crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
-
-        Ok(Arc::new(result))
-    }
-
     pub fn run(self: &Arc<Self>) {
         switch_to_task(self.clone());
     }
@@ -209,6 +127,10 @@ impl Task {
         schedule();
         unreachable!()
     }
+
+    pub fn is_real_time(&self) -> bool {
+        self.priority.is_real_time()
+    }
 }
 
 #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
@@ -221,3 +143,125 @@ pub enum TaskStatus {
     /// The task has exited.
     Exited,
 }
+
+/// Options to create or spawn a new task.
+pub struct TaskOptions {
+    func: Option<Box<dyn Fn() + Send + Sync>>,
+    data: Option<Box<dyn Any + Send + Sync>>,
+    user_space: Option<Arc<UserSpace>>,
+    priority: Priority,
+}
+
+impl TaskOptions {
+    /// Creates a set of options for a task.
+    pub fn new<F>(func: F) -> Self
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        Self {
+            func: Some(Box::new(func)),
+            data: None,
+            user_space: None,
+            priority: Priority::normal(),
+        }
+    }
+
+    pub fn func<F>(mut self, func: F) -> Self
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        self.func = Some(Box::new(func));
+        self
+    }
+
+    pub fn data<T>(mut self, data: T) -> Self
+    where
+        T: Any + Send + Sync,
+    {
+        self.data = Some(Box::new(data));
+        self
+    }
+
+    /// Sets the user space associated with the task.
+    pub fn user_space(mut self, user_space: Option<Arc<UserSpace>>) -> Self {
+        self.user_space = user_space;
+        self
+    }
+
+    /// Sets the priority of the task.
+    pub fn priority(mut self, priority: Priority) -> Self {
+        self.priority = priority;
+        self
+    }
+
+    /// Builds a new task but does not run it immediately.
+    pub fn build(self) -> Result<Arc<Task>> {
+        /// All tasks enter this function.
+        /// This function is meant to execute the `task_fn` in the task.
+        fn kernel_task_entry() {
+            let current_task = current_task()
+                .expect("no current task, it should have current task in kernel task entry");
+            current_task.func.call(());
+            current_task.exit();
+        }
+        let result = Task {
+            func: self.func.unwrap(),
+            data: self.data.unwrap(),
+            user_space: self.user_space,
+            task_inner: Mutex::new(TaskInner {
+                task_status: TaskStatus::Runnable,
+                ctx: TaskContext::default(),
+            }),
+            exit_code: 0,
+            kstack: KernelStack::new()?,
+            link: LinkedListAtomicLink::new(),
+            priority: self.priority,
+            //cpu_affinity: task_attrs.cpu_affinity,
+        };
+
+        result.task_inner.lock().task_status = TaskStatus::Runnable;
+        result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
+        result.task_inner.lock().ctx.regs.rsp =
+            (crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
+
+        Ok(Arc::new(result))
+    }
+
+    /// Builds a new task and runs it immediately.
+    ///
+    /// Each task is associated with a per-task data and an optional user space.
+    /// If having a user space, then the task can switch to the user space to
+    /// execute user code. Multiple tasks can share a single user space.
+    pub fn spawn(self) -> Result<Arc<Task>> {
+        /// All tasks enter this function.
+        /// This function is meant to execute the `task_fn` in the task.
+        fn kernel_task_entry() {
+            let current_task = current_task()
+                .expect("no current task, it should have current task in kernel task entry");
+            current_task.func.call(());
+            current_task.exit();
+        }
+        let result = Task {
+            func: self.func.unwrap(),
+            data: self.data.unwrap(),
+            user_space: self.user_space,
+            task_inner: Mutex::new(TaskInner {
+                task_status: TaskStatus::Runnable,
+                ctx: TaskContext::default(),
+            }),
+            exit_code: 0,
+            kstack: KernelStack::new()?,
+            link: LinkedListAtomicLink::new(),
+            priority: self.priority,
+        };
+
+        result.task_inner.lock().task_status = TaskStatus::Runnable;
+        result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
+        result.task_inner.lock().ctx.regs.rsp =
+            (crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
+
+        let arc_self = Arc::new(result);
+        switch_to_task(arc_self.clone());
+        Ok(arc_self)
+    }
+}
diff --git a/services/libs/jinux-std/src/lib.rs b/services/libs/jinux-std/src/lib.rs
index b9e4ad2ca..649ff5445 100644
--- a/services/libs/jinux-std/src/lib.rs
+++ b/services/libs/jinux-std/src/lib.rs
@@ -22,7 +22,10 @@
 use crate::{
     prelude::*,
-    thread::{kernel_thread::KernelThreadExt, Thread},
+    thread::{
+        kernel_thread::{KernelThreadExt, ThreadOptions},
+        Thread,
+    },
 };
 use jinux_frame::{boot, exit_qemu, QemuExitCode};
 use process::Process;
 
@@ -40,6 +43,7 @@ pub mod fs;
 pub mod net;
 pub mod prelude;
 mod process;
+mod sched;
 pub mod syscall;
 pub mod thread;
 pub mod time;
@@ -49,7 +53,7 @@ pub mod vm;
 pub fn init() {
     driver::init();
     net::init();
-    process::fifo_scheduler::init();
+    sched::init();
     fs::rootfs::init(boot::initramfs()).unwrap();
     device::init().unwrap();
 }
@@ -61,12 +65,12 @@ fn init_thread() {
     );
     net::lazy_init();
     // driver::pci::virtio::block::block_device_test();
-    let thread = Thread::spawn_kernel_thread(|| {
+    let thread = Thread::spawn_kernel_thread(ThreadOptions::new(|| {
         println!("[kernel] Hello world from kernel!");
         let current = current_thread!();
         let tid = current.tid();
         debug!("current tid = {}", tid);
-    });
+    }));
     thread.join();
     info!(
         "[jinux-std/lib.rs] spawn kernel thread, tid = {}",
@@ -103,7 +107,7 @@ fn init_thread() {
 
 /// first process never return
 #[controlled]
 pub fn run_first_process() -> ! {
-    Thread::spawn_kernel_thread(init_thread);
+    Thread::spawn_kernel_thread(ThreadOptions::new(init_thread));
     unreachable!()
 }
diff --git a/services/libs/jinux-std/src/net/iface/util.rs b/services/libs/jinux-std/src/net/iface/util.rs
index d0e950d8e..a12269674 100644
--- a/services/libs/jinux-std/src/net/iface/util.rs
+++ b/services/libs/jinux-std/src/net/iface/util.rs
@@ -2,7 +2,10 @@ use jinux_frame::timer::read_monotonic_milli_seconds;
 
 use crate::{
     prelude::*,
-    thread::{kernel_thread::KernelThreadExt, Thread},
+    thread::{
+        kernel_thread::{KernelThreadExt, ThreadOptions},
+        Thread,
+    },
 };
 
 use super::Iface;
@@ -67,5 +70,5 @@ pub fn spawn_background_poll_thread(iface: Arc<Iface>) {
             }
         }
     };
-    Thread::spawn_kernel_thread(task_fn);
+    Thread::spawn_kernel_thread(ThreadOptions::new(task_fn));
 }
diff --git a/services/libs/jinux-std/src/process/fifo_scheduler.rs b/services/libs/jinux-std/src/process/fifo_scheduler.rs
deleted file mode 100644
index cf3a63542..000000000
--- a/services/libs/jinux-std/src/process/fifo_scheduler.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use crate::prelude::*;
-use jinux_frame::task::{set_scheduler, Scheduler, Task, TaskAdapter};
-
-use intrusive_collections::LinkedList;
-
-pub struct FifoScheduler {
-    tasks: SpinLock<LinkedList<TaskAdapter>>,
-}
-
-impl FifoScheduler {
-    pub fn new() -> Self {
-        Self {
-            tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
-        }
-    }
-}
-
-impl Scheduler for FifoScheduler {
-    fn enqueue(&self, task: Arc<Task>) {
-        self.tasks.lock_irq_disabled().push_back(task.clone());
-    }
-
-    fn dequeue(&self) -> Option<Arc<Task>> {
-        self.tasks.lock_irq_disabled().pop_front()
-    }
-}
-
-pub fn init() {
-    let fifo_scheduler = Box::new(FifoScheduler::new());
-    let scheduler = Box::<FifoScheduler>::leak(fifo_scheduler);
-    set_scheduler(scheduler);
-}
diff --git a/services/libs/jinux-std/src/process/mod.rs b/services/libs/jinux-std/src/process/mod.rs
index de1452817..b467a525a 100644
--- a/services/libs/jinux-std/src/process/mod.rs
+++ b/services/libs/jinux-std/src/process/mod.rs
@@ -1,6 +1,5 @@
 mod clone;
 mod exit;
-pub mod fifo_scheduler;
 pub mod posix_thread;
 #[allow(clippy::module_inception)]
 mod process;
diff --git a/services/libs/jinux-std/src/sched/mod.rs b/services/libs/jinux-std/src/sched/mod.rs
new file mode 100644
index 000000000..b6cf2d58e
--- /dev/null
+++ b/services/libs/jinux-std/src/sched/mod.rs
@@ -0,0 +1,5 @@
+mod priority_scheduler;
+
+// There may be multiple scheduling policies in the system,
+// and subsequent schedulers can be placed under this module.
+pub use self::priority_scheduler::init;
diff --git a/services/libs/jinux-std/src/sched/priority_scheduler.rs b/services/libs/jinux-std/src/sched/priority_scheduler.rs
new file mode 100644
index 000000000..061b406ae
--- /dev/null
+++ b/services/libs/jinux-std/src/sched/priority_scheduler.rs
@@ -0,0 +1,57 @@
+use crate::prelude::*;
+use intrusive_collections::LinkedList;
+use jinux_frame::task::{set_scheduler, Scheduler, Task, TaskAdapter};
+
+pub fn init() {
+    let preempt_scheduler = Box::new(PreemptScheduler::new());
+    let scheduler = Box::<PreemptScheduler>::leak(preempt_scheduler);
+    set_scheduler(scheduler);
+}
+
+/// The preempt scheduler
+///
+/// Real-time tasks are placed in the `real_time_tasks` queue and
+/// are always prioritized during scheduling.
+/// Normal tasks are placed in the `normal_tasks` queue and are only
+/// scheduled for execution when there are no real-time tasks.
+struct PreemptScheduler {
+    /// Tasks with a priority of less than 100 are regarded as real-time tasks.
+    real_time_tasks: SpinLock<LinkedList<TaskAdapter>>,
+    /// Tasks with a priority greater than or equal to 100 are regarded as normal tasks.
+    normal_tasks: SpinLock<LinkedList<TaskAdapter>>,
+}
+
+impl PreemptScheduler {
+    pub fn new() -> Self {
+        Self {
+            real_time_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
+            normal_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
+        }
+    }
+}
+
+impl Scheduler for PreemptScheduler {
+    fn enqueue(&self, task: Arc<Task>) {
+        if task.is_real_time() {
+            self.real_time_tasks
+                .lock_irq_disabled()
+                .push_back(task.clone());
+        } else {
+            self.normal_tasks
+                .lock_irq_disabled()
+                .push_back(task.clone());
+        }
+    }
+
+    fn dequeue(&self) -> Option<Arc<Task>> {
+        if !self.real_time_tasks.lock_irq_disabled().is_empty() {
+            self.real_time_tasks.lock_irq_disabled().pop_front()
+        } else {
+            self.normal_tasks.lock_irq_disabled().pop_front()
+        }
+    }
+
+    fn should_preempt(&self, task: &Arc<Task>) -> bool {
+        !task.is_real_time() && !self.real_time_tasks.lock_irq_disabled().is_empty()
+    }
+}
diff --git a/services/libs/jinux-std/src/thread/kernel_thread.rs b/services/libs/jinux-std/src/thread/kernel_thread.rs
index 71ddde4b5..ef9c81b66 100644
--- a/services/libs/jinux-std/src/thread/kernel_thread.rs
+++ b/services/libs/jinux-std/src/thread/kernel_thread.rs
@@ -1,4 +1,4 @@
-use jinux_frame::task::Task;
+use jinux_frame::task::TaskOptions;
 
 use crate::prelude::*;
 
@@ -11,15 +11,10 @@ pub trait KernelThreadExt {
     /// get the kernel_thread structure
     fn as_kernel_thread(&self) -> Option<&KernelThread>;
     /// create a new kernel thread structure, **NOT** run the thread.
-    fn new_kernel_thread<F>(task_fn: F) -> Arc<Self>
-    where
-        F: Fn() + Send + Sync + 'static;
+    fn new_kernel_thread(thread_options: ThreadOptions) -> Arc<Self>;
    /// create a new kernel thread structure, and then run the thread.
-    fn spawn_kernel_thread<F>(task_fn: F) -> Arc<Self>
-    where
-        F: Fn() + Send + Sync + 'static,
-    {
-        let thread = Self::new_kernel_thread(task_fn);
+    fn spawn_kernel_thread(thread_options: ThreadOptions) -> Arc<Self> {
+        let thread = Self::new_kernel_thread(thread_options);
         thread.run();
         thread
     }
@@ -32,10 +27,8 @@ impl KernelThreadExt for Thread {
         self.data().downcast_ref::<KernelThread>()
     }
 
-    fn new_kernel_thread<F>(task_fn: F) -> Arc<Self>
-    where
-        F: Fn() + Send + Sync + 'static,
-    {
+    fn new_kernel_thread(mut thread_options: ThreadOptions) -> Arc<Self> {
+        let task_fn = thread_options.take_func();
         let thread_fn = move || {
             task_fn();
             let current_thread = current_thread!();
@@ -44,7 +37,11 @@ impl KernelThreadExt for Thread {
         };
         let tid = allocate_tid();
         let thread = Arc::new_cyclic(|thread_ref| {
-            let task = Task::new(thread_fn, thread_ref.clone(), None).unwrap();
+            let weak_thread = thread_ref.clone();
+            let task = TaskOptions::new(thread_fn)
+                .data(weak_thread)
+                .build()
+                .unwrap();
             let status = ThreadStatus::Init;
             let kernel_thread = KernelThread;
             Thread::new(tid, task, kernel_thread, status)
@@ -65,3 +62,38 @@ impl KernelThreadExt for Thread {
         }
     }
 }
+
+/// Options to create or spawn a new thread.
+pub struct ThreadOptions {
+    func: Option<Box<dyn Fn() + Send + Sync>>,
+    priority: u16,
+}
+
+impl ThreadOptions {
+    pub fn new<F>(func: F) -> Self
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        Self {
+            func: Some(Box::new(func)),
+            priority: 100,
+        }
+    }
+
+    pub fn func<F>(mut self, func: F) -> Self
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        self.func = Some(Box::new(func));
+        self
+    }
+
+    fn take_func(&mut self) -> Box<dyn Fn() + Send + Sync> {
+        self.func.take().unwrap()
+    }
+
+    pub fn priority(mut self, priority: u16) -> Self {
+        self.priority = priority;
+        self
+    }
+}
diff --git a/services/libs/jinux-std/src/thread/task.rs b/services/libs/jinux-std/src/thread/task.rs
index 8e08d9699..e5e7a567f 100644
--- a/services/libs/jinux-std/src/thread/task.rs
+++ b/services/libs/jinux-std/src/thread/task.rs
@@ -1,6 +1,6 @@
 use jinux_frame::{
     cpu::UserContext,
-    task::Task,
+    task::{preempt, Task, TaskOptions},
     user::{UserContextApi, UserEvent, UserMode, UserSpace},
 };
 
@@ -50,13 +50,19 @@ pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Weak<Thread>) -> Arc<Task> {
                 debug!("{} is suspended.", current_thread.tid());
                 handle_pending_signal(context).unwrap();
             }
+            // a preemption point after handling user event.
+            preempt();
         }
         debug!("exit user loop");
         // FIXME: This is a work around: exit in kernel task entry may be not called. Why this will happen?
        Task::current().exit();
     }
 
-    Task::new(user_task_entry, thread_ref, Some(user_space)).expect("spawn task failed")
+    TaskOptions::new(user_task_entry)
+        .data(thread_ref)
+        .user_space(Some(user_space))
+        .build()
+        .expect("spawn task failed")
 }
 
 fn handle_user_event(user_event: UserEvent, context: &mut UserContext) {
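
Usage sketch (reviewer note, not part of the patch): the snippets below show how the builder APIs introduced above fit together. The closure bodies are illustrative; `ThreadOptions`, `TaskOptions`, `Priority`, and `REAL_TIME_TASK_PRI` come from the diff itself. Two observations worth keeping in mind: `TaskOptions::build()` unwraps the `data` field, so `.data(...)` must be set before `.build()`, and `Priority` is not re-exported from `jinux_frame::task` in this patch, so `TaskOptions::priority` is only callable from inside the frame crate.

    // In jinux-std: spawn a kernel thread through ThreadOptions, as the new
    // call sites in lib.rs and net/iface/util.rs do. TaskOptions defaults to
    // Priority::normal() (100), so the task lands on the PreemptScheduler's
    // normal_tasks queue.
    let thread = Thread::spawn_kernel_thread(ThreadOptions::new(|| {
        println!("[kernel] hello from a kernel thread");
    }));
    thread.join();

    // Inside jinux-frame, where `Priority` is visible: build a real-time task.
    // Any value below REAL_TIME_TASK_PRI (100) counts as real-time, so while
    // such a task sits in the real_time_tasks queue, should_preempt() returns
    // true for a running normal task and the next preempt() call switches away.
    let task = TaskOptions::new(|| {
        // task body (illustrative)
    })
    .data(())                       // build() unwraps `data`, so it must be set
    .priority(Priority::high())     // 10 < 100 => real-time queue
    .build()
    .unwrap();
    task.run(); // run() switches to the newly built task immediately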