// SPDX-License-Identifier: MPL-2.0 #![allow(dead_code)] use ostd::cpu::CpuSet; use spin::Once; use work_item::WorkItem; use worker_pool::WorkerPool; use crate::prelude::*; mod simple_scheduler; pub mod work_item; pub mod worker; pub mod worker_pool; static WORKERPOOL_NORMAL: Once> = Once::new(); static WORKERPOOL_HIGH_PRI: Once> = Once::new(); static WORKQUEUE_GLOBAL_NORMAL: Once> = Once::new(); static WORKQUEUE_GLOBAL_HIGH_PRI: Once> = Once::new(); /// Work queue mechanism. /// /// # Overview /// /// A `workqueue` is a kernel-level mechanism used to schedule and execute deferred work. /// Deferred work refers to tasks that need to be executed at some point in the future, /// but not necessarily immediately. /// /// The workqueue mechanism is implemented using a combination of kernel threads and data /// structures such as `WorkItem`, `WorkQueue`, `Worker` and `WorkerPool`. The `WorkItem` /// represents a task to be processed, while the `WorkQueue` maintains the queue of submitted /// `WorkItems`. The `Worker` is responsible for processing these submitted tasks, /// and the `WorkerPool` manages and schedules these workers. /// /// # Examples /// /// The system has a default work queue and worker pool, /// and it also provides high-level APIs for users to use. /// Here is a basic example to how to use those APIs. /// /// ```rust /// use crate::thread::work_queue::{submit_work_func, submit_work_item, WorkItem}; /// /// // Submit to high priority queue. /// submit_work_func(||{ }, true); /// /// // Submit to low priority queue. /// submit_work_func(||{ }, false); /// /// fn deferred_task(){ /// // ... /// } /// /// // Create a work item. /// let work_item = Arc::new(WorkItem::new(Box::new(deferred_task))); /// /// // Submit to high priority queue. /// submit_work_item(work_item, true); /// /// // Submit to low priority queue. /// submit_work_item(work_item, false); /// ``` /// /// Certainly, users can also create a dedicated WorkQueue and WorkerPool. /// /// ```rust /// use ostd::cpu::CpuSet; /// use crate::thread::work_queue::{WorkQueue, WorkerPool, WorkItem}; /// /// fn deferred_task(){ /// // ... /// } /// /// let cpu_set = CpuSet::new_full(); /// let high_pri_pool = WorkerPool::new(true, cpu_set); /// let my_queue = WorkQueue::new(Arc::downgrade(high_pri_pool.get().unwrap())); /// /// let work_item = Arc::new(WorkItem::new(Box::new(deferred_task))); /// my_queue.enqueue(work_item); /// /// ``` /// Submit a function to a global work queue. pub fn submit_work_func(work_func: F, work_priority: WorkPriority) where F: Fn() + Send + Sync + 'static, { let work_item = Arc::new(WorkItem::new(Box::new(work_func))); submit_work_item(work_item, work_priority); } /// Submit a work item to a global work queue. pub fn submit_work_item(work_item: Arc, work_priority: WorkPriority) -> bool { match work_priority { WorkPriority::High => WORKQUEUE_GLOBAL_HIGH_PRI .get() .unwrap() .enqueue(work_item.clone()), WorkPriority::Normal => WORKQUEUE_GLOBAL_NORMAL .get() .unwrap() .enqueue(work_item.clone()), } } /// A work queue maintains a series of work items to be handled /// asynchronously in a process context. pub struct WorkQueue { worker_pool: Weak, inner: SpinLock, } struct WorkQueueInner { pending_work_items: Vec>, } impl WorkQueue { /// Create a `WorkQueue` and specify a `WorkerPool` to /// process the submitted `WorkItems`. pub fn new(worker_pool: Weak) -> Arc { let queue = Arc::new(WorkQueue { worker_pool: worker_pool.clone(), inner: SpinLock::new(WorkQueueInner { pending_work_items: Vec::new(), }), }); worker_pool .upgrade() .unwrap() .assign_work_queue(queue.clone()); queue } /// Submit a work item. Return `false` if the work item is currently pending. pub fn enqueue(&self, work_item: Arc) -> bool { if !work_item.try_pending() { return false; } self.inner .disable_irq() .lock() .pending_work_items .push(work_item); if let Some(worker_pool) = self.worker_pool.upgrade() { worker_pool.schedule() } true } /// Request a pending work item. The `request_cpu` indicates the CPU where /// the calling worker is located. fn dequeue(&self, request_cpu: u32) -> Option> { let mut inner = self.inner.disable_irq().lock(); let index = inner .pending_work_items .iter() .position(|item| item.is_valid_cpu(request_cpu))?; let item = inner.pending_work_items.remove(index); Some(item) } fn has_pending_work_items(&self, request_cpu: u32) -> bool { self.inner .disable_irq() .lock() .pending_work_items .iter() .any(|item| item.is_valid_cpu(request_cpu)) } } /// Initialize global worker pools and work queues. pub fn init() { WORKERPOOL_NORMAL.call_once(|| { let cpu_set = CpuSet::new_full(); WorkerPool::new(WorkPriority::Normal, cpu_set) }); WORKERPOOL_NORMAL.get().unwrap().run(); WORKERPOOL_HIGH_PRI.call_once(|| { let cpu_set = CpuSet::new_full(); WorkerPool::new(WorkPriority::High, cpu_set) }); WORKERPOOL_HIGH_PRI.get().unwrap().run(); WORKQUEUE_GLOBAL_NORMAL .call_once(|| WorkQueue::new(Arc::downgrade(WORKERPOOL_NORMAL.get().unwrap()))); WORKQUEUE_GLOBAL_HIGH_PRI .call_once(|| WorkQueue::new(Arc::downgrade(WORKERPOOL_HIGH_PRI.get().unwrap()))); } impl Drop for WorkQueue { fn drop(&mut self) { //TODO: Handling non-empty queues. } } #[derive(PartialEq)] pub enum WorkPriority { High, Normal, }