Refactor Monitor

This commit is contained in:
Yuke Peng 2024-06-13 14:13:59 +08:00 committed by Tate, Hongliang Tian
parent 89ce1f4df9
commit c5d0099dfd

View File

@ -139,7 +139,7 @@ impl WorkerPool {
}
WorkerPool {
local_pools,
monitor: Monitor::new(pool_ref.clone()),
monitor: Monitor::new(pool_ref.clone(), &priority),
priority,
cpu_set,
scheduler: Arc::new(SimpleScheduler::new(pool_ref.clone())),
@ -230,7 +230,7 @@ impl Drop for WorkerPool {
}
impl Monitor {
pub fn new(worker_pool: Weak<WorkerPool>) -> Arc<Self> {
pub fn new(worker_pool: Weak<WorkerPool>, priority: &WorkPriority) -> Arc<Self> {
Arc::new_cyclic(|monitor_ref| {
let weal_monitor = monitor_ref.clone();
let task_fn = Box::new(move || {
@ -238,10 +238,14 @@ impl Monitor {
current_monitor.run_monitor_loop();
});
let cpu_affinity = CpuSet::new_full();
let priority = match priority {
WorkPriority::High => Priority::high(),
WorkPriority::Normal => Priority::normal(),
};
let bound_thread = Thread::new_kernel_thread(
ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)
.priority(Priority::normal()),
.priority(priority),
);
Self {
worker_pool,
@ -255,6 +259,8 @@ impl Monitor {
}
fn run_monitor_loop(self: &Arc<Self>) {
let sleep_queue = WaitQueue::new();
let sleep_duration = Duration::from_millis(100);
loop {
let worker_pool = self.worker_pool.upgrade();
let Some(worker_pool) = worker_pool else {
@ -264,7 +270,7 @@ impl Monitor {
for local_pool in worker_pool.local_pools.iter() {
local_pool.set_heartbeat(false);
}
Thread::yield_now();
sleep_queue.wait_until_or_timeout(|| -> Option<()> { None }, &sleep_duration);
}
}
}