Add basic preemptive scheduling

This commit is contained in:
Chuandong Li
2023-09-28 05:34:06 +00:00
committed by Tate, Hongliang Tian
parent baeaa9b4d3
commit e2b4302620
14 changed files with 336 additions and 144 deletions

View File

@ -19,3 +19,5 @@ pub const KVA_START: usize = (usize::MAX) << PAGE_SIZE_BITS;
pub const DEFAULT_LOG_LEVEL: Level = Level::Error;
/// This value represent the base timer frequency in Hz
pub const TIMER_FREQ: u64 = 500;
pub const REAL_TIME_TASK_PRI: u16 = 100;

View File

@ -1,10 +1,11 @@
//! Tasks are the unit of code execution.
mod priority;
mod processor;
mod scheduler;
#[allow(clippy::module_inception)]
mod task;
pub use self::processor::{current_task, disable_preempt, schedule, DisablePreemptGuard};
pub use self::processor::{current_task, disable_preempt, preempt, schedule, DisablePreemptGuard};
pub use self::scheduler::{add_task, set_scheduler, Scheduler};
pub use self::task::{Task, TaskAdapter, TaskStatus};
pub use self::task::{Task, TaskAdapter, TaskOptions, TaskStatus};

View File

@ -0,0 +1,47 @@
use crate::config::REAL_TIME_TASK_PRI;
/// The priority of a task.
/// Similar to Linux, a larger value represents a lower priority,
/// with a range of 0 to 139. Priorities ranging from 0 to 99 are considered real-time,
/// while those ranging from 100 to 139 are considered normal.
#[derive(Copy, Clone)]
pub struct Priority(u16);
impl Priority {
pub const fn new(val: u16) -> Self {
assert!(val <= 139);
Self(val)
}
pub const fn lowest() -> Self {
Self::new(139)
}
pub const fn low() -> Self {
Self::new(110)
}
pub const fn normal() -> Self {
Self::new(100)
}
pub const fn high() -> Self {
Self::new(10)
}
pub const fn highest() -> Self {
Self::new(0)
}
pub const fn set(&mut self, val: u16) {
self.0 = val;
}
pub const fn get(self) -> u16 {
self.0
}
pub const fn is_real_time(&self) -> bool {
self.0 < REAL_TIME_TASK_PRI
}
}

View File

@ -1,8 +1,8 @@
use core::sync::atomic::AtomicUsize;
use crate::cpu::CpuLocal;
use crate::cpu_local;
use crate::sync::Mutex;
use crate::{cpu::CpuLocal, trap::disable_local};
use core::sync::atomic::Ordering::Relaxed;
@ -63,6 +63,23 @@ pub fn schedule() {
}
}
pub fn preempt() {
// disable interrupts to avoid nested preemption.
let disable_irq = disable_local();
let Some(curr_task) = current_task() else {
return;
};
let mut scheduler = GLOBAL_SCHEDULER.lock_irq_disabled();
if !scheduler.should_preempt(&curr_task) {
return;
}
let Some(next_task) = scheduler.dequeue() else {
return;
};
drop(scheduler);
switch_to_task(next_task);
}
/// call this function to switch to other task
///
/// if current task is none, then it will use the default task context and it will not return to this function again

View File

@ -17,6 +17,9 @@ pub trait Scheduler: Sync + Send {
fn enqueue(&self, task: Arc<Task>);
fn dequeue(&self) -> Option<Arc<Task>>;
/// Tells whether the given task should be preempted by other tasks in the queue.
fn should_preempt(&self, task: &Arc<Task>) -> bool;
}
pub struct GlobalScheduler {
@ -38,6 +41,10 @@ impl GlobalScheduler {
pub fn enqueue(&mut self, task: Arc<Task>) {
self.scheduler.unwrap().enqueue(task)
}
pub fn should_preempt(&self, task: &Arc<Task>) -> bool {
self.scheduler.unwrap().should_preempt(task)
}
}
/// Set the global task scheduler.
///

View File

@ -1,14 +1,14 @@
use spin::{Mutex, MutexGuard};
use crate::config::{KERNEL_STACK_SIZE, PAGE_SIZE};
use crate::prelude::*;
use crate::task::processor::switch_to_task;
use crate::user::UserSpace;
use crate::vm::{VmAllocOptions, VmFrameVec};
use spin::{Mutex, MutexGuard};
use intrusive_collections::intrusive_adapter;
use intrusive_collections::LinkedListAtomicLink;
use super::priority::Priority;
use super::processor::{current_task, schedule};
core::arch::global_asm!(include_str!("switch.S"));
@ -64,6 +64,7 @@ pub struct Task {
/// kernel stack, note that the top is SyscallFrame/TrapFrame
kstack: KernelStack,
link: LinkedListAtomicLink,
priority: Priority,
}
// TaskAdapter struct is implemented for building relationships between doubly linked list and Task struct
@ -98,89 +99,6 @@ impl Task {
schedule();
}
/// Spawns a task that executes a function.
///
/// Each task is associated with a per-task data and an optional user space.
/// If having a user space, then the task can switch to the user space to
/// execute user code. Multiple tasks can share a single user space.
pub fn spawn<F, T>(
task_fn: F,
task_data: T,
user_space: Option<Arc<UserSpace>>,
) -> Result<Arc<Self>>
where
F: Fn() + Send + Sync + 'static,
T: Any + Send + Sync,
{
/// all task will entering this function
/// this function is mean to executing the task_fn in Task
fn kernel_task_entry() {
let current_task = current_task()
.expect("no current task, it should have current task in kernel task entry");
current_task.func.call(());
current_task.exit();
}
let result = Self {
func: Box::new(task_fn),
data: Box::new(task_data),
user_space,
task_inner: Mutex::new(TaskInner {
task_status: TaskStatus::Runnable,
ctx: TaskContext::default(),
}),
exit_code: 0,
kstack: KernelStack::new()?,
link: LinkedListAtomicLink::new(),
};
result.task_inner.lock().task_status = TaskStatus::Runnable;
result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
result.task_inner.lock().ctx.regs.rsp =
(crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
let arc_self = Arc::new(result);
switch_to_task(arc_self.clone());
Ok(arc_self)
}
pub fn new<F, T>(
task_fn: F,
task_data: T,
user_space: Option<Arc<UserSpace>>,
) -> Result<Arc<Self>>
where
F: Fn() + Send + Sync + 'static,
T: Any + Send + Sync,
{
/// all task will entering this function
/// this function is mean to executing the task_fn in Task
fn kernel_task_entry() {
let current_task = current_task()
.expect("no current task, it should have current task in kernel task entry");
current_task.func.call(());
current_task.exit();
}
let result = Self {
func: Box::new(task_fn),
data: Box::new(task_data),
user_space,
task_inner: Mutex::new(TaskInner {
task_status: TaskStatus::Runnable,
ctx: TaskContext::default(),
}),
exit_code: 0,
kstack: KernelStack::new()?,
link: LinkedListAtomicLink::new(),
};
result.task_inner.lock().task_status = TaskStatus::Runnable;
result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
result.task_inner.lock().ctx.regs.rsp =
(crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
Ok(Arc::new(result))
}
pub fn run(self: &Arc<Self>) {
switch_to_task(self.clone());
}
@ -209,6 +127,10 @@ impl Task {
schedule();
unreachable!()
}
pub fn is_real_time(&self) -> bool {
self.priority.is_real_time()
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
@ -221,3 +143,125 @@ pub enum TaskStatus {
/// The task has exited.
Exited,
}
/// Options to create or spawn a new task.
pub struct TaskOptions {
func: Option<Box<dyn Fn() + Send + Sync>>,
data: Option<Box<dyn Any + Send + Sync>>,
user_space: Option<Arc<UserSpace>>,
priority: Priority,
}
impl TaskOptions {
/// Creates a set of options for a task.
pub fn new<F>(func: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Self {
func: Some(Box::new(func)),
data: None,
user_space: None,
priority: Priority::normal(),
}
}
pub fn func<F>(mut self, func: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.func = Some(Box::new(func));
self
}
pub fn data<T>(mut self, data: T) -> Self
where
T: Any + Send + Sync,
{
self.data = Some(Box::new(data));
self
}
/// Sets the user space associated with the task.
pub fn user_space(mut self, user_space: Option<Arc<UserSpace>>) -> Self {
self.user_space = user_space;
self
}
/// Sets the priority of the task.
pub fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}
/// Builds a new task but not run it immediately.
pub fn build(self) -> Result<Arc<Task>> {
/// all task will entering this function
/// this function is mean to executing the task_fn in Task
fn kernel_task_entry() {
let current_task = current_task()
.expect("no current task, it should have current task in kernel task entry");
current_task.func.call(());
current_task.exit();
}
let result = Task {
func: self.func.unwrap(),
data: self.data.unwrap(),
user_space: self.user_space,
task_inner: Mutex::new(TaskInner {
task_status: TaskStatus::Runnable,
ctx: TaskContext::default(),
}),
exit_code: 0,
kstack: KernelStack::new()?,
link: LinkedListAtomicLink::new(),
priority: self.priority,
//cpu_affinity: task_attrs.cpu_affinity,
};
result.task_inner.lock().task_status = TaskStatus::Runnable;
result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
result.task_inner.lock().ctx.regs.rsp =
(crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
Ok(Arc::new(result))
}
/// Builds a new task and run it immediately.
///
/// Each task is associated with a per-task data and an optional user space.
/// If having a user space, then the task can switch to the user space to
/// execute user code. Multiple tasks can share a single user space.
pub fn spawn(self) -> Result<Arc<Task>> {
/// all task will entering this function
/// this function is mean to executing the task_fn in Task
fn kernel_task_entry() {
let current_task = current_task()
.expect("no current task, it should have current task in kernel task entry");
current_task.func.call(());
current_task.exit();
}
let result = Task {
func: self.func.unwrap(),
data: self.data.unwrap(),
user_space: self.user_space,
task_inner: Mutex::new(TaskInner {
task_status: TaskStatus::Runnable,
ctx: TaskContext::default(),
}),
exit_code: 0,
kstack: KernelStack::new()?,
link: LinkedListAtomicLink::new(),
priority: self.priority,
};
result.task_inner.lock().task_status = TaskStatus::Runnable;
result.task_inner.lock().ctx.rip = kernel_task_entry as usize;
result.task_inner.lock().ctx.regs.rsp =
(crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
let arc_self = Arc::new(result);
switch_to_task(arc_self.clone());
Ok(arc_self)
}
}

View File

@ -22,7 +22,10 @@
use crate::{
prelude::*,
thread::{kernel_thread::KernelThreadExt, Thread},
thread::{
kernel_thread::{KernelThreadExt, ThreadOptions},
Thread,
},
};
use jinux_frame::{boot, exit_qemu, QemuExitCode};
use process::Process;
@ -40,6 +43,7 @@ pub mod fs;
pub mod net;
pub mod prelude;
mod process;
mod sched;
pub mod syscall;
pub mod thread;
pub mod time;
@ -49,7 +53,7 @@ pub mod vm;
pub fn init() {
driver::init();
net::init();
process::fifo_scheduler::init();
sched::init();
fs::rootfs::init(boot::initramfs()).unwrap();
device::init().unwrap();
}
@ -61,12 +65,12 @@ fn init_thread() {
);
net::lazy_init();
// driver::pci::virtio::block::block_device_test();
let thread = Thread::spawn_kernel_thread(|| {
let thread = Thread::spawn_kernel_thread(ThreadOptions::new(|| {
println!("[kernel] Hello world from kernel!");
let current = current_thread!();
let tid = current.tid();
debug!("current tid = {}", tid);
});
}));
thread.join();
info!(
"[jinux-std/lib.rs] spawn kernel thread, tid = {}",
@ -103,7 +107,7 @@ fn init_thread() {
/// first process never return
#[controlled]
pub fn run_first_process() -> ! {
Thread::spawn_kernel_thread(init_thread);
Thread::spawn_kernel_thread(ThreadOptions::new(init_thread));
unreachable!()
}

View File

@ -2,7 +2,10 @@ use jinux_frame::timer::read_monotonic_milli_seconds;
use crate::{
prelude::*,
thread::{kernel_thread::KernelThreadExt, Thread},
thread::{
kernel_thread::{KernelThreadExt, ThreadOptions},
Thread,
},
};
use super::Iface;
@ -67,5 +70,5 @@ pub fn spawn_background_poll_thread(iface: Arc<dyn Iface>) {
}
}
};
Thread::spawn_kernel_thread(task_fn);
Thread::spawn_kernel_thread(ThreadOptions::new(task_fn));
}

View File

@ -1,32 +0,0 @@
use crate::prelude::*;
use jinux_frame::task::{set_scheduler, Scheduler, Task, TaskAdapter};
use intrusive_collections::LinkedList;
pub struct FifoScheduler {
tasks: SpinLock<LinkedList<TaskAdapter>>,
}
impl FifoScheduler {
pub fn new() -> Self {
Self {
tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
}
}
}
impl Scheduler for FifoScheduler {
fn enqueue(&self, task: Arc<Task>) {
self.tasks.lock_irq_disabled().push_back(task.clone());
}
fn dequeue(&self) -> Option<Arc<Task>> {
self.tasks.lock_irq_disabled().pop_front()
}
}
pub fn init() {
let fifo_scheduler = Box::new(FifoScheduler::new());
let scheduler = Box::<FifoScheduler>::leak(fifo_scheduler);
set_scheduler(scheduler);
}

View File

@ -1,6 +1,5 @@
mod clone;
mod exit;
pub mod fifo_scheduler;
pub mod posix_thread;
#[allow(clippy::module_inception)]
mod process;

View File

@ -0,0 +1,5 @@
mod priority_scheduler;
// There may be multiple scheduling policies in the system,
// and subsequent schedulers can be placed under this module.
pub use self::priority_scheduler::init;

View File

@ -0,0 +1,57 @@
use crate::prelude::*;
use intrusive_collections::LinkedList;
use jinux_frame::task::{set_scheduler, Scheduler, Task, TaskAdapter};
pub fn init() {
let preempt_scheduler = Box::new(PreemptScheduler::new());
let scheduler = Box::<PreemptScheduler>::leak(preempt_scheduler);
set_scheduler(scheduler);
}
/// The preempt scheduler
///
/// Real-time tasks are placed in the `real_time_tasks` queue and
/// are always prioritized during scheduling.
/// Normal tasks are placed in the `normal_tasks` queue and are only
/// scheduled for execution when there are no real-time tasks.
struct PreemptScheduler {
/// Tasks with a priority of less than 100 are regarded as real-time tasks.
real_time_tasks: SpinLock<LinkedList<TaskAdapter>>,
/// Tasks with a priority greater than or equal to 100 are regarded as normal tasks.
normal_tasks: SpinLock<LinkedList<TaskAdapter>>,
}
impl PreemptScheduler {
pub fn new() -> Self {
Self {
real_time_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
normal_tasks: SpinLock::new(LinkedList::new(TaskAdapter::new())),
}
}
}
impl Scheduler for PreemptScheduler {
fn enqueue(&self, task: Arc<Task>) {
if task.is_real_time() {
self.real_time_tasks
.lock_irq_disabled()
.push_back(task.clone());
} else {
self.normal_tasks
.lock_irq_disabled()
.push_back(task.clone());
}
}
fn dequeue(&self) -> Option<Arc<Task>> {
if !self.real_time_tasks.lock_irq_disabled().is_empty() {
self.real_time_tasks.lock_irq_disabled().pop_front()
} else {
self.normal_tasks.lock_irq_disabled().pop_front()
}
}
fn should_preempt(&self, task: &Arc<Task>) -> bool {
!task.is_real_time() && !self.real_time_tasks.lock_irq_disabled().is_empty()
}
}

View File

@ -1,4 +1,4 @@
use jinux_frame::task::Task;
use jinux_frame::task::TaskOptions;
use crate::prelude::*;
@ -11,15 +11,10 @@ pub trait KernelThreadExt {
/// get the kernel_thread structure
fn as_kernel_thread(&self) -> Option<&KernelThread>;
/// create a new kernel thread structure, **NOT** run the thread.
fn new_kernel_thread<F>(task_fn: F) -> Arc<Thread>
where
F: Fn() + Send + Sync + 'static;
fn new_kernel_thread(thread_options: ThreadOptions) -> Arc<Thread>;
/// create a new kernel thread structure, and then run the thread.
fn spawn_kernel_thread<F>(task_fn: F) -> Arc<Thread>
where
F: Fn() + Send + Sync + 'static,
{
let thread = Self::new_kernel_thread(task_fn);
fn spawn_kernel_thread(thread_options: ThreadOptions) -> Arc<Thread> {
let thread = Self::new_kernel_thread(thread_options);
thread.run();
thread
}
@ -32,10 +27,8 @@ impl KernelThreadExt for Thread {
self.data().downcast_ref::<KernelThread>()
}
fn new_kernel_thread<F>(task_fn: F) -> Arc<Self>
where
F: Fn() + Send + Sync + 'static,
{
fn new_kernel_thread(mut thread_options: ThreadOptions) -> Arc<Self> {
let task_fn = thread_options.take_func();
let thread_fn = move || {
task_fn();
let current_thread = current_thread!();
@ -44,7 +37,11 @@ impl KernelThreadExt for Thread {
};
let tid = allocate_tid();
let thread = Arc::new_cyclic(|thread_ref| {
let task = Task::new(thread_fn, thread_ref.clone(), None).unwrap();
let weal_thread = thread_ref.clone();
let task = TaskOptions::new(thread_fn)
.data(weal_thread)
.build()
.unwrap();
let status = ThreadStatus::Init;
let kernel_thread = KernelThread;
Thread::new(tid, task, kernel_thread, status)
@ -65,3 +62,38 @@ impl KernelThreadExt for Thread {
}
}
}
/// Options to create or spawn a new thread.
pub struct ThreadOptions {
func: Option<Box<dyn Fn() + Send + Sync>>,
priority: u16,
}
impl ThreadOptions {
pub fn new<F>(func: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Self {
func: Some(Box::new(func)),
priority: 100,
}
}
pub fn func<F>(mut self, func: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.func = Some(Box::new(func));
self
}
fn take_func(&mut self) -> Box<dyn Fn() + Send + Sync> {
self.func.take().unwrap()
}
pub fn priority(mut self, priority: u16) -> Self {
self.priority = priority;
self
}
}

View File

@ -1,6 +1,6 @@
use jinux_frame::{
cpu::UserContext,
task::Task,
task::{preempt, Task, TaskOptions},
user::{UserContextApi, UserEvent, UserMode, UserSpace},
};
@ -50,13 +50,19 @@ pub fn create_new_user_task(user_space: Arc<UserSpace>, thread_ref: Weak<Thread>
debug!("{} is suspended.", current_thread.tid());
handle_pending_signal(context).unwrap();
}
// a preemption point after handling user event.
preempt();
}
debug!("exit user loop");
// FIXME: This is a work around: exit in kernel task entry may be not called. Why this will happen?
Task::current().exit();
}
Task::new(user_task_entry, thread_ref, Some(user_space)).expect("spawn task failed")
TaskOptions::new(user_task_entry)
.data(thread_ref)
.user_space(Some(user_space))
.build()
.expect("spawn task failed")
}
fn handle_user_event(user_event: UserEvent, context: &mut UserContext) {